Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/database/postgres/project-and-user-tracker.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
 * decaffeinate suggestions:
 * DS001: Remove Babel/TypeScript constructor workaround
 * DS102: Remove unnecessary code created because of implicit returns
 * DS103: Rewrite code to no longer use __guard__
 * DS205: Consider reworking code to avoid use of IIFEs
 * DS207: Consider shorter variations of null checks
 * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
 */

import { EventEmitter } from "events";

import { callback } from "awaiting";
import { callback2 } from "@cocalc/util/async-utils";

import { close, len } from "@cocalc/util/misc";

import { PostgreSQL, QueryOptions, QueryResult } from "./types";

import { ChangeEvent, Changes } from "./changefeed";

const { all_results } = require("../postgres-base");

type SetOfAccounts = { [account_id: string]: boolean };
type SetOfProjects = { [project_id: string]: boolean };

type State = "init" | "ready" | "closed";

/**
 * Tracks, in memory, which accounts are on which projects and how many
 * projects each pair of accounts has in common, for every *registered*
 * account.  The data is seeded by a query when an account is registered and
 * then kept current by a changefeed on the `users` column of the `projects`
 * table.
 *
 * Events emitted (all via EventEmitter):
 *   - `"ready"` / `"closed"`                      -- state transitions
 *   - `"error"` (err)                             -- fatal; the tracker closes itself
 *   - `add_user_to_project-${account_id}` (project_id)
 *   - `remove_user_from_project-${account_id}` (project_id)
 *   - `add_collaborator-${account_id}` (other_account_id)
 *   - `remove_collaborator-${account_id}` (other_account_id)
 */
export class ProjectAndUserTracker extends EventEmitter {
  private state: State = "init";

  private db: PostgreSQL;

  private feed: Changes;

  // by a "set" we mean map to boolean...
  // set of accounts we care about
  private accounts: SetOfAccounts = {};

  // map from project_id to set of users of a given project
  private users: { [project_id: string]: SetOfAccounts } = {};

  // map from account_id to set of projects of a given user
  private projects: { [account_id: string]: SetOfProjects } = {};

  // map from account_id to map from account_ids to *number* of
  // projects the two users have in common.
  private collabs: {
    [account_id: string]: { [account_id: string]: number };
  } = {};

  // Pending registrations: account_id -> callbacks to invoke once that
  // account has been registered (or the tracker has been closed).
  private register_todo: { [account_id: string]: Function[] } = {};

  // used for a runtime sanity check
  private do_register_lock: boolean = false;

  constructor(db: PostgreSQL) {
    super();
    this.db = db;
  }

  /**
   * Throw unless the tracker is currently in the given state.
   *
   * @param state - the state the caller requires
   * @param f - name of the calling function, used only in the error message
   * @throws Error if the current state differs from `state`
   */
  private assert_state(state: State, f: string): void {
    if (this.state != state) {
      throw Error(`${f}: state must be ${state} but it is ${this.state}`);
    }
  }

  /**
   * Create the changefeed on the projects table and move to state "ready".
   * Must be called exactly once, while in state "init".  If creating the
   * changefeed fails, the error is routed to handle_error (which closes the
   * tracker) and this resolves without reaching "ready".
   *
   * @throws Error if not in state "init"
   */
  async init(): Promise<void> {
    this.assert_state("init", "init");
    const dbg = this.dbg("init");
    dbg("Initializing Project and user tracker...");

    // every changefeed for a user will result in a listener
    // on an event on this one object.
    this.setMaxListeners(1000);

    try {
      // create changefeed listening on changes to projects table
      this.feed = await callback2(this.db.changefeed, {
        table: "projects",
        select: { project_id: "UUID" },
        watch: ["users"],
        where: {},
      });
      dbg("Success");
    } catch (err) {
      this.handle_error(err);
      return;
    }
    this.feed.on("change", this.handle_change.bind(this));
    this.feed.on("error", this.handle_error.bind(this));
    this.feed.on("close", () => this.handle_error("changefeed closed"));
    this.set_state("ready");
  }

  // Returns the database's debug logger, namespaced as `Tracker.<f>`.
  private dbg(f) {
    return this.db._dbg(`Tracker.${f}`);
  }

  /**
   * Fatal-error path: log, emit "error", and close the tracker.
   * Does nothing if the tracker is already closed.
   */
  private handle_error(err) {
    if (this.state == "closed") return;
    // There was an error in the changefeed.
    // Error is totally fatal, so we close up shop.
    const dbg = this.dbg("handle_error");
    dbg(`err='${err}'`);
    this.emit("error", err);
    this.close();
  }

  // Record the new state and emit an event named after it.
  private set_state(state: State): void {
    this.state = state;
    this.emit(state);
  }

  /**
   * Shut the tracker down.  Idempotent.
   *
   * Order matters: "closed" is emitted *before* listeners are removed so
   * that subscribers actually see it; then the changefeed is closed and
   * every pending register() callback is completed with an error.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.set_state("closed");
    this.removeAllListeners();
    if (this.feed != null) {
      this.feed.close();
    }
    if (this.register_todo != null) {
      // clear any outstanding callbacks
      for (const account_id in this.register_todo) {
        const callbacks = this.register_todo[account_id];
        if (callbacks != null) {
          for (const cb of callbacks) {
            cb("closed - project-and-user-tracker");
          }
        }
      }
    }
    // NOTE(review): `close` here is the helper imported from
    // @cocalc/util/misc; presumably it clears this object's fields (which
    // would explain why `state` is assigned again right after) -- confirm.
    close(this);
    this.state = "closed";
  }

  /**
   * A project row was deleted: drop every user we were tracking on it.
   *
   * @param old_val - the deleted row; only `project_id` is read
   */
  private handle_change_delete(old_val): void {
    this.assert_state("ready", "handle_change_delete");
    const { project_id } = old_val;
    if (this.users[project_id] == null) {
      // no users, so nothing to worry about.
      return;
    }
    for (const account_id in this.users[project_id]) {
      this.remove_user_from_project(account_id, project_id);
    }
    return;
  }

  /**
   * Changefeed "change" handler; dispatches to the delete or update handler.
   * The update handler is async and is intentionally not awaited here.
   */
  private handle_change(x: ChangeEvent): void {
    this.assert_state("ready", "handle_change");
    if (x.action === "delete") {
      if (x.old_val == null) return; // should never happen
      this.handle_change_delete(x.old_val);
    } else {
      if (x.new_val == null) return; // should never happen
      this.handle_change_update(x.new_val);
    }
  }

  /**
   * The set of users on a project changed (or the project was created):
   * re-read the project's users from the database and reconcile our state.
   *
   * @param new_val - the changed row; only `project_id` is read
   */
  private async handle_change_update(new_val): Promise<void> {
    this.assert_state("ready", "handle_change_update");
    const dbg = this.dbg("handle_change_update");
    dbg(new_val);
    // users on a project changed or project created
    const { project_id } = new_val;
    let users: QueryResult<{ account_id: string }>[];
    try {
      users = await query<{ account_id: string }>(this.db, {
        query: "SELECT jsonb_object_keys(users) AS account_id FROM projects",
        where: { "project_id = $::UUID": project_id },
      });
    } catch (err) {
      this.handle_error(err);
      return;
    }
    // The tracker may have been closed while the query was in flight.  The
    // mutators used below assert state "ready" and would throw, and since
    // handle_change does not await this function that would surface as an
    // unhandled promise rejection.  Nothing is left to update once closed.
    if (this.state != "ready") return;

    if (this.users[project_id] == null) {
      // we are not already watching this project
      let any = false;
      for (const { account_id } of users) {
        if (this.accounts[account_id]) {
          any = true;
          break;
        }
      }
      if (!any) {
        // *and* none of our tracked users are on this project... so don't care
        return;
      }
    }

    // first add any users who got added, and record which accounts are relevant
    const users_now: SetOfAccounts = {};
    for (const { account_id } of users) {
      users_now[account_id] = true;
    }
    const users_before: SetOfAccounts =
      this.users[project_id] != null ? this.users[project_id] : {};
    for (const account_id in users_now) {
      if (!users_before[account_id]) {
        this.add_user_to_project(account_id, project_id);
      }
    }
    for (const account_id in users_before) {
      if (!users_now[account_id]) {
        this.remove_user_from_project(account_id, project_id);
      }
    }
  }

  // add and remove user from a project, maintaining our data structures

  /**
   * Record that `account_id` is on `project_id`, updating `users`,
   * `projects` and the pairwise `collabs` counts, and emitting
   * `add_user_to_project-*` / `add_collaborator-*` events.
   * No-op if the user is already recorded on that project.
   *
   * @throws Error if not in state "ready"
   */
  private add_user_to_project(account_id: string, project_id: string): void {
    this.assert_state("ready", "add_user_to_project");
    if (
      this.projects[account_id] != null &&
      this.projects[account_id][project_id]
    ) {
      // already added
      return;
    }
    this.emit(`add_user_to_project-${account_id}`, project_id);
    if (this.users[project_id] == null) {
      this.users[project_id] = {};
    }
    const users = this.users[project_id];
    users[account_id] = true;

    if (this.projects[account_id] == null) {
      this.projects[account_id] = {};
    }
    const projects = this.projects[account_id];
    projects[project_id] = true;

    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    const collabs = this.collabs[account_id];

    // `users` already contains account_id itself, so the pair
    // (account_id, account_id) is incremented by both halves of the loop
    // body -- this is why an account counts itself twice per project.
    for (const other_account_id in users) {
      if (collabs[other_account_id] != null) {
        collabs[other_account_id] += 1;
      } else {
        collabs[other_account_id] = 1;
        this.emit(`add_collaborator-${account_id}`, other_account_id);
      }
      const other_collabs = this.collabs[other_account_id];
      if (other_collabs[account_id] != null) {
        other_collabs[account_id] += 1;
      } else {
        other_collabs[account_id] = 1;
        this.emit(`add_collaborator-${other_account_id}`, account_id);
      }
    }
  }

  /**
   * Inverse of add_user_to_project: forget that `account_id` is on
   * `project_id`, decrementing the pairwise `collabs` counts and deleting
   * entries that reach zero.  No-op if the user is not recorded on the
   * project.
   *
   * @param no_emit - when true, no `remove_user_from_project-*` or
   *   `remove_collaborator-*` events are emitted
   * @throws Error if not in state "ready", or if either id is not a
   *   36-character string
   */
  private remove_user_from_project(
    account_id: string,
    project_id: string,
    no_emit: boolean = false,
  ): void {
    this.assert_state("ready", "remove_user_from_project");
    if (
      (account_id != null ? account_id.length : undefined) !== 36 ||
      (project_id != null ? project_id.length : undefined) !== 36
    ) {
      throw Error("invalid account_id or project_id");
    }
    if (
      !(this.projects[account_id] != null
        ? this.projects[account_id][project_id]
        : undefined)
    ) {
      return;
    }
    if (!no_emit) {
      this.emit(`remove_user_from_project-${account_id}`, project_id);
    }
    if (this.collabs[account_id] == null) {
      this.collabs[account_id] = {};
    }
    for (const other_account_id in this.users[project_id]) {
      this.collabs[account_id][other_account_id] -= 1;
      if (this.collabs[account_id][other_account_id] === 0) {
        delete this.collabs[account_id][other_account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${account_id}`, other_account_id);
        }
      }
      this.collabs[other_account_id][account_id] -= 1;
      if (this.collabs[other_account_id][account_id] === 0) {
        delete this.collabs[other_account_id][account_id];
        if (!no_emit) {
          this.emit(`remove_collaborator-${other_account_id}`, account_id);
        }
      }
    }
    delete this.users[project_id][account_id];
    delete this.projects[account_id][project_id];
  }

  // Register the given account so that this client watches the database
  // in order to be aware of all projects and collaborators of the
  // given account.
  // The returned promise settles when register_cb's callback is invoked;
  // close() invokes pending callbacks with an error argument.
  public async register(account_id: string): Promise<void> {
    await callback(this.register_cb.bind(this), account_id);
  }

  /**
   * Callback-style implementation of register(): either completes
   * immediately (already registered) or queues `cb` in `register_todo`,
   * starting the do_register loop if it is not already running.
   */
  private register_cb(account_id: string, cb: Function): void {
    // NOTE(review): `cb` is never invoked on this path, so a register()
    // call made after close() never settles -- confirm this is intended.
    if (this.state == "closed") return;
    const dbg = this.dbg(`register(account_id="${account_id}"`);
    if (this.accounts[account_id] != null) {
      dbg(
        `already registered -- listener counts ${JSON.stringify(
          this.listener_counts(account_id),
        )}`,
      );
      cb();
      return;
    }
    if (len(this.register_todo) === 0) {
      // no registration is currently happening
      this.register_todo[account_id] = [cb];
      // kick things off -- this will keep registering accounts
      // until everything is done, then this.register_todo will have length 0.
      this.do_register();
    } else {
      // Accounts are being registered right now. Add to the todo list.
      const v = this.register_todo[account_id];
      if (v != null) {
        v.push(cb);
      } else {
        this.register_todo[account_id] = [cb];
      }
    }
  }

  // Call do_register_work to completely clear the work
  // this.register_todo work queue.
  // NOTE: do_register_work does each account, *one after another*,
  // rather than doing everything in parallel.   WARNING: DO NOT
  // rewrite this to do everything in parallel, unless you think you
  // thoroughly understand the algorithm, since I think
  // doing things in parallel would horribly break!
  private async do_register(): Promise<void> {
    // NOTE(review): if this runs while still in state "init", the queued
    // work is left in register_todo and nothing visible here restarts it.
    if (this.state != "ready") return; // maybe shutting down.

    // This gets a single account_id, if there are any:
    let account_id: string | undefined = undefined;
    for (account_id in this.register_todo) break;
    if (account_id == null) return; // nothing to do.

    const dbg = this.dbg(`do_register(account_id="${account_id}")`);
    dbg("registering account");
    if (this.do_register_lock)
      throw Error("do_register MUST NOT be called twice at once!");
    this.do_register_lock = true;
    try {
      // Register this account
      let projects: QueryResult[];
      try {
        // 2021-05-10: one user has a really large number of projects, which causes the hub to crash
        // TODO: fix this ORDER BY .. LIMIT .. part properly
        projects = await query(this.db, {
          query:
            "SELECT project_id, json_agg(o) as users FROM (SELECT project_id, jsonb_object_keys(users) AS o FROM projects WHERE users ? $1::TEXT ORDER BY last_edited DESC LIMIT 10000) s group by s.project_id",
          params: [account_id],
        });
      } catch (err) {
        const e = `error registering '${account_id}' -- err=${err}`;
        dbg(e);
        this.handle_error(e); // it is game over.
        return;
      }

      // The tracker may have been closed while the query was in flight.
      // close() has already completed every pending callback with an error,
      // and add_user_to_project asserts state "ready", so continuing would
      // throw inside this un-awaited promise.  (The `finally` below still
      // releases the lock.)
      if (this.state != "ready") return;

      // we care about this account_id
      this.accounts[account_id] = true;

      dbg("now adding all users to project tracker -- start");
      for (const project of projects) {
        if (this.users[project.project_id] != null) {
          // already have data about this project
          continue;
        } else {
          for (const collab_account_id of project.users) {
            if (collab_account_id == null) {
              continue; // just skip; evidently rarely this isn't defined, maybe due to db error?
            }
            this.add_user_to_project(collab_account_id, project.project_id);
          }
        }
      }
      dbg("successfully registered -- stop");

      // call the callbacks
      const callbacks = this.register_todo[account_id];
      if (callbacks != null) {
        for (const cb of callbacks) {
          cb();
        }
        // We are done (trying to) register account_id.
        delete this.register_todo[account_id];
      }
    } finally {
      this.do_register_lock = false;
    }
    if (len(this.register_todo) > 0) {
      // Deal with next account that needs to be registered
      this.do_register();
    }
  }

  // TODO: not actually used by any client yet... but obviously it should
  // be since this would be a work/memory leak, right?
  public unregister(account_id: string): void {
    if (this.state == "closed") return;
    if (!this.accounts[account_id]) return; // nothing to do

    // Snapshot the project ids first, since the loop below mutates
    // this.projects[account_id] via remove_user_from_project.
    const v: string[] = [];
    for (const project_id in this.projects[account_id]) {
      v.push(project_id);
    }
    delete this.accounts[account_id];

    // Forget about any projects that account_id is on that are no longer
    // necessary to watch...
    for (const project_id of v) {
      let need: boolean = false;
      for (const other_account_id in this.users[project_id]) {
        if (this.accounts[other_account_id] != null) {
          need = true;
          break;
        }
      }
      if (!need) {
        // No registered account is left on this project: drop it silently
        // (no_emit = true).
        for (const other_account_id in this.users[project_id]) {
          this.remove_user_from_project(other_account_id, project_id, true);
        }
        delete this.users[project_id];
      }
    }
  }

  // Return *set* of projects that this user is a collaborator on
  public get_projects(account_id: string): { [project_id: string]: boolean } {
    if (this.state == "closed") return {};
    if (!this.accounts[account_id]) {
      // This should never happen, but very rarely it DOES.  I do not know why, having studied the
      // code.  But when it does, just raising an exception blows up the server really badly.
      // So for now we just async register the account, return that it is not a collaborator
      // on anything.  Then some query will fail, get tried again, and work since registration will
      // have finished.
      //throw Error("account (='#{account_id}') must be registered")
      //
      // The registration runs in the background.  close() completes pending
      // register callbacks with an error, which rejects this promise; attach
      // a handler so that does not become an unhandled rejection.
      this.register(account_id).catch((err) => {
        if (this.state == "closed") return; // nothing left to log to or do
        this.dbg("get_projects")(
          `background registration of '${account_id}' failed -- ${err}`,
        );
      });
      return {};
    }
    return this.projects[account_id] != null ? this.projects[account_id] : {};
  }

  // map from collabs of account_id to number of projects they collab
  // on (account_id itself counted twice)
  public get_collabs(account_id: string): { [account_id: string]: number } {
    if (this.state == "closed") return {};
    return this.collabs[account_id] != null ? this.collabs[account_id] : {};
  }

  // Number of listeners currently attached to each of the four per-account
  // events; used only for debug logging in register_cb.
  private listener_counts(account_id: string): object {
    const x: any = {};
    for (const e of [
      "add_user_to_project",
      "remove_user_from_project",
      "add_collaborator",
      "remove_collaborator",
    ]) {
      const event = e + "-" + account_id;
      x[event] = this.listenerCount(event);
    }
    return x;
  }
}

/**
 * Callback-style query helper: runs `opts` through `db._query`, with the
 * result handed to `cb` via `all_results`.  Note that this sets `opts.cb`,
 * i.e. it mutates the options object passed in.
 *
 * @throws Error if `opts` is null or undefined
 */
function all_query(db: PostgreSQL, opts: QueryOptions, cb: Function): void {
  if (opts == null) {
    throw Error("opts must not be null");
  }
  opts.cb = all_results(cb);
  db._query(opts);
}

// Promise wrapper around all_query.
async function query<T>(
  db: PostgreSQL,
  opts: QueryOptions,
): Promise<QueryResult<T>[]> {
  return await callback(all_query, db, opts);
}