CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Sign Up · Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/database/postgres/project-and-user-tracker.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
* decaffeinate suggestions:
8
* DS001: Remove Babel/TypeScript constructor workaround
9
* DS102: Remove unnecessary code created because of implicit returns
10
* DS103: Rewrite code to no longer use __guard__
11
* DS205: Consider reworking code to avoid use of IIFEs
12
* DS207: Consider shorter variations of null checks
13
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
14
*/
15
16
import { EventEmitter } from "events";
17
18
import { callback } from "awaiting";
19
import { callback2 } from "@cocalc/util/async-utils";
20
21
import { close, len } from "@cocalc/util/misc";
22
23
import { PostgreSQL, QueryOptions, QueryResult } from "./types";
24
25
import { ChangeEvent, Changes } from "./changefeed";
26
27
const { all_results } = require("../postgres-base");
28
29
type SetOfAccounts = { [account_id: string]: boolean };
30
type SetOfProjects = { [project_id: string]: boolean };
31
32
type State = "init" | "ready" | "closed";
33
34
/**
 * Tracks, for a set of registered accounts, which projects each account is a
 * user of and which other accounts they share projects with.
 *
 * Data is kept current by a single changefeed on the `users` column of the
 * `projects` table.  Per-account events are emitted on this object:
 *
 *   - `add_user_to_project-${account_id}`      (project_id)
 *   - `remove_user_from_project-${account_id}` (project_id)
 *   - `add_collaborator-${account_id}`         (other_account_id)
 *   - `remove_collaborator-${account_id}`      (other_account_id)
 *
 * Lifecycle: "init" --init()--> "ready" --close() or any error--> "closed".
 * Errors are treated as fatal: an "error" event is emitted and the tracker
 * closes itself.
 */
export class ProjectAndUserTracker extends EventEmitter {
  private state: State = "init";

  private db: PostgreSQL;

  private feed: Changes;

  // by a "set" we mean map to boolean...
  // set of accounts we care about
  private accounts: SetOfAccounts = {};

  // map from project_id to set of users of a given project
  private users: { [project_id: string]: SetOfAccounts } = {};

  // map from account_id to set of projects of a given user
  private projects: { [account_id: string]: SetOfProjects } = {};

  // map from account_id to map from account_ids to *number* of
  // projects the two users have in common.  An account is paired with
  // itself and so counts 2 per project it is on (see add_user_to_project).
  private collabs: {
    [account_id: string]: { [account_id: string]: number };
  } = {};

  // Pending registrations: account_id -> callbacks to invoke once that
  // account is registered (or with an error string if the tracker closes).
  private register_todo: {
    [account_id: string]: ((err?: string) => void)[];
  } = {};

  // used for a runtime sanity check that do_register is never re-entered
  private do_register_lock: boolean = false;

  constructor(db: PostgreSQL) {
    super();
    this.db = db;
  }

  /** Throw unless the tracker is in `state`; `f` names the caller for the message. */
  private assert_state(state: State, f: string): void {
    if (this.state !== state) {
      throw Error(`${f}: state must be ${state} but it is ${this.state}`);
    }
  }

  /**
   * Create the changefeed on the projects table and switch to "ready".
   * Must be called once, while in the "init" state.  Failure to create the
   * changefeed is reported through handle_error (emit "error" + close).
   */
  async init(): Promise<void> {
    this.assert_state("init", "init");
    const dbg = this.dbg("init");
    dbg("Initializing Project and user tracker...");

    // every changefeed for a user will result in a listener
    // on an event on this one object.
    this.setMaxListeners(1000);

    try {
      // create changefeed listening on changes to projects table
      // (bound explicitly so this does not depend on changefeed being an
      // instance-bound method of db)
      this.feed = await callback2(this.db.changefeed.bind(this.db), {
        table: "projects",
        select: { project_id: "UUID" },
        watch: ["users"],
        where: {},
      });
      dbg("Success");
    } catch (err) {
      this.handle_error(err);
      return;
    }
    if (this.state !== "init") {
      // close() was called while we waited for the changefeed: do not
      // resurrect the tracker, and do not leak the new feed.
      this.feed.close();
      return;
    }
    this.feed.on("change", this.handle_change.bind(this));
    this.feed.on("error", this.handle_error.bind(this));
    this.feed.on("close", () => this.handle_error("changefeed closed"));
    this.set_state("ready");
    // register() calls made before we were ready were queued, but
    // do_register is a no-op until "ready", so start processing them now.
    if (this.state === "ready" && len(this.register_todo) > 0) {
      this.start_register();
    }
  }

  private dbg(f: string) {
    return this.db._dbg(`Tracker.${f}`);
  }

  /**
   * Fatal error handler: emit "error" and close the tracker.  No-op once
   * closed, so it is safe to reach from several failure paths.
   */
  private handle_error(err: unknown): void {
    if (this.state === "closed") return;
    const dbg = this.dbg("handle_error");
    dbg(`err='${err}'`);
    try {
      // NOTE: EventEmitter throws when "error" has no listener; the finally
      // guarantees we still shut down in that case.
      this.emit("error", err);
    } finally {
      this.close();
    }
  }

  private set_state(state: State): void {
    this.state = state;
    this.emit(state);
  }

  /**
   * Shut down: emit "closed", drop all listeners, close the changefeed and
   * fail every pending register() callback, then call misc's close(this).
   * Idempotent.
   */
  close(): void {
    if (this.state === "closed") {
      return;
    }
    this.set_state("closed");
    this.removeAllListeners();
    if (this.feed != null) {
      this.feed.close();
    }
    // clear any outstanding callbacks so the register() promises settle
    const todo = this.register_todo ?? {};
    this.register_todo = {};
    for (const account_id in todo) {
      for (const cb of todo[account_id] ?? []) {
        cb("closed - project-and-user-tracker");
      }
    }
    close(this);
    this.state = "closed";
  }

  /** A project row was deleted: remove all of its users and stop tracking it. */
  private handle_change_delete(old_val): void {
    this.assert_state("ready", "handle_change_delete");
    const { project_id } = old_val;
    const users = this.users[project_id];
    if (users == null) {
      // not a project we are watching, so nothing to worry about.
      return;
    }
    for (const account_id in users) {
      this.remove_user_from_project(account_id, project_id);
    }
    // the project is gone; do not leave an empty entry behind
    delete this.users[project_id];
  }

  /** Dispatch a changefeed event to the delete or insert/update handler. */
  private handle_change(x: ChangeEvent): void {
    this.assert_state("ready", "handle_change");
    if (x.action === "delete") {
      if (x.old_val == null) return; // should never happen
      this.handle_change_delete(x.old_val);
    } else {
      if (x.new_val == null) return; // should never happen
      // handle_change_update is async; route any failure through the fatal
      // error handler rather than leaving an unhandled promise rejection.
      this.handle_change_update(x.new_val).catch((err) =>
        this.handle_error(err),
      );
    }
  }

  /**
   * A project was created or its users changed: re-read its user list from
   * the database and reconcile it with what we have.  A project we are not
   * already watching is ignored unless one of its users is a tracked account.
   */
  private async handle_change_update(new_val): Promise<void> {
    this.assert_state("ready", "handle_change_update");
    const dbg = this.dbg("handle_change_update");
    dbg(new_val);
    // users on a project changed or project created
    const { project_id } = new_val;
    let users: QueryResult<{ account_id: string }>[];
    try {
      users = await query<{ account_id: string }>(this.db, {
        query: "SELECT jsonb_object_keys(users) AS account_id FROM projects",
        where: { "project_id = $::UUID": project_id },
      });
    } catch (err) {
      this.handle_error(err);
      return;
    }
    if (this.state !== "ready") {
      // closed while the query was running; our data structures may already
      // have been torn down by close().
      return;
    }

    const users_now: SetOfAccounts = {};
    for (const { account_id } of users) {
      users_now[account_id] = true;
    }

    if (this.users[project_id] == null) {
      // we are not already watching this project...
      const any = Object.keys(users_now).some((a) => this.accounts[a]);
      if (!any) {
        // *and* none of our tracked users are on this project... so don't care
        return;
      }
    }

    // first add any users who got added...
    const users_before: SetOfAccounts = this.users[project_id] ?? {};
    for (const account_id in users_now) {
      if (!users_before[account_id]) {
        this.add_user_to_project(account_id, project_id);
      }
    }
    // ...then remove any users who are no longer on the project
    for (const account_id in users_before) {
      if (!users_now[account_id]) {
        this.remove_user_from_project(account_id, project_id);
      }
    }
  }

  /**
   * Record that account_id is a user of project_id, updating users, projects
   * and the pairwise collab counts.  Emits `add_user_to_project-${account_id}`
   * and an `add_collaborator-…` event for each pair whose count goes from 0
   * to 1.  No-op if the membership is already recorded.
   */
  private add_user_to_project(account_id: string, project_id: string): void {
    this.assert_state("ready", "add_user_to_project");
    if (this.projects[account_id]?.[project_id]) {
      // already added
      return;
    }
    this.emit(`add_user_to_project-${account_id}`, project_id);
    if (this.users[project_id] == null) {
      this.users[project_id] = {};
    }
    const users = this.users[project_id];
    users[account_id] = true;

    if (this.projects[account_id] == null) {
      this.projects[account_id] = {};
    }
    this.projects[account_id][project_id] = true;

    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    const collabs = this.collabs[account_id];

    // `users` now includes account_id itself, so the self-pair is
    // incremented from both sides (+2 per project).
    for (const other_account_id in users) {
      if (collabs[other_account_id] != null) {
        collabs[other_account_id] += 1;
      } else {
        collabs[other_account_id] = 1;
        this.emit(`add_collaborator-${account_id}`, other_account_id);
      }
      // every account in `users` was added through this method, so its
      // collabs entry exists.
      const other_collabs = this.collabs[other_account_id];
      if (other_collabs[account_id] != null) {
        other_collabs[account_id] += 1;
      } else {
        other_collabs[account_id] = 1;
        this.emit(`add_collaborator-${other_account_id}`, account_id);
      }
    }
  }

  /**
   * Undo add_user_to_project: decrement the pairwise collab counts (deleting
   * entries that reach 0) and drop the membership.  Events are suppressed
   * when no_emit is true.
   *
   * @throws Error if either id is not 36 characters long.
   */
  private remove_user_from_project(
    account_id: string,
    project_id: string,
    no_emit: boolean = false,
  ): void {
    this.assert_state("ready", "remove_user_from_project");
    if (account_id?.length !== 36 || project_id?.length !== 36) {
      throw Error("invalid account_id or project_id");
    }
    if (!this.projects[account_id]?.[project_id]) {
      return;
    }
    if (!no_emit) {
      this.emit(`remove_user_from_project-${account_id}`, project_id);
    }
    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    const collabs = this.collabs[account_id];
    for (const other_account_id in this.users[project_id]) {
      collabs[other_account_id] -= 1;
      if (collabs[other_account_id] === 0) {
        delete collabs[other_account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${account_id}`, other_account_id);
        }
      }
      const other_collabs = this.collabs[other_account_id];
      other_collabs[account_id] -= 1;
      if (other_collabs[account_id] === 0) {
        delete other_collabs[account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${other_account_id}`, account_id);
        }
      }
    }
    delete this.users[project_id][account_id];
    delete this.projects[account_id][project_id];
  }

  // Register the given account so that this client watches the database
  // in order to be aware of all projects and collaborators of the
  // given account.  Rejects if the tracker is (or becomes) closed first.
  public async register(account_id: string): Promise<void> {
    await callback(this.register_cb.bind(this), account_id);
  }

  /** Callback-style core of register(): queue the account and start work. */
  private register_cb(account_id: string, cb: (err?: string) => void): void {
    if (this.state === "closed") {
      // A closed tracker never drains the queue; failing now keeps
      // register()'s promise from staying pending forever.
      cb("closed - project-and-user-tracker");
      return;
    }
    const dbg = this.dbg(`register(account_id="${account_id}")`);
    if (this.accounts[account_id] != null) {
      dbg(
        `already registered -- listener counts ${JSON.stringify(
          this.listener_counts(account_id),
        )}`,
      );
      cb();
      return;
    }
    if (len(this.register_todo) === 0) {
      // no registration is currently happening
      this.register_todo[account_id] = [cb];
      // kick things off -- this will keep registering accounts
      // until everything is done, then this.register_todo will have length 0.
      // (If we are still in "init", init() starts it once ready.)
      this.start_register();
    } else {
      // Accounts are being registered right now. Add to the todo list.
      const v = this.register_todo[account_id];
      if (v != null) {
        v.push(cb);
      } else {
        this.register_todo[account_id] = [cb];
      }
    }
  }

  /** Run do_register in the background, treating any failure as fatal. */
  private start_register(): void {
    this.do_register().catch((err) => this.handle_error(err));
  }

  // Call do_register_work to completely clear the work
  // this.register_todo work queue.
  // NOTE: do_register_work does each account, *one after another*,
  // rather than doing everything in parallel. WARNING: DO NOT
  // rewrite this to do everything in parallel, unless you think you
  // thoroughly understand the algorithm, since I think
  // doing things in parallel would horribly break!
  private async do_register(): Promise<void> {
    if (this.state !== "ready") return; // maybe shutting down, or not yet initialized.

    // This gets a single account_id, if there are any:
    let account_id: string | undefined = undefined;
    for (account_id in this.register_todo) break;
    if (account_id == null) return; // nothing to do.

    const dbg = this.dbg(`do_register(account_id="${account_id}")`);
    dbg("registering account");
    if (this.do_register_lock)
      throw Error("do_register MUST NOT be called twice at once!");
    this.do_register_lock = true;
    try {
      // Register this account
      let projects: QueryResult[];
      try {
        // 2021-05-10: one user has a really large number of projects, which causes the hub to crash
        // TODO: fix this ORDER BY .. LIMIT .. part properly
        // NOTE(review): the LIMIT sits in the subquery that expands
        // jsonb_object_keys, so it appears to count (project, user) rows
        // rather than projects; a project's user list may be truncated -- confirm.
        projects = await query(this.db, {
          query:
            "SELECT project_id, json_agg(o) as users FROM (SELECT project_id, jsonb_object_keys(users) AS o FROM projects WHERE users ? $1::TEXT ORDER BY last_edited DESC LIMIT 10000) s group by s.project_id",
          params: [account_id],
        });
      } catch (err) {
        const e = `error registering '${account_id}' -- err=${err}`;
        dbg(e);
        this.handle_error(e); // it is game over.
        return;
      }
      if (this.state !== "ready") {
        // Closed while the query was running: close() has already failed
        // every queued callback, and our data structures may be gone.
        return;
      }

      // we care about this account_id
      this.accounts[account_id] = true;

      dbg("now adding all users to project tracker -- start");
      for (const project of projects) {
        if (this.users[project.project_id] != null) {
          // already have data about this project
          continue;
        }
        for (const collab_account_id of project.users) {
          if (collab_account_id == null) {
            continue; // just skip; evidently rarely this isn't defined, maybe due to db error?
          }
          this.add_user_to_project(collab_account_id, project.project_id);
        }
      }
      dbg("successfully registered -- stop");

      // call the callbacks
      const callbacks = this.register_todo[account_id];
      if (callbacks != null) {
        for (const cb of callbacks) {
          cb();
        }
        // We are done (trying to) register account_id.
        delete this.register_todo[account_id];
      }
    } finally {
      this.do_register_lock = false;
    }
    if (len(this.register_todo) > 0) {
      // Deal with next account that needs to be registered
      this.start_register();
    }
  }

  // TODO: not actually used by any client yet... but obviously it should
  // be since this would be a work/memory leak, right?
  /**
   * Stop tracking account_id.  Each of its projects is forgotten only if no
   * other registered account is a user of it; those removals emit no events.
   */
  public unregister(account_id: string): void {
    if (this.state === "closed") return;
    if (!this.accounts[account_id]) return; // nothing to do

    const v: string[] = [];
    for (const project_id in this.projects[account_id]) {
      v.push(project_id);
    }
    delete this.accounts[account_id];

    // Forget about any projects the account_id is on that are no longer
    // necessary to watch...
    for (const project_id of v) {
      let need: boolean = false;
      for (const other_account_id in this.users[project_id]) {
        if (this.accounts[other_account_id] != null) {
          need = true;
          break;
        }
      }
      if (!need) {
        for (const other_account_id in this.users[project_id]) {
          this.remove_user_from_project(other_account_id, project_id, true);
        }
        delete this.users[project_id];
      }
    }
  }

  // Return *set* of projects that this user is a collaborator on
  public get_projects(account_id: string): { [project_id: string]: boolean } {
    if (this.state === "closed") return {};
    if (!this.accounts[account_id]) {
      // This should never happen, but very rarely it DOES. I do not know why, having studied the
      // code. But when it does, just raising an exception blows up the server really badly.
      // So for now we just async register the account, return that it is not a collaborator
      // on anything. Then some query will fail, get tried again, and work since registration will
      // have finished.
      //throw Error("account (='#{account_id}') must be registered")
      // Grab the logger now: close() may tear down this.db before the
      // background registration settles.
      const dbg = this.dbg("get_projects");
      this.register(account_id).catch((err) =>
        dbg(`background register of '${account_id}' failed -- ${err}`),
      );
      return {};
    }
    return this.projects[account_id] ?? {};
  }

  // map from collabs of account_id to number of projects they collab
  // on (account_id itself counted twice)
  public get_collabs(account_id: string): { [account_id: string]: number } {
    if (this.state === "closed") return {};
    return this.collabs[account_id] ?? {};
  }

  /** Debug helper: number of listeners for each per-account event. */
  private listener_counts(account_id: string): object {
    const x: any = {};
    for (const e of [
      "add_user_to_project",
      "remove_user_from_project",
      "add_collaborator",
      "remove_collaborator",
    ]) {
      const event = e + "-" + account_id;
      x[event] = this.listenerCount(event);
    }
    return x;
  }
}
487
488
/**
 * Run `opts` through `db._query`, delivering the outcome to `cb` wrapped by
 * `all_results` (from ../postgres-base; presumably it collects every result
 * row -- see that module).
 *
 * The caller's `opts` object is left untouched: a shallow copy with `cb`
 * filled in is handed to the database instead of mutating the argument.
 *
 * @throws Error if `opts` is null/undefined.
 */
function all_query(db: PostgreSQL, opts: QueryOptions, cb: Function): void {
  if (opts == null) {
    throw Error("opts must not be null");
  }
  db._query({ ...opts, cb: all_results(cb) });
}
495
496
/**
 * Promise wrapper around the callback-based all_query.
 *
 * @param db   database connection to run the query on
 * @param opts query options (the `cb` field is supplied by all_query)
 * @returns    the rows produced by the query
 */
async function query<T>(
  db: PostgreSQL,
  opts: QueryOptions,
): Promise<QueryResult<T>[]> {
  const rows: QueryResult<T>[] = await callback(all_query, db, opts);
  return rows;
}
502
503