CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Sign Up · Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres/changefeed.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
The Changes class is a useful building block
8
for making changefeeds. It lets you watch when given
9
columns change in a given table, and be notified
10
when a where condition is satisfied.
11
12
IMPORTANT: If an error event is emitted, then the
13
Changes object will close and not work any further!
14
You must recreate it.
15
*/
16
17
import { EventEmitter } from "events";
18
import * as misc from "@cocalc/util/misc";
19
import { opToFunction, OPERATORS, Operator } from "@cocalc/util/db-schema";
20
import { callback } from "awaiting";
21
import { PostgreSQL, QuerySelect } from "./types";
22
import { query } from "./changefeed-query";
23
24
/**
 * How rows are selected for a changefeed: a predicate function, a single
 * object of clauses, or a list of clause objects / clause strings.
 */
type WhereCondition = object | object[] | Function;

/** The kinds of row change a changefeed reports. */
type ChangeAction = "insert" | "update" | "delete";

/**
 * Normalize an action name coming from the database trigger (for example
 * "UPDATE") to a lower-case ChangeAction.
 *
 * @throws Error when the lower-cased name is not insert, update or delete.
 */
function parse_action(obj: string): ChangeAction {
  const action = obj.toLowerCase();
  if (action !== "delete" && action !== "insert" && action !== "update") {
    throw Error(`invalid action "${action}"`);
  }
  return action;
}

/** Payload of a "change" event emitted by the Changes class. */
export interface ChangeEvent {
  /** What happened to the row. */
  action: ChangeAction;
  /** Row data before the change (set for delete events). */
  old_val?: object;
  /** Row data after the change (set for insert/update events; updates may carry only changed fields). */
  new_val?: object;
}
40
41
/**
 * Watches `table` for changes and emits "change" events (ChangeEvent) for
 * rows that satisfy a where condition.
 *
 * A trigger is registered through `db._listen`; its notifications arrive as
 * events named after the trigger and are handled by `handle_change`. When
 * watched columns beyond `select` are requested, each notification triggers
 * an extra query to fetch them.
 *
 * Events emitted:
 *  - "change": a ChangeEvent
 *  - "error":  emitted at most once; the object then closes itself
 *  - "close":  emitted once, with `{ action: "close" }`
 */
export class Changes extends EventEmitter {
  private db: PostgreSQL;
  private table: string;
  private select: QuerySelect;
  private watch: string[];
  private where: WhereCondition;

  private trigger_name: string;
  private closed = false;
  // Per-field predicates built from a non-function where condition;
  // undefined means "no constraints".
  private condition?: { [field: string]: Function };
  private match_condition: Function;

  // Last full value emitted per row, keyed by JSON of the notification
  // payload (mesg[1]); used to turn updates into diffs.
  // NOTE(review): entries are only removed when a row drops out of the
  // selection in handle_change; other deletes leave them behind.
  private val_update_cache: { [key: string]: any } = {};

  /**
   * @param db     database connection
   * @param table  table to watch
   * @param select columns included in trigger notifications
   * @param watch  additional columns fetched by query after each notification
   * @param where  predicate, clause object, or list of clauses
   * @param cb     called as cb(err) on failure or cb(undefined, this) once listening
   */
  constructor(
    db: PostgreSQL,
    table: string,
    select: QuerySelect,
    watch: string[],
    where: WhereCondition,
    cb: Function
  ) {
    super();
    // Both methods are registered as listeners on this.db. EventEmitter calls
    // listeners with `this` set to the emitter (the db), so they must be bound
    // to this Changes object.
    this.handle_change = this.handle_change.bind(this);
    this.close = this.close.bind(this);

    this.db = db;
    this.table = table;
    this.select = select;
    this.watch = watch;
    this.where = where;
    void this.init(cb);
  }

  /**
   * Parse the where condition, start listening on the table and report the
   * outcome through `cb`. Errors are delivered to `cb`, never thrown.
   */
  async init(cb: Function): Promise<void> {
    this.dbg("constructor")(
      `select=${misc.to_json(this.select)}, watch=${misc.to_json(
        this.watch
      )}, @_where=${misc.to_json(this.where)}`
    );

    try {
      this.init_where();
    } catch (e) {
      cb(`error initializing where conditions -- ${e}`);
      return;
    }

    try {
      // Bind explicitly so _listen runs with `this` = db regardless of how
      // `callback` invokes the function it is given.
      this.trigger_name = await callback(
        this.db._listen.bind(this.db),
        this.table,
        this.select,
        this.watch
      );
    } catch (err) {
      cb(err);
      return;
    }
    this.db.on(this.trigger_name, this.handle_change);
    // NOTE: we close on *connect*, not on disconnect, since then clients
    // that try to reconnect will only try to do so when we have an actual
    // connection to the database. No point in worrying them while trying
    // to reconnect, which only makes matters worse (as they panic and
    // requests pile up!).

    // This setMaxListeners is here because I keep getting warning about
    // this despite setting it in the db constructor. Putting this here
    // definitely does work, whereas having it only in the constructor
    // definitely does NOT. Don't break this without thought, as it has very bad
    // consequences when the database connection drops.
    this.db.setMaxListeners(0);

    // `close` is bound in the constructor, so it runs against this object.
    this.db.once("connect", this.close);
    cb(undefined, this);
  }

  private dbg(f: string): Function {
    return this.db._dbg(`Changes(table='${this.table}').${f}`);
  }

  /**
   * Break the changefeed: emit "error" and close. The client must recreate
   * it; nothing further will work at all.
   */
  private fail(err): void {
    if (this.closed) {
      return;
    }
    this.dbg("_fail")(`err='${err}'`);
    try {
      // Keep an existing Error (and its stack) instead of re-wrapping it.
      this.emit("error", err instanceof Error ? err : new Error(err));
    } finally {
      // emit("error") throws when no "error" listener is registered; close
      // anyway so the trigger listener and db listeners are released.
      this.close();
    }
  }

  /** Stop listening, emit "close" once, and tear down this object. Idempotent. */
  public close(): void {
    if (this.closed) {
      return;
    }
    // Mark closed first so a "close" listener calling close() does not re-enter.
    this.closed = true;
    this.emit("close", { action: "close" });
    this.removeAllListeners();
    if (this.db != null) {
      this.db.removeListener(this.trigger_name, this.handle_change);
      this.db.removeListener("connect", this.close);
      this.db._stop_listening(this.table, this.select, this.watch);
    }
    misc.close(this);
    // Re-set, in case misc.close removed the field.
    this.closed = true;
  }

  /**
   * Query rows whose columns equal the values in `where` and emit an "insert"
   * change for each one that satisfies the changefeed's condition.
   * A query error is fatal (see fail).
   */
  public async insert(where): Promise<void> {
    const where0: { [field: string]: any } = {};
    for (const k in where) {
      where0[`${k} = $`] = where[k];
    }
    let results: { [field: string]: any }[];
    try {
      results = await query({
        db: this.db,
        select: this.watch.concat(misc.keys(this.select)),
        table: this.table,
        where: where0,
        one: false,
      });
    } catch (err) {
      this.fail(err); // this is game over
      return;
    }
    // The changefeed may have been closed while the query was running
    // (same situation as in handle_change).
    if (this.closed) {
      return;
    }
    for (const x of results) {
      if (this.match_condition(x)) {
        misc.map_mutate_out_undefined_and_null(x);
        const change: ChangeEvent = { action: "insert", new_val: x };
        this.emit("change", change);
      }
    }
  }

  /** Emit a "delete" change whose old_val is `where` itself. */
  public delete(where): void {
    // listener is meant to delete everything that *matches* the where, so
    // there is no need to actually do a query.
    const change: ChangeEvent = { action: "delete", old_val: where };
    this.emit("change", change);
  }

  /**
   * Handle one trigger notification. `mesg[0]` is the action name; for
   * DELETE the old row is `mesg[2]`; otherwise `mesg[1]` is the new data and,
   * for updates, `mesg[2]` the old data.
   */
  private async handle_change(mesg): Promise<void> {
    if (this.closed) {
      return;
    }
    // this.dbg("handle_change")(JSON.stringify(mesg));
    if (mesg[0] === "DELETE") {
      if (this.match_condition(mesg[2])) {
        this.emit("change", { action: "delete", old_val: mesg[2] });
      }
      return;
    }

    // This runs as an async event listener, so a throw here would only
    // surface as an unhandled rejection; report via fail() instead.
    if (typeof mesg[0] !== "string") {
      this.fail(`invalid mesg -- mesg[0] must be a string`);
      return;
    }
    let action: ChangeAction;
    try {
      action = parse_action(mesg[0]);
    } catch (err) {
      this.fail(err);
      return;
    }

    if (!this.match_condition(mesg[1])) {
      // object does not match condition
      if (action !== "update") {
        // new object that doesn't match condition -- nothing to do.
        return;
      }
      // fill in for each part that we watch in new object the same
      // data in the old object, in case it is missing.
      // TODO: when is this actually needed?
      for (const k in mesg[1]) {
        if (mesg[2][k] == null) {
          mesg[2][k] = mesg[1][k];
        }
      }
      if (this.match_condition(mesg[2])) {
        // the old object was in our changefeed, but the UPDATE made it not
        // anymore, so we emit delete action.
        this.emit("change", { action: "delete", old_val: mesg[2] });
      }
      // Nothing more to do.
      return;
    }

    if (this.watch.length === 0) {
      // No additional columns are being watched at all -- we only
      // care about what's in the mesg.
      const change: ChangeEvent = { action, new_val: mesg[1] };
      this.emit("change", change);
      return;
    }

    // Additional columns are watched so we must do a query to get them.
    // There's no way around this due to the size limits on postgres LISTEN/NOTIFY.
    const where: { [field: string]: any } = {};
    for (const k in mesg[1]) {
      where[`${k} = $`] = mesg[1][k];
    }
    let result: undefined | { [field: string]: any };
    try {
      result = await query({
        db: this.db,
        select: this.watch,
        table: this.table,
        where,
        one: true,
      });
    } catch (err) {
      this.fail(err);
      return;
    }

    // we do know from stacktraces that new_val_update is called after closed
    // this must have happened during waiting on the query. aborting early.
    if (this.closed) {
      return;
    }

    const key = JSON.stringify(mesg[1]);

    if (result == null) {
      // This happens when record isn't deleted, but some
      // update results in the object being removed from our
      // selection criterion... which we view as "delete".
      // Drop the cached value so that, if the row comes back, it is sent as
      // a full insert rather than diffed against stale data.
      delete this.val_update_cache[key];
      this.emit("change", { action: "delete", old_val: mesg[1] });
      return;
    }

    const this_val = misc.merge(result, mesg[1]);
    let new_val: { [key: string]: any };
    if (action === "update") {
      const x = this.new_val_update(mesg[1], this_val, key);
      if (x == null) {
        // happens if this.closed is true -- double check for safety (and typescript).
        return;
      }
      action = x.action; // may be insert in case no previous cached info.
      new_val = x.new_val;
    } else {
      // not update and not delete (could have been a delete and write
      // before we did above query, so treat as insert).
      action = "insert";
      new_val = this_val;
    }
    this.val_update_cache[key] = this_val;

    const change: ChangeEvent = { action, new_val };
    this.emit("change", change);
  }

  /**
   * Build the payload for an update: the primary part plus only the fields
   * that differ from the cached previous value (removed fields become null).
   * Falls back to a full "insert" when nothing is cached for `key`.
   * Returns undefined if the changefeed is closed.
   */
  private new_val_update(
    primary_part: { [key: string]: any },
    this_val: { [key: string]: any },
    key: string
  ):
    | { new_val: { [key: string]: any }; action: "insert" | "update" }
    | undefined {
    if (this.closed) {
      return;
    }
    const prev_val = this.val_update_cache[key];
    if (prev_val == null) {
      return { new_val: this_val, action: "insert" }; // not enough info to make a diff
    }
    this.dbg("new_val_update")(`${JSON.stringify({ this_val, prev_val })}`);

    // Send only the fields that changed between
    // prev_val and this_val, along with the primary part.
    const new_val = misc.copy(primary_part);
    // Not using lodash isEqual below, since we want equal Date objects
    // to compare as equal. If JSON is randomly re-ordered, that's fine since
    // it is just slightly less efficient.
    for (const field in this_val) {
      if (
        new_val[field] === undefined &&
        JSON.stringify(this_val[field]) !== JSON.stringify(prev_val[field])
      ) {
        new_val[field] = this_val[field];
      }
    }
    for (const field in prev_val) {
      if (prev_val[field] != null && this_val[field] == null) {
        // field was deleted / set to null -- we must inform in the update
        new_val[field] = null;
      }
    }
    return { new_val, action: "update" };
  }

  /**
   * Turn `this.where` into `this.match_condition`.
   *
   * A function is used as the predicate directly. Otherwise each clause key
   * has one of the forms
   *   - "field op $::TYPE"
   *   - "field op $"
   *   - "field op any($)"
   *   - 'field' (defaults to =)
   * where op is one of OPERATORS. The value must be something for which
   * JavaScript === and comparisons behave as expected, or an array, in which
   * case op must be = or != and inclusion (the analogue of any) is tested.
   * Several clauses on the same field must all hold.
   *
   * @throws Error on unparseable clauses, fields not in select, or bad values.
   */
  private init_where(): void {
    if (typeof this.where === "function") {
      // user provided function
      this.match_condition = this.where;
      return;
    }

    let w: any[];
    if (misc.is_object(this.where)) {
      w = [this.where];
    } else {
      // TODO: misc.is_object needs to be a typescript checker instead, so
      // this as isn't needed.
      w = this.where as object[];
    }

    const condition: { [field: string]: Function } = {};

    const add_condition = (field: string, op: Operator, val: any): void => {
      field = field.trim();
      if (field[0] === '"') {
        // de-quote
        field = field.slice(1, field.length - 1);
      }
      if (this.select[field] == null) {
        throw Error(
          `'${field}' must be in select="${JSON.stringify(this.select)}"`
        );
      }
      if (misc.is_object(val)) {
        throw Error(`val (=${misc.to_json(val)}) must not be an object`);
      }

      let f: (x: any) => boolean;
      if (misc.is_array(val)) {
        if (op === "=" || op === "==") {
          // containment
          f = (x) => val.some((v: any) => x === v);
        } else if (op === "!=" || op === "<>") {
          // not contained in
          f = (x) => !val.some((v: any) => x === v);
        } else {
          throw Error("if val is an array, then op must be = or !=");
        }
      } else if (misc.is_date(val)) {
        // Inputs to condition come back as JSON, which doesn't know
        // about timestamps, so we convert them to date objects.
        if (op === "=" || op === "==") {
          f = (x) => new Date(x).valueOf() - val === 0;
        } else if (op === "!=" || op === "<>") {
          f = (x) => new Date(x).valueOf() - val !== 0;
        } else {
          const g = opToFunction(op);
          f = (x) => g(new Date(x), val);
        }
      } else {
        const g = opToFunction(op);
        f = (x) => g(x, val);
      }

      // Several clauses may constrain the same field (e.g. a range given as
      // "time >= $" and "time <= $"). All of them must hold, so compose
      // instead of letting the last clause silently replace the earlier ones.
      const check = f;
      const prev = condition[field];
      condition[field] =
        prev == null ? check : (x: any) => prev(x) && check(x);
    };

    for (const obj of w) {
      if (misc.is_object(obj)) {
        for (const k in obj) {
          const val = obj[k];
          let found = false;
          for (const op of OPERATORS) {
            const i = k.indexOf(op);
            if (i !== -1) {
              add_condition(k.slice(0, i).trim(), op, val);
              found = true;
              break;
            }
          }
          if (!found) {
            throw Error(`unable to parse '${k}'`);
          }
        }
      } else if (typeof obj === "string") {
        let found = false;
        for (const op of OPERATORS) {
          const i = obj.indexOf(op);
          if (i !== -1) {
            // NOTE(review): eval() executes the right-hand side of a string
            // clause as JavaScript. This is only safe as long as string where
            // clauses never contain user-controlled text -- confirm at call sites.
            add_condition(
              obj.slice(0, i),
              op,
              eval(obj.slice(i + op.length).trim())
            );
            found = true;
            break;
          }
        }
        if (!found) {
          throw Error(`unable to parse '${obj}'`);
        }
      } else {
        throw Error("NotImplementedError");
      }
    }

    this.condition = misc.len(condition) === 0 ? undefined : condition;

    // True when every per-field predicate accepts the object's value.
    this.match_condition = (obj: object): boolean => {
      if (this.condition == null) {
        return true;
      }
      for (const field in this.condition) {
        if (!this.condition[field](obj[field])) {
          return false;
        }
      }
      return true;
    };
  }
}
470
471