CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres-synctable.coffee
Views: 687
1
#########################################################################
2
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
# License: MS-RSL – see LICENSE.md for details
4
#########################################################################
5
6
# Server side synchronized tables built on PostgreSQL, and basic support
7
# for user get query updates.
8
9
EventEmitter = require('events')
10
11
immutable = require('immutable')
12
async = require('async')
13
14
{defaults, is_array} = misc = require('@cocalc/util/misc')
15
required = defaults.required
16
misc_node = require('@cocalc/backend/misc_node')
17
18
{pg_type, one_result, all_results, quote_field} = require('./postgres-base')
19
20
{SCHEMA} = require('@cocalc/util/schema')
21
22
{Changes} = require('./postgres/changefeed')
23
24
{ProjectAndUserTracker} = require('./postgres/project-and-user-tracker')
25
26
exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext

    # Create (if necessary) the postgres trigger + plpgsql function pair that
    # pg_notify's about changes to the given table, so changefeeds can LISTEN.
    #   table  -- name of the table
    #   select -- map from column names to their postgres types (>= 1 entry)
    #   watch  -- array of column names whose changes should fire the trigger
    #   cb     -- cb(err)
    _ensure_trigger_exists: (table, select, watch, cb) =>
        dbg = @_dbg("_ensure_trigger_exists(#{table})")
        dbg("select=#{misc.to_json(select)}")
        if misc.len(select) == 0
            cb('there must be at least one column selected')
            return
        # The trigger name is a deterministic function of (table, select, watch),
        # so we can detect a previously-created trigger purely by name.
        tgname = trigger_name(table, select, watch)
        trigger_exists = undefined
        async.series([
            (cb) =>
                dbg("checking whether or not trigger exists")
                @_query
                    query : "SELECT count(*) FROM pg_trigger WHERE tgname = '#{tgname}'"
                    cb    : (err, result) =>
                        if err
                            cb(err)
                        else
                            trigger_exists = parseInt(result.rows[0].count) > 0
                            cb()
            (cb) =>
                if trigger_exists
                    dbg("trigger #{tgname} already exists")
                    cb()
                    return
                dbg("creating trigger #{tgname}")
                code = trigger_code(table, select, watch)
                # The function must be created before the trigger that references it.
                async.series([
                    (cb) =>
                        @_query
                            query : code.function
                            cb    : cb
                    (cb) =>
                        @_query
                            query : code.trigger
                            cb    : cb
                ], cb)
        ], cb)
65
66
    # Start LISTENing on the notification channel for (table, select, watch),
    # creating the trigger first if needed.  Reference counted: repeated calls
    # for the same channel just bump a counter; see _stop_listening.
    #   cb(err, tgname) -- tgname is the channel/trigger name on success
    _listen: (table, select, watch, cb) =>
        dbg = @_dbg("_listen(#{table})")
        dbg("select = #{misc.to_json(select)}")
        if not misc.is_object(select)
            cb('select must be an object')
            return
        if misc.len(select) == 0
            cb('there must be at least one column')
            return
        if not misc.is_array(watch)
            cb('watch must be an array')
            return
        @_listening ?= {}
        tgname = trigger_name(table, select, watch)
        if @_listening[tgname] > 0
            # Already listening on this channel -- just increase the refcount.
            dbg("already listening")
            @_listening[tgname] += 1
            cb?(undefined, tgname)
            return
        async.series([
            (cb) =>
                dbg("ensure trigger exists")
                @_ensure_trigger_exists(table, select, watch, cb)
            (cb) =>
                dbg("add listener")
                @_query
                    query : "LISTEN #{tgname}"
                    cb    : cb
        ], (err) =>
            if err
                dbg("fail: err = #{err}")
                cb?(err)
            else
                # Only count the listener once the LISTEN actually succeeded.
                @_listening[tgname] ?= 0
                @_listening[tgname] += 1
                dbg("success")
                cb?(undefined, tgname)
        )
104
105
    # Handler for NOTIFY messages from the postgres driver: re-emit the parsed
    # JSON payload on the event channel named by the trigger.
    _notification: (mesg) =>
        #@_dbg('notification')(misc.to_json(mesg)) # this is way too verbose...
        @emit(mesg.channel, JSON.parse(mesg.payload))
108
109
    # Forget all LISTEN refcounts, e.g., after the db connection was reset and
    # every LISTEN has to be re-established from scratch.
    _clear_listening_state: =>
        @_listening = {}
111
112
_stop_listening: (table, select, watch, cb) =>
113
@_listening ?= {}
114
tgname = trigger_name(table, select, watch)
115
if not @_listening[tgname]? or @_listening[tgname] == 0
116
cb?()
117
return
118
if @_listening[tgname] > 0
119
@_listening[tgname] -= 1
120
if @_listening[tgname] == 0
121
@_query
122
query : "UNLISTEN #{tgname}"
123
cb : cb
124
125
    # Server-side changefeed-updated table, which automatically restarts the
    # changefeed on error, etc.  See the SyncTable docs where the class is
    # defined below.
    #   cb(err, synctable) -- the new SyncTable is also returned directly
    synctable: (opts) =>
        opts = defaults opts,
            table          : required
            columns        : undefined
            where          : undefined
            limit          : undefined
            order_by       : undefined
            where_function : undefined # if given; a function of the *primary* key that returns true if and only if it matches the changefeed
            idle_timeout_s : undefined # TODO: currently ignored
            cb             : undefined
        if @is_standby
            # Changefeeds need triggers + LISTEN/NOTIFY, which are not
            # available against a read-only standby server.
            err = "synctable against standby database not allowed"
            if opts.cb?
                opts.cb(err)
                return
            else
                throw Error(err)
        return new SyncTable(@, opts.table, opts.columns, opts.where, opts.where_function, opts.limit, opts.order_by, opts.cb)
145
146
    # Create a raw changefeed on the given table; updates are delivered through
    # opts.cb (see the Changes class in ./postgres/changefeed).
    changefeed: (opts) =>
        opts = defaults opts,
            table  : required  # Name of the table
            select : required  # Map from field names to postgres data types. These must
                               # determine entries of table (e.g., primary key).
            watch  : required  # Array of field names we watch for changes
            where  : required  # Condition involving only the fields in select; or function taking obj with select and returning true or false
            cb     : required
        if @is_standby
            # No triggers/NOTIFY on a read-only standby.
            opts.cb?("changefeed against standby database not allowed")
            return
        new Changes(@, opts.table, opts.select, opts.watch, opts.where, opts.cb)
        return
159
160
# Event emitter that changes to users of a project, and collabs of a user.
161
# If it emits 'error' -- which is can and will do sometimes -- then
162
# any client of this tracker must give up on using it!
163
project_and_user_tracker: (opts) =>
164
opts = defaults opts,
165
cb : required
166
if @_project_and_user_tracker?
167
opts.cb(undefined, @_project_and_user_tracker)
168
return
169
@_project_and_user_tracker_cbs ?= []
170
@_project_and_user_tracker_cbs.push(opts.cb)
171
if @_project_and_user_tracker_cbs.length > 1
172
return
173
tracker = new ProjectAndUserTracker(@)
174
tracker.once "error", =>
175
# delete, so that future calls create a new one.
176
delete @_project_and_user_tracker
177
try
178
await tracker.init()
179
@_project_and_user_tracker = tracker
180
for cb in @_project_and_user_tracker_cbs
181
cb(undefined, tracker)
182
delete @_project_and_user_tracker_cbs
183
catch err
184
for cb in @_project_and_user_tracker_cbs
185
cb(err)
186
187
# Server-side synchronized table: mirrors the rows matching a query in an
# immutable.Map keyed by primary key, kept current via trigger + LISTEN/NOTIFY.
class SyncTable extends EventEmitter
    # cb(err, @) fires once initialization finished (or failed); if no cb is
    # given, an init failure is emitted as an 'error' event instead.
    constructor: (_db, _table, _columns, _where, _where_function, _limit, _order_by, cb) ->
        super()
        @_db             = _db
        @_table          = _table
        @_columns        = _columns
        @_where          = _where
        @_where_function = _where_function
        @_limit          = _limit
        @_order_by       = _order_by
        t = SCHEMA[@_table]
        if not t?
            @_state = 'error'
            cb?("unknown table #{@_table}")
            return

        try
            @_primary_key = @_db._primary_key(@_table)
        catch e
            cb?(e)
            return

        # The NOTIFY payload always includes the primary key (it identifies the row).
        @_listen_columns = {"#{@_primary_key}" : pg_type(t.fields[@_primary_key])}

        # We only trigger an update when one of the columns we care about actually changes.

        if @_columns
            @_watch_columns = misc.copy(@_columns) # don't include primary key since it can't change.
            if @_primary_key not in @_columns
                @_columns = @_columns.concat([@_primary_key]) # required
            @_select_columns = @_columns
        else
            @_watch_columns = [] # means all of them
            @_select_columns = misc.keys(SCHEMA[@_table].fields)

        @_select_query = "SELECT #{(quote_field(x) for x in @_select_columns)} FROM #{@_table}"

        @_init (err) =>
            if err and not cb?
                @emit("error", err)
                return
            @emit('init')
            cb?(err, @)
230
231
_dbg: (f) =>
232
return @_db._dbg("SyncTable(table='#{@_table}').#{f}")
233
234
_query_opts: () =>
235
opts = {}
236
opts.query = @_select_query
237
opts.where = @_where
238
opts.limit = @_limit
239
opts.order_by = @_order_by
240
return opts
241
242
    # Shut this synctable down: detach all event listeners, drop the cached
    # value, and release our LISTEN refcount.  cb fires when that completes.
    close: (cb) =>
        @removeAllListeners()
        @_db.removeListener(@_tgname, @_notification)
        @_db.removeListener('connect', @_reconnect)
        @_state = 'closed'
        delete @_value
        @_db._stop_listening(@_table, @_listen_columns, @_watch_columns, cb)
249
250
connect: (opts) =>
251
opts?.cb?() # NO-OP -- only needed for backward compatibility
252
253
    # Handle one parsed trigger notification: obj = [action, new_val, old_val].
    # Deletions are applied to @_value immediately; inserts/updates are only
    # recorded in @_changed and fetched from the database by _update.
    _notification: (obj) =>
        #console.log 'notification', obj
        [action, new_val, old_val] = obj
        if action == 'DELETE' or not new_val?
            k = old_val[@_primary_key]
            if @_value.has(k)
                @_value = @_value.delete(k)
                process.nextTick(=>@emit('change', k))
        else
            k = new_val[@_primary_key]
            if @_where_function? and not @_where_function(k)
                # doesn't match -- nothing to do -- ignore
                return
            @_changed[k] = true
            @_update()
268
269
    # Initialize with retry: keep calling _do_init (with backoff) until it
    # succeeds, then cb().
    _init: (cb) =>
        misc.retry_until_success
            f           : @_do_init
            start_delay : 3000
            max_delay   : 10000
            log         : @_dbg("_init")
            cb          : cb
276
277
    # One initialization attempt: set up the LISTEN channel, load the current
    # table contents, then apply any changes that arrived while loading.
    _do_init: (cb) =>
        @_state = 'init' # 'init' -> ['error', 'ready'] -> 'closed'
        @_value = immutable.Map()
        @_changed = {}
        async.series([
            (cb) =>
                # ensure database client is listening for primary keys changes to our table
                @_db._listen @_table, @_listen_columns, @_watch_columns, (err, tgname) =>
                    @_tgname = tgname
                    @_db.on(@_tgname, @_notification)
                    cb(err)
            (cb) =>
                opts = @_query_opts()
                opts.cb = (err, result) =>
                    if err
                        cb(err)
                    else
                        @_process_results(result.rows)
                        # Re-initialize whenever the db client reconnects.
                        @_db.once('connect', @_reconnect)
                        cb()
                @_db._query(opts)
            (cb) =>
                # Fetch rows for notifications that arrived during the initial query.
                @_update(cb)
        ], (err) =>
            if err
                @_state = 'error'
                cb(err)
            else
                @_state = 'ready'
                cb()
        )
308
309
    # Called when the database client reconnects after a dropped connection:
    # re-initialize everything, then emit 'change' for every row that differs
    # from the state we had before the disconnect.
    _reconnect: (cb) =>
        dbg = @_dbg("_reconnect")
        if @_state != 'ready'
            dbg("only attempt reconnect if we were already successfully connected at some point.")
            return
        # Everything was already initialized, but then the connection to the
        # database was dropped... and then successfully re-connected.  Now
        # we need to (1) setup everything again, and (2) send out notifications
        # about anything in the table that changed.

        dbg("Save state from before disconnect")
        before = @_value

        dbg("Clean up everything.")
        @_db.removeListener(@_tgname, @_notification)
        @_db.removeListener('connect', @_reconnect)
        delete @_value

        dbg("connect and initialize")
        @_init (err) =>
            if err
                cb?(err)
                return
            if @_value? and before?
                # It's highly unlikely that before or @_value would not be defined, but it could happen (see #2527)
                dbg("notify about anything that changed when we were disconnected")
                before.map (v, k) =>
                    if not v.equals(@_value.get(k))
                        @emit('change', k)
                @_value.map (v, k) =>
                    if not before.has(k)
                        @emit('change', k)
            cb?()
342
343
_process_results: (rows) =>
344
if @_state == 'closed' or not @_value?
345
# See https://github.com/sagemathinc/cocalc/issues/4440
346
# for why the @_value check. Remove this when this is
347
# rewritten in typescript and we can guarantee stuff.
348
return
349
for x in rows
350
k = x[@_primary_key]
351
v = immutable.fromJS(misc.map_without_undefined_and_null(x))
352
if not v.equals(@_value.get(k))
353
@_value = @_value.set(k, v)
354
if @_state == 'ready' # only send out change notifications after ready.
355
process.nextTick(=>@emit('change', k))
356
357
# Remove from synctable anything that no longer matches the where criterion.
358
_process_deleted: (rows, changed) =>
359
kept = {}
360
for x in rows
361
kept[x[@_primary_key]] = true
362
for k of changed
363
if not kept[k] and @_value.has(k)
364
# The record with primary_key k no longer matches the where criterion
365
# so we delete it from our synctable.
366
@_value = @_value.delete(k)
367
if @_state == 'ready'
368
process.nextTick(=>@emit('change', k))
369
370
    # Grab any entries from table about which we have been notified of changes.
    _update: (cb) =>
        if misc.len(@_changed) == 0 # nothing to do
            cb?()
            return
        changed = @_changed
        @_changed = {} # reset changed set -- could get modified during query below, which is fine.
        if @_select_columns.length == 1 # special case where we don't have to query for more info
            @_process_results((("#{@_primary_key}" : x) for x in misc.keys(changed)))
            cb?()
            return

        # Have to query to get actual changed data.
        @_db._query
            query : @_select_query
            where : [{"#{@_primary_key} = ANY($)" : misc.keys(changed)}, @_where]
            cb    : (err, result) =>
                if err
                    @_dbg("update")("error #{err}")
                    for k of changed
                        @_changed[k] = true # will try again later
                else
                    @_process_results(result.rows)
                    # Rows we asked about but didn't get back no longer match
                    # the where condition, so drop them.
                    @_process_deleted(result.rows, changed)
                cb?()
395
396
get: (key) => # key = single key or array of keys
397
if not key? or not @_value?
398
return @_value
399
if is_array(key)
400
# for consistency with @cocalc/sync/synctable
401
r = immutable.Map()
402
for k in key
403
v = @_value.get(k)
404
if v?
405
r = r.set(k, v)
406
return r
407
else
408
return @_value.get(key)
409
410
getIn: (x) =>
411
return @_value?.getIn(x)
412
413
has: (key) =>
414
return @_value?.has(key)
415
416
    # wait until some function of this synctable is truthy
    wait: (opts) =>
        opts = defaults opts,
            until   : required # waits until "until(@)" evaluates to something truthy
            timeout : 30       # in *seconds* -- set to 0 to disable (sort of DANGEROUS if 0, obviously.)
            cb      : required # cb(undefined, until(@)) on success and cb('timeout') on failure due to timeout
        x = opts.until(@)
        if x
            opts.cb(undefined, x) # already true
            return
        fail_timer = undefined
        # Re-evaluate the condition on every 'change' event; on success,
        # detach ourselves and cancel the timeout.
        f = =>
            x = opts.until(@)
            if x
                @removeListener('change', f)
                if fail_timer?
                    clearTimeout(fail_timer)
                    fail_timer = undefined
                opts.cb(undefined, x)
        @on('change', f)
        if opts.timeout
            # Give up with cb('timeout') if not satisfied in time.
            fail = =>
                @removeListener('change', f)
                opts.cb('timeout')
            fail_timer = setTimeout(fail, 1000*opts.timeout)
        return
442
443
###
Trigger functions
###

# Deterministic trigger/channel name derived from the table, the selected
# columns, and the watched columns (sorted so ordering doesn't matter).
trigger_name = (table, select, watch) ->
    if not misc.is_object(select)
        throw Error("trigger_name -- columns must be a map of colname:type")
    cols = misc.keys(select)
    cols.sort()
    watched = misc.copy(watch)
    watched.sort()
    if watched.length > 0
        # '|' separates select columns from watch columns in the hashed name.
        cols.push('|')
        cols = cols.concat(watched)
    return 'change_' + misc_node.sha1("#{table} #{cols.join(' ')}").slice(0,16)
457
458
###
459
INPUT:
460
table -- name of a table
461
select -- map from field names (of table) to their postgres types
462
watch  -- array of field names (of table)
463
464
Creates a trigger function that fires whenever any of the given
465
columns changes, and sends the columns in select out as a notification.
466
###
467
468
# Map a schema type to the type usable in a trigger declaration:
# SERIAL UNIQUE columns behave as INTEGER there; everything else is unchanged.
triggerType = (type) ->
    if type == 'SERIAL UNIQUE' then 'INTEGER' else type
473
474
# Generate the SQL for the notification trigger: code.function is the plpgsql
# function body and code.trigger is the CREATE TRIGGER statement invoking it.
trigger_code = (table, select, watch) ->
    tgname          = trigger_name(table, select, watch)
    # Declarations and assignments for OLD/NEW copies of each selected column;
    # a type that triggerType maps to nothing falls back to 'text'.
    column_decl_old = ("#{field}_old #{triggerType(type) ? 'text'};"   for field, type of select)
    column_decl_new = ("#{field}_new #{triggerType(type) ? 'text'};"   for field, type of select)
    assign_old      = ("#{field}_old = OLD.#{field};"                  for field, _ of select)
    assign_new      = ("#{field}_new = NEW.#{field};"                  for field, _ of select)
    build_obj_old   = ("'#{field}', #{field}_old"                      for field, _ of select)
    build_obj_new   = ("'#{field}', #{field}_new"                      for field, _ of select)
    if watch.length > 0
        # Only notify on UPDATE when some watched (or selected) column changed.
        no_change = ("OLD.#{field} = NEW.#{field}" for field in watch.concat(misc.keys(select))).join(' AND ')
    else
        no_change = 'FALSE'
    if watch.length > 0
        # Restrict the trigger itself to updates of the relevant columns.
        x = {}
        for k in watch
            x[k] = true
        for k in misc.keys(select)
            x[k] = true
        update_of = "OF #{(quote_field(field) for field in misc.keys(x)).join(',')}"
    else
        update_of = ""
    code = {}
    code.function = """
CREATE OR REPLACE FUNCTION #{tgname}() RETURNS TRIGGER AS $$
    DECLARE
        notification json;
        obj_old json;
        obj_new json;
        #{column_decl_old.join('\n')}
        #{column_decl_new.join('\n')}
    BEGIN
        -- TG_OP is 'DELETE', 'INSERT' or 'UPDATE'
        IF TG_OP = 'DELETE' THEN
            #{assign_old.join('\n')}
            obj_old = json_build_object(#{build_obj_old.join(',')});
        END IF;
        IF TG_OP = 'INSERT' THEN
            #{assign_new.join('\n')}
            obj_new = json_build_object(#{build_obj_new.join(',')});
        END IF;
        IF TG_OP = 'UPDATE' THEN
            IF #{no_change} THEN
                RETURN NULL;
            END IF;
            #{assign_old.join('\n')}
            obj_old = json_build_object(#{build_obj_old.join(',')});
            #{assign_new.join('\n')}
            obj_new = json_build_object(#{build_obj_new.join(',')});
        END IF;
        notification = json_build_array(TG_OP, obj_new, obj_old);
        PERFORM pg_notify('#{tgname}', notification::text);
        RETURN NULL;
    END;
$$ LANGUAGE plpgsql;"""
    code.trigger = "CREATE TRIGGER #{tgname} AFTER INSERT OR DELETE OR UPDATE #{update_of} ON #{table} FOR EACH ROW EXECUTE PROCEDURE #{tgname}();"
    return code
530
531
###
532
533
NOTES: The following is a way to back the changes with a small table.
534
This allows having changes which are larger than the hard 8000-byte limit.
535
HSY did this with the idea of having a temporary workaround for a bug related to this.
536
https://github.com/sagemathinc/cocalc/issues/1718
537
538
1. Create a table trigger_notifications via the db-schema.
539
For performance reasons, the table itself should be created with "UNLOGGED"
540
see: https://www.postgresql.org/docs/current/static/sql-createtable.html
541
(I've no idea how to specify that in the code here)
542
543
schema.trigger_notifications =
544
primary_key : 'id'
545
fields:
546
id:
547
type : 'uuid'
548
desc : 'primary key'
549
time:
550
type : 'timestamp'
551
desc : 'time of when the change was created -- used for TTL'
552
notification:
553
type : 'map'
554
desc : "notification payload -- up to 1GB"
555
pg_indexes : [ 'time' ]
556
557
2. Modify the trigger function created by trigger_code above such that
558
pg_notifies no longer contains the data structure,
559
but a UUID for an entry in the trigger_notifications table.
560
It creates that UUID on its own and stores the data via a normal insert.
561
562
notification_id = md5(random()::text || clock_timestamp()::text)::uuid;
563
notification = json_build_array(TG_OP, obj_new, obj_old);
564
INSERT INTO trigger_notifications(id, time, notification)
565
VALUES(notification_id, NOW(), notification);
566
567
3. PostgresQL::_notification is modified in such a way, that it looks up that UUID
568
in the trigger_notifications table:
569
570
@_query
571
query: "SELECT notification FROM trigger_notifications WHERE id ='#{mesg.payload}'"
572
cb : (err, result) =>
573
if err
574
dbg("err=#{err}")
575
else
576
payload = result.rows[0].notification
577
# dbg("payload: type=#{typeof(payload)}, data=#{misc.to_json(payload)}")
578
@emit(mesg.channel, payload)
579
580
Fortunately, there is no string -> json conversion necessary.
581
582
4. Below, that function and trigger implement a TTL for the trigger_notifications table.
583
The `date_trunc` is a good idea, because then there is just one lock + delete op
584
per minute, instead of potentially at every write.
585
586
-- 10 minutes TTL for the trigger_notifications table, deleting only every full minute
587
588
CREATE FUNCTION delete_old_trigger_notifications() RETURNS trigger
589
LANGUAGE plpgsql
590
AS $$
591
BEGIN
592
DELETE FROM trigger_notifications
593
WHERE time < date_trunc('minute', NOW() - '10 minute'::interval);
594
RETURN NULL;
595
END;
596
$$;
597
598
-- creating the trigger
599
600
CREATE TRIGGER trigger_delete_old_trigger_notifications
601
AFTER INSERT ON trigger_notifications
602
EXECUTE PROCEDURE delete_old_trigger_notifications();
603
604
###
605
606