Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/database/postgres-synctable.coffee
Views: 687
#########################################################################
# This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
# License: MS-RSL – see LICENSE.md for details
#########################################################################

# Server side synchronized tables built on PostgreSQL, and basic support
# for user get query updates.

EventEmitter = require('events')

immutable = require('immutable')
async     = require('async')

{defaults, is_array} = misc = require('@cocalc/util/misc')
required = defaults.required
misc_node = require('@cocalc/backend/misc_node')

{pg_type, one_result, all_results, quote_field} = require('./postgres-base')

{SCHEMA} = require('@cocalc/util/schema')

{Changes} = require('./postgres/changefeed')

{ProjectAndUserTracker} = require('./postgres/project-and-user-tracker')

exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext

    # Create (if necessary) the trigger function + trigger for the given
    # (table, select, watch) triple.  Idempotent: checks pg_trigger first.
    _ensure_trigger_exists: (table, select, watch, cb) =>
        dbg = @_dbg("_ensure_trigger_exists(#{table})")
        dbg("select=#{misc.to_json(select)}")
        if misc.len(select) == 0
            cb('there must be at least one column selected')
            return
        tgname = trigger_name(table, select, watch)
        trigger_exists = undefined
        async.series([
            (cb) =>
                dbg("checking whether or not trigger exists")
                @_query
                    query : "SELECT count(*) FROM pg_trigger WHERE tgname = '#{tgname}'"
                    cb    : (err, result) =>
                        if err
                            cb(err)
                        else
                            trigger_exists = parseInt(result.rows[0].count) > 0
                            cb()
            (cb) =>
                if trigger_exists
                    dbg("trigger #{tgname} already exists")
                    cb()
                    return
                dbg("creating trigger #{tgname}")
                code = trigger_code(table, select, watch)
                async.series([
                    (cb) =>
                        @_query
                            query : code.function
                            cb    : cb
                    (cb) =>
                        @_query
                            query : code.trigger
                            cb    : cb
                ], cb)
        ], cb)

    # Start LISTENing on the trigger channel for (table, select, watch).
    # @_listening maps trigger name -> reference count, so repeated calls
    # only issue one LISTEN; cb(err, tgname).
    _listen: (table, select, watch, cb) =>
        dbg = @_dbg("_listen(#{table})")
        dbg("select = #{misc.to_json(select)}")
        if not misc.is_object(select)
            cb('select must be an object')
            return
        if misc.len(select) == 0
            cb('there must be at least one column')
            return
        if not misc.is_array(watch)
            cb('watch must be an array')
            return
        @_listening ?= {}
        tgname = trigger_name(table, select, watch)
        if @_listening[tgname] > 0
            dbg("already listening")
            @_listening[tgname] += 1
            cb?(undefined, tgname)
            return
        async.series([
            (cb) =>
                dbg("ensure trigger exists")
                @_ensure_trigger_exists(table, select, watch, cb)
            (cb) =>
                dbg("add listener")
                @_query
                    query : "LISTEN #{tgname}"
                    cb    : cb
        ], (err) =>
            if err
                dbg("fail: err = #{err}")
                cb?(err)
            else
                @_listening[tgname] ?= 0
                @_listening[tgname] += 1
                dbg("success")
                cb?(undefined, tgname)
        )

    # Re-emit a pg_notify message on the channel named by the trigger.
    _notification: (mesg) =>
        #@_dbg('notification')(misc.to_json(mesg)) # this is way too verbose...
        @emit(mesg.channel, JSON.parse(mesg.payload))

    _clear_listening_state: =>
        @_listening = {}

    # Decrement the reference count for this trigger; only UNLISTEN when
    # the count reaches zero.
    _stop_listening: (table, select, watch, cb) =>
        @_listening ?= {}
        tgname = trigger_name(table, select, watch)
        if not @_listening[tgname]? or @_listening[tgname] == 0
            cb?()
            return
        if @_listening[tgname] > 0
            @_listening[tgname] -= 1
        if @_listening[tgname] == 0
            @_query
                query : "UNLISTEN #{tgname}"
                cb    : cb

    # Server-side changefeed-updated table, which automatically restarts the
    # changefeed on error, etc.  See SyncTable docs where the class is defined.
    synctable: (opts) =>
        opts = defaults opts,
            table          : required
            columns        : undefined
            where          : undefined
            limit          : undefined
            order_by       : undefined
            where_function : undefined  # if given; a function of the *primary* key that returns true if and only if it matches the changefeed
            idle_timeout_s : undefined  # TODO: currently ignored
            cb             : undefined
        if @is_standby
            err = "synctable against standby database not allowed"
            if opts.cb?
                opts.cb(err)
                return
            else
                throw Error(err)
        return new SyncTable(@, opts.table, opts.columns, opts.where, opts.where_function, opts.limit, opts.order_by, opts.cb)

    changefeed: (opts) =>
        opts = defaults opts,
            table  : required   # Name of the table
            select : required   # Map from field names to postgres data types. These must
                                # determine entries of table (e.g., primary key).
            watch  : required   # Array of field names we watch for changes
            where  : required   # Condition involving only the fields in select; or function taking obj with select and returning true or false
            cb     : required
        if @is_standby
            opts.cb?("changefeed against standby database not allowed")
            return
        new Changes(@, opts.table, opts.select, opts.watch, opts.where, opts.cb)
        return

    # Event emitter that changes to users of a project, and collabs of a user.
    # If it emits 'error' -- which it can and will do sometimes -- then
    # any client of this tracker must give up on using it!
    # The tracker is a singleton; concurrent callers while init is pending
    # are queued in @_project_and_user_tracker_cbs.
    project_and_user_tracker: (opts) =>
        opts = defaults opts,
            cb : required
        if @_project_and_user_tracker?
            opts.cb(undefined, @_project_and_user_tracker)
            return
        @_project_and_user_tracker_cbs ?= []
        @_project_and_user_tracker_cbs.push(opts.cb)
        if @_project_and_user_tracker_cbs.length > 1
            return
        tracker = new ProjectAndUserTracker(@)
        tracker.once "error", =>
            # delete, so that future calls create a new one.
            delete @_project_and_user_tracker
        try
            await tracker.init()
            @_project_and_user_tracker = tracker
            for cb in @_project_and_user_tracker_cbs
                cb(undefined, tracker)
            delete @_project_and_user_tracker_cbs
        catch err
            for cb in @_project_and_user_tracker_cbs
                cb(err)

class SyncTable extends EventEmitter
    constructor: (_db, _table, _columns, _where, _where_function, _limit, _order_by, cb) ->
        super()
        @_db             = _db
        @_table          = _table
        @_columns        = _columns
        @_where          = _where
        @_where_function = _where_function
        @_limit          = _limit
        @_order_by       = _order_by
        t = SCHEMA[@_table]
        if not t?
            @_state = 'error'
            cb?("unknown table #{@_table}")
            return

        try
            @_primary_key = @_db._primary_key(@_table)
        catch e
            cb?(e)
            return

        @_listen_columns = {"#{@_primary_key}" : pg_type(t.fields[@_primary_key])}

        # We only trigger an update when one of the columns we care about actually changes.

        if @_columns
            @_watch_columns = misc.copy(@_columns)  # don't include primary key since it can't change.
            if @_primary_key not in @_columns
                @_columns = @_columns.concat([@_primary_key])  # required
            @_select_columns = @_columns
        else
            @_watch_columns  = []  # means all of them
            @_select_columns = misc.keys(SCHEMA[@_table].fields)

        @_select_query = "SELECT #{(quote_field(x) for x in @_select_columns)} FROM #{@_table}"

        @_init (err) =>
            if err and not cb?
                @emit("error", err)
                return
            @emit('init')
            cb?(err, @)

    _dbg: (f) =>
        return @_db._dbg("SyncTable(table='#{@_table}').#{f}")

    _query_opts: () =>
        opts = {}
        opts.query    = @_select_query
        opts.where    = @_where
        opts.limit    = @_limit
        opts.order_by = @_order_by
        return opts

    close: (cb) =>
        @removeAllListeners()
        @_db.removeListener(@_tgname, @_notification)
        @_db.removeListener('connect', @_reconnect)
        @_state = 'closed'
        delete @_value
        @_db._stop_listening(@_table, @_listen_columns, @_watch_columns, cb)

    connect: (opts) =>
        opts?.cb?()  # NO-OP -- only needed for backward compatibility

    # Handle a parsed [action, new_val, old_val] notification from the
    # database trigger: delete locally on DELETE, otherwise mark the
    # primary key as changed and schedule an update query.
    _notification: (obj) =>
        #console.log 'notification', obj
        [action, new_val, old_val] = obj
        if action == 'DELETE' or not new_val?
            k = old_val[@_primary_key]
            if @_value.has(k)
                @_value = @_value.delete(k)
                process.nextTick(=>@emit('change', k))
        else
            k = new_val[@_primary_key]
            if @_where_function? and not @_where_function(k)
                # doesn't match -- nothing to do -- ignore
                return
            @_changed[k] = true
            @_update()

    _init: (cb) =>
        misc.retry_until_success
            f           : @_do_init
            start_delay : 3000
            max_delay   : 10000
            log         : @_dbg("_init")
            cb          : cb

    _do_init: (cb) =>
        @_state = 'init'  # 'init' -> ['error', 'ready'] -> 'closed'
        @_value = immutable.Map()
        @_changed = {}
        async.series([
            (cb) =>
                # ensure database client is listening for primary keys changes to our table
                @_db._listen @_table, @_listen_columns, @_watch_columns, (err, tgname) =>
                    @_tgname = tgname
                    @_db.on(@_tgname, @_notification)
                    cb(err)
            (cb) =>
                opts = @_query_opts()
                opts.cb = (err, result) =>
                    if err
                        cb(err)
                    else
                        @_process_results(result.rows)
                        @_db.once('connect', @_reconnect)
                        cb()
                @_db._query(opts)
            (cb) =>
                @_update(cb)
        ], (err) =>
            if err
                @_state = 'error'
                cb(err)
            else
                @_state = 'ready'
                cb()
        )

    _reconnect: (cb) =>
        dbg = @_dbg("_reconnect")
        if @_state != 'ready'
            dbg("only attempt reconnect if we were already successfully connected at some point.")
            return
        # Everything was already initialized, but then the connection to the
        # database was dropped... and then successfully re-connected.  Now
        # we need to (1) setup everything again, and (2) send out notifications
        # about anything in the table that changed.

        dbg("Save state from before disconnect")
        before = @_value

        dbg("Clean up everything.")
        @_db.removeListener(@_tgname, @_notification)
        @_db.removeListener('connect', @_reconnect)
        delete @_value

        dbg("connect and initialize")
        @_init (err) =>
            if err
                cb?(err)
                return
            if @_value? and before?
                # It's highly unlikely that before or @_value would not be defined, but it could happen (see #2527)
                dbg("notify about anything that changed when we were disconnected")
                before.map (v, k) =>
                    if not v.equals(@_value.get(k))
                        @emit('change', k)
                @_value.map (v, k) =>
                    if not before.has(k)
                        @emit('change', k)
            cb?()

    _process_results: (rows) =>
        if @_state == 'closed' or not @_value?
            # See https://github.com/sagemathinc/cocalc/issues/4440
            # for why the @_value check.  Remove this when this is
            # rewritten in typescript and we can guarantee stuff.
            return
        for x in rows
            k = x[@_primary_key]
            v = immutable.fromJS(misc.map_without_undefined_and_null(x))
            if not v.equals(@_value.get(k))
                @_value = @_value.set(k, v)
                if @_state == 'ready'  # only send out change notifications after ready.
                    process.nextTick(=>@emit('change', k))

    # Remove from synctable anything that no longer matches the where criterion.
    _process_deleted: (rows, changed) =>
        kept = {}
        for x in rows
            kept[x[@_primary_key]] = true
        for k of changed
            if not kept[k] and @_value.has(k)
                # The record with primary_key k no longer matches the where criterion
                # so we delete it from our synctable.
                @_value = @_value.delete(k)
                if @_state == 'ready'
                    process.nextTick(=>@emit('change', k))

    # Grab any entries from table about which we have been notified of changes.
    _update: (cb) =>
        if misc.len(@_changed) == 0  # nothing to do
            cb?()
            return
        changed = @_changed
        @_changed = {}  # reset changed set -- could get modified during query below, which is fine.
        if @_select_columns.length == 1  # special case where we don't have to query for more info
            @_process_results((("#{@_primary_key}" : x) for x in misc.keys(changed)))
            cb?()
            return

        # Have to query to get actual changed data.
        @_db._query
            query : @_select_query
            where : [{"#{@_primary_key} = ANY($)" : misc.keys(changed)}, @_where]
            cb    : (err, result) =>
                if err
                    @_dbg("update")("error #{err}")
                    for k of changed
                        @_changed[k] = true  # will try again later
                else
                    @_process_results(result.rows)
                    @_process_deleted(result.rows, changed)
                    cb?()

    get: (key) =>  # key = single key or array of keys
        if not key? or not @_value?
            return @_value
        if is_array(key)
            # for consistency with @cocalc/sync/synctable
            r = immutable.Map()
            for k in key
                v = @_value.get(k)
                if v?
                    r = r.set(k, v)
            return r
        else
            return @_value.get(key)

    getIn: (x) =>
        return @_value?.getIn(x)

    has: (key) =>
        return @_value?.has(key)

    # wait until some function of this synctable is truthy
    wait: (opts) =>
        opts = defaults opts,
            until   : required  # waits until "until(@)" evaluates to something truthy
            timeout : 30        # in *seconds* -- set to 0 to disable (sort of DANGEROUS if 0, obviously.)
            cb      : required  # cb(undefined, until(@)) on success and cb('timeout') on failure due to timeout
        x = opts.until(@)
        if x
            opts.cb(undefined, x)  # already true
            return
        fail_timer = undefined
        f = =>
            x = opts.until(@)
            if x
                @removeListener('change', f)
                if fail_timer?
                    clearTimeout(fail_timer)
                    fail_timer = undefined
                opts.cb(undefined, x)
        @on('change', f)
        if opts.timeout
            fail = =>
                @removeListener('change', f)
                opts.cb('timeout')
            fail_timer = setTimeout(fail, 1000*opts.timeout)
        return

###
Trigger functions
###

# Deterministic trigger/channel name derived from the table, the sorted
# select columns and the sorted watch columns (sha1-based, so it fits in
# Postgres identifier limits).
trigger_name = (table, select, watch) ->
    if not misc.is_object(select)
        throw Error("trigger_name -- columns must be a map of colname:type")
    c = misc.keys(select)
    c.sort()
    watch = misc.copy(watch)
    watch.sort()
    if watch.length > 0
        c.push('|')
        c = c.concat(watch)
    return 'change_' + misc_node.sha1("#{table} #{c.join(' ')}").slice(0,16)

###
INPUT:
    table  -- name of a table
    select -- map from field names (of table) to their postgres types
    change -- array of field names (of table)

Creates a trigger function that fires whenever any of the given
columns changes, and sends the columns in select out as a notification.
###

triggerType = (type) ->
    if type == 'SERIAL UNIQUE'
        return 'INTEGER'
    else
        return type

trigger_code = (table, select, watch) ->
    tgname          = trigger_name(table, select, watch)
    column_decl_old = ("#{field}_old #{triggerType(type) ? 'text'};" for field, type of select)
    column_decl_new = ("#{field}_new #{triggerType(type) ? 'text'};" for field, type of select)
    assign_old      = ("#{field}_old = OLD.#{field};" for field, _ of select)
    assign_new      = ("#{field}_new = NEW.#{field};" for field, _ of select)
    build_obj_old   = ("'#{field}', #{field}_old" for field, _ of select)
    build_obj_new   = ("'#{field}', #{field}_new" for field, _ of select)
    if watch.length > 0
        no_change = ("OLD.#{field} = NEW.#{field}" for field in watch.concat(misc.keys(select))).join(' AND ')
    else
        no_change = 'FALSE'
    if watch.length > 0
        x = {}
        for k in watch
            x[k] = true
        for k in misc.keys(select)
            x[k] = true
        update_of = "OF #{(quote_field(field) for field in misc.keys(x)).join(',')}"
    else
        update_of = ""
    code = {}
    code.function = """
CREATE OR REPLACE FUNCTION #{tgname}() RETURNS TRIGGER AS $$
    DECLARE
        notification json;
        obj_old json;
        obj_new json;
        #{column_decl_old.join('\n')}
        #{column_decl_new.join('\n')}
    BEGIN
        -- TG_OP is 'DELETE', 'INSERT' or 'UPDATE'
        IF TG_OP = 'DELETE' THEN
            #{assign_old.join('\n')}
            obj_old = json_build_object(#{build_obj_old.join(',')});
        END IF;
        IF TG_OP = 'INSERT' THEN
            #{assign_new.join('\n')}
            obj_new = json_build_object(#{build_obj_new.join(',')});
        END IF;
        IF TG_OP = 'UPDATE' THEN
            IF #{no_change} THEN
                RETURN NULL;
            END IF;
            #{assign_old.join('\n')}
            obj_old = json_build_object(#{build_obj_old.join(',')});
            #{assign_new.join('\n')}
            obj_new = json_build_object(#{build_obj_new.join(',')});
        END IF;
        notification = json_build_array(TG_OP, obj_new, obj_old);
        PERFORM pg_notify('#{tgname}', notification::text);
        RETURN NULL;
    END;
$$ LANGUAGE plpgsql;"""
    code.trigger = "CREATE TRIGGER #{tgname} AFTER INSERT OR DELETE OR UPDATE #{update_of} ON #{table} FOR EACH ROW EXECUTE PROCEDURE #{tgname}();"
    return code

###

NOTES: The following is a way to back the changes with a small table.
This allows to have changes which are larger than the hard 8000 bytes limit.
HSY did this with the idea of having a temporary workaround for a bug related to this.
https://github.com/sagemathinc/cocalc/issues/1718

1. Create a table trigger_notifications via the db-schema.
   For performance reasons, the table itself should be created with "UNLOGGED"
   see: https://www.postgresql.org/docs/current/static/sql-createtable.html
   (I've no idea how to specify that in the code here)

    schema.trigger_notifications =
        primary_key : 'id'
        fields:
            id:
                type : 'uuid'
                desc : 'primary key'
            time:
                type : 'timestamp'
                desc : 'time of when the change was created -- used for TTL'
            notification:
                type : 'map'
                desc : "notification payload -- up to 1GB"
        pg_indexes : [ 'time' ]

2. Modify the trigger function created by trigger_code above such that
   pg_notifies no longer contains the data structure,
   but a UUID for an entry in the trigger_notifications table.
   It creates that UUID on its own and stores the data via a normal insert.

     notification_id = md5(random()::text || clock_timestamp()::text)::uuid;
     notification = json_build_array(TG_OP, obj_new, obj_old);
     INSERT INTO trigger_notifications(id, time, notification)
     VALUES(notification_id, NOW(), notification);

3. PostgresQL::_notification is modified in such a way, that it looks up that UUID
   in the trigger_notifications table:

        @_query
            query: "SELECT notification FROM trigger_notifications WHERE id ='#{mesg.payload}'"
            cb : (err, result) =>
                if err
                    dbg("err=#{err}")
                else
                    payload = result.rows[0].notification
                    # dbg("payload: type=#{typeof(payload)}, data=#{misc.to_json(payload)}")
                    @emit(mesg.channel, payload)

   Fortunately, there is no string -> json conversion necessary.

4. Below, that function and trigger implement a TTL for the trigger_notifications table.
   The `date_trunc` is a good idea, because then there is just one lock + delete op
   per minute, instead of potentially at every write.

    -- 10 minutes TTL for the trigger_notifications table, deleting only every full minute

    CREATE FUNCTION delete_old_trigger_notifications() RETURNS trigger
        LANGUAGE plpgsql
        AS $$
    BEGIN
        DELETE FROM trigger_notifications
        WHERE time < date_trunc('minute', NOW() - '10 minute'::interval);
        RETURN NULL;
    END;
    $$;

    -- creating the trigger

    CREATE TRIGGER trigger_delete_old_trigger_notifications
    AFTER INSERT ON trigger_notifications
    EXECUTE PROCEDURE delete_old_trigger_notifications();

###