Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/project/sync/server.ts
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
SyncTable server channel -- used for supporting realtime sync
between project and browser client.

TODO:

- [ ] If initial query fails, need to raise exception.  Right now it gets
silently swallowed in persistent mode...
*/

// How long to wait from when we hit 0 clients until closing this channel.
// Making this short saves memory and cpu.
// Making it longer reduces the potential time to open a file, e.g., if you
// disconnect then reconnect, e.g., by refreshing your browser.
// Related to https://github.com/sagemathinc/cocalc/issues/5627
// and https://github.com/sagemathinc/cocalc/issues/5823
// and https://github.com/sagemathinc/cocalc/issues/5617

// This is a hard upper bound on the number of browser sessions that could
// have the same file open at once.  We put some limit on it, to at least
// limit problems from bugs which crash projects (since each connection uses
// memory, and it adds up).  Some customers want 100+ simultaneous users,
// so don't set this too low (except for dev)!
const MAX_CONNECTIONS = 500;

// The frontend client code *should* prevent many connections, but some
// old broken clients may not work properly.  This must be at least 2,
// since we can have two clients for a given channel at once if a file is
// being closed still, while it is reopened (e.g., when user does this:
// disconnect, change, close, open, reconnect).  Also, this setting prevents
// some potentially malicious conduct, and also possible new clients with bugs.
// It is VERY important that this not be too small, since there is often
// a delay/timeout before a channel is properly closed.
const MAX_CONNECTIONS_FROM_ONE_CLIENT = 10;

import {
  synctable_no_changefeed,
  synctable_no_database,
  SyncTable,
  VersionedChange,
  set_debug,
} from "@cocalc/sync/table";

// Only uncomment this for an intense level of debugging.
// set_debug(true);
// @ts-ignore -- typescript nonsense.
const _ = set_debug;

import { init_syncdoc, getSyncDocFromSyncTable } from "./sync-doc";
import { key, register_synctable } from "./open-synctables";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { once } from "@cocalc/util/async-utils";
import { delay } from "awaiting";
import { close, deep_copy, len } from "@cocalc/util/misc";
import { registerListingsTable } from "./listings";
import { register_project_info_table } from "./project-info";
import { register_project_status_table } from "./project-status";
import { register_usage_info_table } from "./usage-info";
import type { MergeType } from "@cocalc/sync/table/synctable";
import Client from "@cocalc/sync-client";
import { getJupyterRedux } from "@cocalc/jupyter/kernel";

type Query = { [key: string]: any };

interface Spark {
  address: { ip: string };
  id: string;
  conn: {
    id: string;
    write: (obj: any) => boolean;
    once: (str: string, fn: Function) => void;
    on: (str: string, fn: Function) => void;
    writable: boolean;
  };
  write: (obj: any) => boolean;
  end: (...args) => void;
  on: (str: string, fn: Function) => void;
}

interface Channel {
  write: (obj: any) => boolean;
  on: (str: string, fn: Function) => void;
  forEach: (fn: Function) => void;
  destroy: Function;
}

interface Primus {
  channel: (str: string) => Channel;
}

interface Logger {
  debug: Function;
}

import stringify from "json-stable-stringify";
import { sha1 } from "@cocalc/backend/sha1";

const COCALC_EPHEMERAL_STATE: boolean =
  process.env.COCALC_EPHEMERAL_STATE === "yes";

class SyncTableChannel {
  private synctable: SyncTable;
  private client: Client;
  private logger: Logger;
  public readonly name: string;
  private query: Query;
  private options: any[] = [];
  private query_string: string;
  private channel: Channel;
  private closed: boolean = false;
  private closing: boolean = false;
  private setOnDisconnect: {
    [spark_id: string]: { changes: any; merge: MergeType }[];
  } = {};
  private num_connections: { n: number; changed: Date } = {
    n: 0,
    changed: new Date(),
  };

  // If true, do not use a database at all, even on the backend.
  // Table is reset any time this object is created.  This is
  // useful, e.g., for tracking user cursor locations or other
  // ephemeral state.
  private ephemeral: boolean = false;

  // If true, do not close even if all clients have disconnected.
  // This is used to keep sessions running, even when all browsers
  // have closed, e.g., state for Sage worksheets, jupyter
  // notebooks, etc., where user may want to close their browser
  // (or just drop a connection temporarily) while a persistent stateful
  // session continues running.
  private persistent: boolean = false;

  private connections_from_one_client: { [id: string]: number } = {};

  constructor({
    client,
    primus,
    query,
    options,
    logger,
    name,
  }: {
    client: Client;
    primus: Primus;
    name: string;
    query: Query;
    options: any;
    logger: Logger;
  }) {
    this.name = name;
    this.client = client;
    this.logger = logger;
    this.query = query;
    this.init_options(options);
    if (COCALC_EPHEMERAL_STATE) {
      // No matter what, we set ephemeral true when
      // this env var is set, since all db access
      // will be denied anyways.
      this.ephemeral = true;
    }
    this.query_string = stringify(query); // used only for logging
    this.channel = primus.channel(this.name);
    this.log(
      `creating new sync channel (persistent=${this.persistent}, ephemeral=${this.ephemeral})`,
    );
  }

  public async init(): Promise<void> {
    this.init_handlers();
    await this.init_synctable();
  }

  private init_options(options): void {
    if (options == null) {
      return;
    }
    for (const option of deep_copy(options)) {
      // deep_copy so do not mutate input options.
      if (typeof option != "object" || option == null) {
        throw Error("invalid options");
      }
      for (const x of ["ephemeral", "persistent"]) {
        // options that are only for project websocket tables.
        if (option[x] != null) {
          this[x] = option[x];
          delete option[x];
        }
      }
      if (len(option) > 0) {
        // remaining synctable/database options.
        this.options.push(option);
      }
    }
  }

  private log(...args): void {
    if (this.logger == null) return;
    this.logger.debug(
      `SyncTableChannel('${this.name}', '${this.query_string}'${
        this.closed ? ",CLOSED" : ""
      }): `,
      ...args,
    );
  }

  private init_handlers(): void {
    this.log("init_handlers");
    this.channel.on("connection", this.new_connection.bind(this));
    this.channel.on("disconnection", this.end_connection.bind(this));
  }

  private async init_synctable(): Promise<void> {
    this.log("init_synctable");
    let create_synctable: Function;
    if (this.ephemeral) {
      this.log("init_synctable -- ephemeral (no database)");
      create_synctable = synctable_no_database;
    } else {
      this.log("init_synctable -- persistent (but no changefeeds)");
      create_synctable = synctable_no_changefeed;
    }
    this.synctable = create_synctable(this.query, this.options, this.client);

    // if the synctable closes, then the channel should also close.
    // I think this should happen, e.g., when we "close and halt"
    // a jupyter notebook, which closes the synctable, triggering this.
    this.synctable.once("closed", this.close.bind(this));

    if (this.query[this.synctable.get_table()][0].string_id != null) {
      register_synctable(this.query, this.synctable);
    }
    if (this.synctable.table === "syncstrings") {
      this.log("init_synctable -- syncstrings: also initialize syncdoc...");
      init_syncdoc(this.client, this.synctable);
    }

    this.synctable.on(
      "versioned-changes",
      this.send_versioned_changes_to_browsers.bind(this),
    );

    this.log("created synctable -- waiting for connected state");
    await once(this.synctable, "connected");
    this.log("created synctable -- now connected");

    // broadcast synctable content to all connected clients.
    this.broadcast_synctable_to_browsers();
  }

  private increment_connection_count(spark: Spark): number {
    // account for new connection from this particular client.
    let m: undefined | number = this.connections_from_one_client[spark.conn.id];
    if (m === undefined) m = 0;
    return (this.connections_from_one_client[spark.conn.id] = m + 1);
  }

  private decrement_connection_count(spark: Spark): number {
    const m: undefined | number =
      this.connections_from_one_client[spark.conn.id];
    if (m === undefined) {
      return 0;
    }
    return (this.connections_from_one_client[spark.conn.id] = Math.max(
      0,
      m - 1,
    ));
  }

  private async new_connection(spark: Spark): Promise<void> {
    // Now handle the connection
    const n = this.num_connections.n + 1;
    this.num_connections = { n, changed: new Date() };

    // account for new connection from this particular client.
    const m = this.increment_connection_count(spark);

    this.log(
      `new connection from (address=${spark.address.ip}, conn=${spark.conn.id}) -- ${spark.id} -- num_connections = ${n} (from this client = ${m})`,
    );

    if (m > MAX_CONNECTIONS_FROM_ONE_CLIENT) {
      const error = `Too many connections (${m} > ${MAX_CONNECTIONS_FROM_ONE_CLIENT}) from this client.  You might need to refresh your browser.`;
      this.log(
        `${error} Waiting 15s, then killing new connection from ${spark.id}...`,
      );
      await delay(15000); // minimize impact of client trying again, which it should do...
      this.decrement_connection_count(spark);
      spark.end({ error });
      return;
    }

    if (n > MAX_CONNECTIONS) {
      const error = `Too many connections (${n} > ${MAX_CONNECTIONS})`;
      this.log(
        `${error} Waiting 5s, then killing new connection from ${spark.id}`,
      );
      await delay(5000); // minimize impact of client trying again, which it should do
      this.decrement_connection_count(spark);
      spark.end({ error });
      return;
    }

    if (this.closed) {
      this.log(`table closed: killing new connection from ${spark.id}`);
      this.decrement_connection_count(spark);
      spark.end();
      return;
    }
    if (this.synctable != null && this.synctable.get_state() == "closed") {
      this.log(`table state closed: killing new connection from ${spark.id}`);
      this.decrement_connection_count(spark);
      spark.end();
      return;
    }
    if (
      this.synctable != null &&
      this.synctable.get_state() == "disconnected"
    ) {
      // Because synctable is being initialized for the first time,
      // or it temporarily disconnected (e.g., lost hub), and is
      // trying to reconnect.  So just wait for it to connect.
      await once(this.synctable, "connected");
    }

    // Now that table is connected, we can send initial mesg to browser
    // with table state.
    this.send_synctable_to_browser(spark);

    spark.on("data", async (mesg) => {
      try {
        await this.handle_mesg_from_browser(spark, mesg);
      } catch (err) {
        spark.write({ error: `error handling mesg -- ${err}` });
        this.log("error handling mesg -- ", err, err.stack);
      }
    });
  }

  private async end_connection(spark: Spark): Promise<void> {
    // This should never go below 0 (that would be a bug), but let's
    // just ensure it doesn't since if it did that would weirdly break
    // things for users as the table would keep trying to close.
    const n = Math.max(0, this.num_connections.n - 1);
    this.num_connections = { n, changed: new Date() };

    const m = this.decrement_connection_count(spark);
    this.log(
      `spark event -- end connection ${spark.address.ip} -- ${spark.id} -- num_connections = ${n} (from this client = ${m})`,
    );

    if (!this.closed) {
      try {
        const x = this.setOnDisconnect[spark.id];
        this.log("do setOnDisconnect", x);
        if (x != null) {
          for (const { changes, merge } of x) {
            this.synctable.set(changes, merge);
          }
          delete this.setOnDisconnect[spark.id];
        }
      } catch (err) {
        this.log("setOnDisconnect error", err);
      }
    }

    this.check_if_should_save_or_close();
  }

  private send_synctable_to_browser(spark: Spark): void {
    if (this.closed || this.closing || this.synctable == null) return;
    this.log("send_synctable_to_browser");
    spark.write({ init: this.synctable.initial_version_for_browser_client() });
  }

  private broadcast_synctable_to_browsers(): void {
    if (this.closed || this.closing || this.synctable == null) return;
    this.log("broadcast_synctable_to_browsers");
    const x = { init: this.synctable.initial_version_for_browser_client() };
    this.channel.write(x);
  }

  /* This is called when a user disconnects.  This always triggers a save to
  disk.  It may also trigger closing the file in some cases. */
  private async check_if_should_save_or_close() {
    if (this.closed) {
      // don't bother if either already closed
      return;
    }
    this.log("check_if_should_save_or_close: save to disk if possible");
    try {
      await this.save_if_possible();
    } catch (err) {
      // the name "save if possible" suggests this should be non-fatal.
      this.log(
        "check_if_should_save_or_close: WARNING: unable to save -- ",
        err,
      );
    }
    const { n } = this.num_connections ?? {};
    this.log("check_if_should_save_or_close", { n });
    if (!this.persistent && n === 0) {
      this.log("check_if_should_save_or_close: close if possible");
      await this.close_if_possible();
    }
  }

  private async handle_mesg_from_browser(
    spark: Spark,
    mesg: any,
  ): Promise<void> {
    // do not log the actual mesg, since it can be huge and make the logfile dozens of MB.
    // Temporarily enable as needed for debugging purposes.
    //this.log("handle_mesg_from_browser ", { mesg });
    if (this.closed) {
      throw Error("received mesg from browser AFTER close");
    }
    if (mesg == null) {
      throw Error("mesg must not be null");
    }
    if (mesg.event == "set-on-disconnect") {
      this.log("set-on-disconnect", mesg, spark.id);
      if (this.setOnDisconnect[spark.id] == null) {
        this.setOnDisconnect[spark.id] = [];
      }
      this.setOnDisconnect[spark.id].push(mesg);
      return;
    }
    if (mesg.event == "message") {
      // generic messages from any client to the project can be
      // handled by backend code by listening for message events.
      this.synctable.emit("message", {
        data: mesg.data,
        spark,
        channel: this.channel,
      });
      return;
    }
    if (mesg.timed_changes != null) {
      this.synctable.apply_changes_from_browser_client(mesg.timed_changes);
    }
    await this.synctable.save();
  }

  private send_versioned_changes_to_browsers(
    versioned_changes: VersionedChange[],
  ): void {
    if (this.closed) return;
    this.log("send_versioned_changes_to_browsers");
    const x = { versioned_changes };
    this.channel.write(x);
  }

  private async save_if_possible(): Promise<void> {
    if (this.closed || this.closing) {
      return; // closing or already closed
    }
    this.log("save_if_possible: saves changes to database");
    await this.synctable.save();
    if (this.synctable.table === "syncstrings") {
      this.log("save_if_possible: also fetch syncdoc");
      const syncdoc = getSyncDocFromSyncTable(this.synctable);
      if (syncdoc != null) {
        const path = syncdoc.get_path();
        this.log("save_if_possible: saving syncdoc to disk", { path });
        if (path.endsWith(".sage-jupyter2")) {
          // treat jupyter notebooks in a special way, since they have
          // an aux .ipynb file that the syncdoc doesn't know about.  In
          // this case we save the ipynb to disk, not just the hidden
          // syncdb file.
          const { actions } = await getJupyterRedux(syncdoc);
          if (actions == null) {
            this.log("save_if_possible: jupyter -- actions is null");
          } else {
            if (!actions.isCellRunner()) {
              this.log("save_if_possible: jupyter -- not cell runner");
              return;
            }
            this.log("save_if_possible: jupyter -- saving to ipynb");
            await actions.save_ipynb_file();
          }
        }
        await syncdoc.save_to_disk();
      } else {
        this.log("save_if_possible: no syncdoc");
      }
    }
  }

  private async close_if_possible(): Promise<void> {
    if (this.closed || this.closing) {
      return; // closing or already closed
    }
    const { n, changed } = this.num_connections;
    const delay = Date.now() - changed.valueOf();
    this.log(
      `close_if_possible: there are ${n} connections and delay=${delay}`,
    );
    if (n === 0) {
      this.log(`close_if_possible: close this SyncTableChannel atomically`);
      // actually close
      this.close();
    } else {
      this.log(`close_if_possible: NOT closing this SyncTableChannel`);
    }
  }

  private close(): void {
    if (this.closed) {
      return;
    }
    this.log("close: closing");
    this.closing = true;
    delete synctable_channels[this.name];
    this.channel.destroy();
    this.synctable.close_no_async();
    this.log("close: closed");
    close(this); // don't call this.log after this!
    this.closed = true;
  }

  public get_synctable(): SyncTable {
    return this.synctable;
  }
}

const synctable_channels: { [name: string]: SyncTableChannel } = {};

function createKey(args): string {
  return stringify([args[3], args[4]]);
}

function channel_name(query: any, options: any[]): string {
  // stable identifier to this query + options across
  // project restart, etc.  We first make the options
  // as canonical as we can:
  const opts = {};
  for (const x of options) {
    for (const key in x) {
      opts[key] = x[key];
    }
  }
  // It's critical that we dedup the synctables having
  // to do with sync-doc's.  A problem case is multiple
  // queries for the same table, due to the time cutoff
  // for patches after making a snapshot.
  let q: string;
  try {
    q = key(query);
  } catch {
    // throws an error if the table doesn't have a string_id;
    // that's fine - in this case, just make a key out of the query.
    q = query;
  }
  const y = stringify([q, opts]);
  const s = sha1(y);
  return `sync:${s}`;
}

async function synctable_channel0(
  client: any,
  primus: any,
  logger: any,
  query: any,
  options: any[],
): Promise<string> {
  const name = channel_name(query, options);
  logger.debug("synctable_channel", JSON.stringify(query), name);
  if (synctable_channels[name] === undefined) {
    synctable_channels[name] = new SyncTableChannel({
      client,
      primus,
      name,
      query,
      options,
      logger,
    });
    await synctable_channels[name].init();
    if (query?.listings != null) {
      registerListingsTable(synctable_channels[name].get_synctable(), query);
    } else if (query?.project_info != null) {
      register_project_info_table(
        synctable_channels[name].get_synctable(),
        logger,
        client.client_id(),
      );
    } else if (query?.project_status != null) {
      register_project_status_table(
        synctable_channels[name].get_synctable(),
        logger,
        client.client_id(),
      );
    } else if (query?.usage_info != null) {
      register_usage_info_table(
        synctable_channels[name].get_synctable(),
        client.client_id(),
      );
    }
  }
  return name;
}

export const synctable_channel = reuseInFlight(synctable_channel0, {
  createKey,
});
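Usage sketch (not part of server.ts above): the module's only export is synctable_channel, which returns the name of the primus channel serving a given query, creating and registering the channel on first use and reusing it afterwards. The snippet below is a minimal, hypothetical example of how project-side code might call it; the client, primus, and logger values are assumed to come from the project's websocket setup, and the syncstrings query shape is purely illustrative.

import { synctable_channel } from "./server";

// Hypothetical caller inside the project process; `client`, `primus`, and
// `logger` are assumed to already exist (e.g., from the websocket server setup).
export async function openChannelForQuery(
  client: any,
  primus: any,
  logger: any,
): Promise<string> {
  // Illustrative syncstrings-style query; real queries arrive from the browser.
  const query = {
    syncstrings: [{ string_id: "demo", path: "a.txt", users: null }],
  };
  // Returns a stable name of the form "sync:<sha1>"; repeated calls with the
  // same query and options are deduplicated via reuseInFlight and createKey,
  // which keys on the query/options arguments.
  return await synctable_channel(client, primus, logger, query, []);
}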