CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres/changefeed-query.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Does the queries to update changefeeds, deduplicating across
8
both all changefeeds and a small interval of time.
9
*/
10
11
// set to false to completely disable for debugging/testing
12
const THROTTLE: boolean = true;
13
14
// 10ms when running unit tests, still throttle, but make it quick.
15
// Otherwise, we default to 250ms, which is enough to be massively
16
// useful, but also not noticed by user.
17
let THROTTLE_MS: number = process.env.SMC_TEST ? 10 : 500;
18
19
// THROTTLE_MS can be overridden the POSTGRES_THROTTLE_CHANGEFEED_MS
20
// environment variable.
21
if (process.env.POSTGRES_THROTTLE_CHANGEFEED_MS != null) {
22
THROTTLE_MS = parseInt(process.env.POSTGRES_THROTTLE_CHANGEFEED_MS);
23
}
24
25
import { EventEmitter } from "events";
26
27
import { callback } from "awaiting";
28
29
import { once } from "@cocalc/util/async-utils";
30
import { close, copy } from "@cocalc/util/misc";
31
32
const { one_result, all_results } = require("../postgres-base");
33
34
import { PostgreSQL, QueryWhere } from "./types";
35
36
interface QueryOpts {
37
db: PostgreSQL;
38
select: string[];
39
table: string;
40
where: QueryWhere;
41
one: boolean;
42
}
43
44
interface TableQuery {
45
select: string[];
46
where: QueryWhere;
47
}
48
49
function key(obj: { [key: string]: any }): string {
50
return `query-${JSON.stringify(obj)}`;
51
}
52
53
type State = "ready" | "closed";
54
55
class ThrottledTableQueue extends EventEmitter {
56
private table: string;
57
private queue: { [key: string]: TableQuery } = {};
58
private db: PostgreSQL;
59
private process_timer: any;
60
private interval_ms: number;
61
private state: State = "ready";
62
63
constructor(db: PostgreSQL, table: string, interval_ms: number) {
64
super();
65
this.db = db;
66
this.table = table;
67
this.interval_ms = interval_ms;
68
69
// client listens for results of query -- if queries pile up, and
70
// there are many tables, the default of 10 can easily be exceeded.
71
this.setMaxListeners(100);
72
}
73
74
private dbg(f): Function {
75
return this.db._dbg(`ThrottledTableQueue('${this.table}').${f}`);
76
}
77
78
public close(): void {
79
if (this.state == "closed") {
80
return;
81
}
82
if (this.process_timer != null) {
83
clearTimeout(this.process_timer);
84
}
85
for (const k in this.queue) {
86
this.emit(k, "closed");
87
}
88
this.emit("closed");
89
this.removeAllListeners();
90
close(this);
91
this.state = "closed";
92
}
93
94
public enqueue(query: TableQuery): string {
95
if (this.state == "closed") {
96
throw Error("trying to enqueue after close");
97
}
98
const k = key(query);
99
this.queue[k] = query;
100
if (this.process_timer == null) {
101
this.dbg("enqueue")(`will process queue in ${this.interval_ms}ms...`);
102
this.process_timer = setTimeout(
103
this.process_queue.bind(this),
104
this.interval_ms
105
);
106
}
107
return k;
108
}
109
110
private async process_queue(): Promise<void> {
111
const dbg = this.dbg("process_queue");
112
delete this.process_timer; // it just fired
113
// First time we just doing them one at a time.
114
// FURTHER WORK: we could instead do ALL queries simultaneously as
115
// a single query, or at least do them in parallel...
116
117
// Make a copy of whatever is queued up at this moment in time,
118
// then clear the queue. We are responsible for handling all
119
// queries in the queue now, but nothing further. If queries are
120
// slow, it can easily be the case that process_queue is
121
// called several times at once. However, that's fine, since
122
// each call is responsible for only the part of the queue that
123
// existed when it was called.
124
const queue = copy(this.queue);
125
this.queue = {};
126
127
for (const k in queue) {
128
dbg(k);
129
const { select, where } = queue[k];
130
131
try {
132
const result = await callback(
133
one_query,
134
this.db,
135
select,
136
this.table,
137
where
138
);
139
if (this.state == "closed") return;
140
dbg("success", k);
141
this.emit(k, undefined, result);
142
} catch (err) {
143
if (this.state == "closed") return;
144
dbg("fail", k);
145
this.emit(k, err, undefined);
146
}
147
}
148
}
149
}
150
151
const throttled_table_queues: { [table: string]: ThrottledTableQueue } = {};
152
153
function throttled_table_queue(
154
db: PostgreSQL,
155
table: string,
156
interval_ms: number
157
): ThrottledTableQueue {
158
if (throttled_table_queues[table] != null) {
159
return throttled_table_queues[table];
160
}
161
return (throttled_table_queues[table] = new ThrottledTableQueue(
162
db,
163
table,
164
interval_ms
165
));
166
}
167
168
export async function query(opts: QueryOpts): Promise<any> {
169
if (THROTTLE && opts.one) {
170
const Table = throttled_table_queue(opts.db, opts.table, THROTTLE_MS);
171
const k: string = Table.enqueue({ select: opts.select, where: opts.where });
172
const [err, result] = await once(Table, k);
173
if (err != null) {
174
throw err;
175
}
176
return result;
177
}
178
return await callback(
179
opts.one ? one_query : all_query,
180
opts.db,
181
opts.select,
182
opts.table,
183
opts.where
184
);
185
}
186
187
function all_query(db, select, table, where, cb): void {
188
db._query({ select, table, where, cb: all_results(cb) });
189
}
190
191
function one_query(db, select, table, where, cb): void {
192
db._query({ select, table, where, cb: one_result(cb) });
193
}
194
195