Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/database/postgres/changefeed.ts
Views: 687
/*
 * This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 * License: MS-RSL – see LICENSE.md for details
 */

/*
The Changes class is a useful building block
for making changefeeds. It lets you watch when given
columns change in a given table, and be notified
when a where condition is satisfied.

IMPORTANT: If an error event is emitted then
Changes object will close and not work any further!
You must recreate it.
*/

import { EventEmitter } from "events";
import * as misc from "@cocalc/util/misc";
import { opToFunction, OPERATORS, Operator } from "@cocalc/util/db-schema";
import { callback } from "awaiting";
import { PostgreSQL, QuerySelect } from "./types";
import { query } from "./changefeed-query";

// A where condition is either a predicate called on each row object,
// a single object of "field op $" -> value entries, or a list of such
// objects and/or "field op value" strings (see init_where).
type WhereCondition = Function | object | object[];

type ChangeAction = "delete" | "insert" | "update";

/**
 * Normalize an action name coming from a notification (case-insensitive)
 * to a ChangeAction.
 *
 * @throws Error if the name is not one of delete, insert or update.
 */
function parse_action(obj: string): ChangeAction {
  const s: string = `${obj.toLowerCase()}`;
  if (s === "delete" || s === "insert" || s === "update") {
    return s;
  }
  throw Error(`invalid action "${s}"`);
}

/** Payload of the "change" event emitted by Changes. */
export interface ChangeEvent {
  action: ChangeAction;
  new_val?: object;
  old_val?: object;
}

/**
 * Watches a table via db._listen and emits:
 *  - "change" (ChangeEvent) for rows matching the where condition,
 *  - "error" (Error) after which the object closes itself,
 *  - "close" when it is closed.
 */
export class Changes extends EventEmitter {
  private db: PostgreSQL;
  private table: string;
  private select: QuerySelect;
  private watch: string[];
  private where: WhereCondition;

  private trigger_name: string;
  private closed: boolean;
  // Per-field predicates built by init_where; absent means "match everything".
  private condition?: { [field: string]: Function };
  private match_condition: Function;

  // Last full value emitted for a row, keyed by JSON.stringify of the row
  // object in the notification; used by new_val_update to emit diffs.
  private val_update_cache: { [key: string]: any } = {};

  /**
   * Starts initialization immediately; cb(err) is called on failure and
   * cb(undefined, this) once the changefeed is listening.
   */
  constructor(
    db: PostgreSQL,
    table: string,
    select: QuerySelect,
    watch: string[],
    where: WhereCondition,
    cb: Function
  ) {
    super();
    // Both methods are registered as listeners on this.db (handle_change on
    // the trigger event, close on "connect"). EventEmitter invokes listeners
    // with `this` set to the emitter, so they must be bound to this instance;
    // otherwise close() would tear down the database object instead.
    this.handle_change = this.handle_change.bind(this);
    this.close = this.close.bind(this);

    this.db = db;
    this.table = table;
    this.select = select;
    this.watch = watch;
    this.where = where;
    this.init(cb);
  }

  /**
   * Parse the where condition, start listening on the table's trigger and
   * hook up the database event listeners.
   *
   * @param cb - called as cb(err) on failure or cb(undefined, this) on success.
   */
  async init(cb: Function): Promise<void> {
    this.dbg("constructor")(
      `select=${misc.to_json(this.select)}, watch=${misc.to_json(
        this.watch
      )}, @_where=${misc.to_json(this.where)}`
    );

    try {
      this.init_where();
    } catch (e) {
      cb(`error initializing where conditions -- ${e}`);
      return;
    }

    let trigger_name: string;
    try {
      trigger_name = await callback(
        this.db._listen,
        this.table,
        this.select,
        this.watch
      );
    } catch (err) {
      cb(err);
      return;
    }
    if (this.closed) {
      // close() ran while we were waiting for _listen (e.g. the db emitted
      // "connect"); this object is already torn down, so do not wire it up.
      cb("closed");
      return;
    }
    this.trigger_name = trigger_name;
    this.db.on(this.trigger_name, this.handle_change);
    // NOTE: we close on *connect*, not on disconnect, since then clients
    // that try to reconnect will only try to do so when we have an actual
    // connection to the database. No point in worrying them while trying
    // to reconnect, which only makes matters worse (as they panic and
    // requests pile up!).

    // This setMaxListeners is here because I keep getting warning about
    // this despite setting it in the db constructor. Putting this here
    // definitely does work, whereas having it only in the constructor
    // definitely does NOT. Don't break this without thought, as it has very bad
    // consequences when the database connection drops.
    this.db.setMaxListeners(0);

    this.db.once("connect", this.close);
    cb(undefined, this);
  }

  private dbg(f: string): Function {
    return this.db._dbg(`Changes(table='${this.table}').${f}`);
  }

  // this breaks the changefeed -- client must recreate it; nothing further will work at all.
  private fail(err): void {
    if (this.closed) {
      return;
    }
    this.dbg("_fail")(`err='${err}'`);
    this.emit("error", new Error(err));
    this.close();
  }

  /**
   * Emit "close", drop all listeners, stop listening on the table and clear
   * this object's state. Safe to call more than once.
   */
  public close(): void {
    if (this.closed) {
      return;
    }
    this.emit("close", { action: "close" });
    this.removeAllListeners();
    if (this.db != null) {
      this.db.removeListener(this.trigger_name, this.handle_change);
      this.db.removeListener("connect", this.close);
      this.db._stop_listening(this.table, this.select, this.watch);
    }
    misc.close(this);
    this.closed = true;
  }

  /**
   * Query the rows matching `where` (field -> value equality) and emit an
   * "insert" change for each one that satisfies the where condition.
   * Query errors fail (close) the changefeed.
   */
  public async insert(where): Promise<void> {
    const where0: { [field: string]: any } = {};
    for (const k in where) {
      const v = where[k];
      where0[`${k} = $`] = v;
    }
    let results: { [field: string]: any }[];
    try {
      results = await query({
        db: this.db,
        select: this.watch.concat(misc.keys(this.select)),
        table: this.table,
        where: where0,
        one: false,
      });
    } catch (err) {
      this.fail(err); // this is game over
      return;
    }
    // The object may have been closed while the query was running.
    if (this.closed) {
      return;
    }
    for (const x of results) {
      if (this.match_condition(x)) {
        misc.map_mutate_out_undefined_and_null(x);
        const change: ChangeEvent = { action: "insert", new_val: x };
        this.emit("change", change);
      }
    }
  }

  /** Emit a "delete" change whose old_val is `where`. */
  public delete(where): void {
    // listener is meant to delete everything that *matches* the where, so
    // there is no need to actually do a query.
    const change: ChangeEvent = { action: "delete", old_val: where };
    this.emit("change", change);
  }

  /**
   * Listener for the table trigger event. mesg[0] is the action name;
   * mesg[1] / mesg[2] are the row objects the code below treats as new and
   * old values respectively.
   */
  private async handle_change(mesg): Promise<void> {
    if (this.closed) {
      return;
    }
    // this.dbg("handle_change")(JSON.stringify(mesg));
    if (mesg[0] === "DELETE") {
      if (!this.match_condition(mesg[2])) {
        return;
      }
      this.emit("change", { action: "delete", old_val: mesg[2] });
      return;
    }
    // This is an async event listener: throwing here would only create an
    // unhandled promise rejection, so malformed messages fail the changefeed.
    if (typeof mesg[0] !== "string") {
      this.fail(`invalid mesg -- mesg[0] must be a string`);
      return;
    }
    let action: ChangeAction;
    try {
      action = parse_action(mesg[0]);
    } catch (err) {
      this.fail(err);
      return;
    }
    if (!this.match_condition(mesg[1])) {
      // object does not match condition
      if (action !== "update") {
        // new object that doesn't match condition -- nothing to do.
        return;
      }
      // fill in for each part that we watch in new object the same
      // data in the old object, in case it is missing.
      // TODO: when is this actually needed?
      for (const k in mesg[1]) {
        const v = mesg[1][k];
        if (mesg[2][k] == null) {
          mesg[2][k] = v;
        }
      }
      if (this.match_condition(mesg[2])) {
        // the old object was in our changefeed, but the UPDATE made it not
        // anymore, so we emit delete action.
        this.emit("change", { action: "delete", old_val: mesg[2] });
      }
      // Nothing more to do.
      return;
    }
    if (this.watch.length === 0) {
      // No additional columns are being watched at all -- we only
      // care about what's in the mesg.
      const r: ChangeEvent = { action, new_val: mesg[1] };
      this.emit("change", r);
      return;
    }
    // Additional columns are watched so we must do a query to get them.
    // There's no way around this due to the size limits on postgres LISTEN/NOTIFY.
    const where: { [field: string]: any } = {};
    for (const k in mesg[1]) {
      where[`${k} = $`] = mesg[1][k];
    }
    let result: undefined | { [field: string]: any };
    try {
      result = await query({
        db: this.db,
        select: this.watch,
        table: this.table,
        where,
        one: true,
      });
    } catch (err) {
      this.fail(err);
      return;
    }

    // we do know from stacktraces that new_val_update is called after closed
    // this must have happened during waiting on the query. aborting early.
    if (this.closed) {
      return;
    }

    const key = JSON.stringify(mesg[1]);

    if (result == null) {
      // This happens when record isn't deleted, but some
      // update results in the object being removed from our
      // selection criterion... which we view as "delete".
      // Forget the cached value: if the row comes back it must be sent in
      // full (as "insert"), not as a diff against a value the client dropped.
      delete this.val_update_cache[key];
      this.emit("change", { action: "delete", old_val: mesg[1] });
      return;
    }

    const this_val = misc.merge(result, mesg[1]);
    let new_val;
    if (action == "update") {
      const x = this.new_val_update(mesg[1], this_val, key);
      if (x == null) {
        // happens if this.closed is true -- double check for safety (and typescript).
        return;
      }
      action = x.action; // may be insert in case no previous cached info.
      new_val = x.new_val;
    } else {
      // not update and not delete (could have been a delete and write
      // before we did above query, so treat as insert).
      action = "insert";
      new_val = this_val;
    }
    this.val_update_cache[key] = this_val;

    const r: ChangeEvent = { action, new_val };
    this.emit("change", r);
  }

  /**
   * Compute the payload for an update: primary_part plus the fields of
   * this_val that differ from the cached previous value, plus null for
   * fields that were set before and are now null/undefined.
   * Returns an "insert" with the full value if nothing is cached for key,
   * and undefined if this object is closed.
   */
  private new_val_update(
    primary_part: { [key: string]: any },
    this_val: { [key: string]: any },
    key: string
  ):
    | { new_val: { [key: string]: any }; action: "insert" | "update" }
    | undefined {
    if (this.closed) {
      return;
    }
    const prev_val = this.val_update_cache[key];
    if (prev_val == null) {
      return { new_val: this_val, action: "insert" }; // not enough info to make a diff
    }
    this.dbg("new_val_update")(`${JSON.stringify({ this_val, prev_val })}`);

    // Send only the fields that changed between
    // prev_val and this_val, along with the primary part.
    const new_val = misc.copy(primary_part);
    // Not using lodash isEqual below, since we want equal Date objects
    // to compare as equal. If JSON is randomly re-ordered, that's fine since
    // it is just slightly less efficient.
    for (const field in this_val) {
      if (
        new_val[field] === undefined &&
        JSON.stringify(this_val[field]) != JSON.stringify(prev_val[field])
      ) {
        new_val[field] = this_val[field];
      }
    }
    for (const field in prev_val) {
      if (prev_val[field] != null && this_val[field] == null) {
        // field was deleted / set to null -- we must inform in the update
        new_val[field] = null;
      }
    }
    return { new_val, action: "update" };
  }

  /**
   * Build this.match_condition from this.where.
   * @throws Error on unparseable conditions, fields not in select, or
   *   object-valued comparison values.
   */
  private init_where(): void {
    if (typeof this.where === "function") {
      // user provided function
      this.match_condition = this.where;
      return;
    }

    let w: any[];
    if (misc.is_object(this.where)) {
      w = [this.where];
    } else {
      // TODO: misc.is_object needs to be a typescript checker instead, so
      // this as isn't needed.
      w = this.where as object[];
    }

    this.condition = {};
    const add_condition = (field: string, op: Operator, val: any): void => {
      if (this.condition == null) return; // won't happen
      let f: Function, g: Function;
      field = field.trim();
      if (field[0] === '"') {
        // de-quote
        field = field.slice(1, field.length - 1);
      }
      if (this.select[field] == null) {
        throw Error(
          `'${field}' must be in select="${JSON.stringify(this.select)}"`
        );
      }
      if (misc.is_object(val)) {
        throw Error(`val (=${misc.to_json(val)}) must not be an object`);
      }
      if (misc.is_array(val)) {
        if (op === "=" || op === "==") {
          // containment
          f = function (x) {
            for (const v of val) {
              if (x === v) {
                return true;
              }
            }
            return false;
          };
        } else if (op === "!=" || op === "<>") {
          // not contained in
          f = function (x) {
            for (const v of val) {
              if (x === v) {
                return false;
              }
            }
            return true;
          };
        } else {
          throw Error("if val is an array, then op must be = or !=");
        }
      } else if (misc.is_date(val)) {
        // Inputs to condition come back as JSON, which doesn't know
        // about timestamps, so we convert them to date objects.
        if (op == "=" || op == "==") {
          f = (x) => new Date(x).valueOf() - val === 0;
        } else if (op == "!=" || op == "<>") {
          f = (x) => new Date(x).valueOf() - val !== 0;
        } else {
          g = opToFunction(op);
          f = (x) => g(new Date(x), val);
        }
      } else {
        g = opToFunction(op);
        f = (x) => g(x, val);
      }
      this.condition[field] = f;
    };

    for (const obj of w) {
      if (misc.is_object(obj)) {
        for (const k in obj) {
          const val = obj[k];
          /*
          k should be of one of the following forms
             - "field op $::TYPE"
             - "field op $" or
             - "field op any($)"
             - 'field' (defaults to =)
          where op is one of =, <, >, <=, >=, !=

          val must be:
             - something where javascript === and comparisons works as you expect!
             - or an array, in which case op must be = or !=, and we ALWAYS do inclusion (analogue of any).
          */
          let found = false;
          for (const op of OPERATORS) {
            const i = k.indexOf(op);
            if (i !== -1) {
              add_condition(k.slice(0, i).trim(), op, val);
              found = true;
              break;
            }
          }
          if (!found) {
            throw Error(`unable to parse '${k}'`);
          }
        }
      } else if (typeof obj === "string") {
        let found = false;
        for (const op of OPERATORS) {
          const i = obj.indexOf(op);
          if (i !== -1) {
            add_condition(
              obj.slice(0, i),
              op,
              // NOTE(review): eval executes arbitrary code -- string where
              // conditions must never be built from untrusted input.
              eval(obj.slice(i + op.length).trim())
            );
            found = true;
            break;
          }
        }
        if (!found) {
          throw Error(`unable to parse '${obj}'`);
        }
      } else {
        throw Error("NotImplementedError");
      }
    }
    if (misc.len(this.condition) === 0) {
      delete this.condition;
    }

    // A row matches when every per-field predicate accepts its field value.
    this.match_condition = (obj: object): boolean => {
      //console.log '_match_condition', obj
      if (this.condition == null) {
        return true;
      }
      for (const field in this.condition) {
        const f = this.condition[field];
        if (!f(obj[field])) {
          //console.log 'failed due to field ', field
          return false;
        }
      }
      return true;
    };
  }
}
to date objects.384if (op == "=" || op == "==") {385f = (x) => new Date(x).valueOf() - val === 0;386} else if (op == "!=" || op == "<>") {387f = (x) => new Date(x).valueOf() - val !== 0;388} else {389g = opToFunction(op);390f = (x) => g(new Date(x), val);391}392} else {393g = opToFunction(op);394f = (x) => g(x, val);395}396this.condition[field] = f;397};398399for (const obj of w) {400if (misc.is_object(obj)) {401for (const k in obj) {402const val = obj[k];403/*404k should be of one of the following forms405- "field op $::TYPE"406- "field op $" or407- "field op any($)"408- 'field' (defaults to =)409where op is one of =, <, >, <=, >=, !=410411val must be:412- something where javascript === and comparisons works as you expect!413- or an array, in which case op must be = or !=, and we ALWAYS do inclusion (analogue of any).414*/415let found = false;416for (const op of OPERATORS) {417const i = k.indexOf(op);418if (i !== -1) {419add_condition(k.slice(0, i).trim(), op, val);420found = true;421break;422}423}424if (!found) {425throw Error(`unable to parse '${k}'`);426}427}428} else if (typeof obj === "string") {429let found = false;430for (const op of OPERATORS) {431const i = obj.indexOf(op);432if (i !== -1) {433add_condition(434obj.slice(0, i),435op,436eval(obj.slice(i + op.length).trim())437);438found = true;439break;440}441}442if (!found) {443throw Error(`unable to parse '${obj}'`);444}445} else {446throw Error("NotImplementedError");447}448}449if (misc.len(this.condition) === 0) {450delete this.condition;451}452453this.match_condition = (obj: object): boolean => {454//console.log '_match_condition', obj455if (this.condition == null) {456return true;457}458for (const field in this.condition) {459const f = this.condition[field];460if (!f(obj[field])) {461//console.log 'failed due to field ', field462return false;463}464}465return true;466};467}468}469470471