CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/table/query-function.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Determine function that does query.
8
*/
9
10
const DISABLE_STANDBY: boolean = true; // if true, never use standby server at all.
11
12
const async = require("async");
13
14
import { delay } from "awaiting";
15
import { SCHEMA } from "@cocalc/util/schema";
16
import { copy } from "@cocalc/util/misc";
17
18
export function query_function(
19
client_query: Function,
20
table: string
21
): Function {
22
const s = SCHEMA[table];
23
if (s == null) {
24
throw Error(`unknown table ${table}`);
25
}
26
const db_standby = s.db_standby;
27
28
if (DISABLE_STANDBY || !db_standby) {
29
// just use default client.query, which queries the master database.
30
return client_query;
31
}
32
33
function do_query(opts: any): void {
34
if (opts == null) {
35
throw Error("opts must be an object");
36
}
37
38
let read_done: boolean = false;
39
const change_queue: { err: any; change: any }[] = [];
40
41
function do_initial_read_query(cb: Function): void {
42
const opts2 = copy(opts);
43
opts2.standby = true;
44
opts2.changes = false;
45
let cb_called: boolean = false;
46
opts2.cb = async function (err, resp): Promise<void> {
47
opts.cb(err, resp);
48
if (!err) {
49
read_done = true;
50
if (change_queue.length > 0) {
51
// CRITICAL: delay, since these must be pushed out in a later event loop.
52
// Without this delay, there will be many random failures.
53
await delay(0);
54
while (change_queue.length > 0) {
55
const x = change_queue.shift();
56
if (x == null) break; // make typescript happy.
57
const { err, change } = x;
58
opts.cb(err, change);
59
}
60
}
61
}
62
if (!cb_called) {
63
cb_called = true;
64
cb(err);
65
}
66
};
67
client_query(opts2);
68
}
69
70
function start_changefeed(cb: Function): void {
71
let first_resp: boolean = true;
72
const opts2 = copy(opts);
73
opts2.standby = false;
74
opts2.changes = true;
75
opts2.cb = function (err, change): void {
76
if (read_done) {
77
opts.cb(err, change);
78
} else {
79
change_queue.push({ err, change });
80
}
81
if (first_resp) {
82
first_resp = false;
83
cb(err);
84
}
85
};
86
opts2.options = opts2.options.concat({ only_changes: true });
87
client_query(opts2);
88
}
89
90
let f: Function;
91
if (db_standby === "unsafe") {
92
/* If db_standby == 'unsafe', then we do not even require
93
the changefeed to be working before doing the full query.
94
This will for sure miss all changes from when the query
95
finishes until the changefeed starts. For some
96
tables this is fine; for others, not. */
97
98
f = async.parallel;
99
} else {
100
// Otherwise, the query could miss a small amount of data,
101
// but only for a tiny window of time.
102
f = async.series;
103
}
104
105
f([do_initial_read_query, start_changefeed]);
106
}
107
108
return do_query;
109
}
110
111