Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/sync/client/synctable-project.ts
Views: 687
/*1* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45/*6Synctable that uses the project websocket rather than the database.7*/89import { delay } from "awaiting";1011import { SyncTable, synctable_no_database } from "@cocalc/sync/table";12import { once, retry_until_success } from "@cocalc/util/async-utils";13import { assertDefined } from "@cocalc/util/misc";14import { reuseInFlight } from "@cocalc/util/reuse-in-flight";15import type { AppClient } from "./types";1617// Always wait at least this long between connect attempts. This18// avoids flooding the project with connection requests if, e.g., the19// client limit for a particular file is reached.20const MIN_CONNECT_WAIT_MS = 5000;2122interface Options {23project_id: string;24query: object;25options: any[];26client: AppClient;27throttle_changes?: undefined | number;28id: string;29}3031import { EventEmitter } from "events";3233class SyncTableChannel extends EventEmitter {34public synctable?: SyncTable;35private project_id: string;36private client: AppClient;37private channel?: any;38private websocket?: any;39private query: any;40private options: any;41private key: string;4243private last_connect: number = 0;4445private connected: boolean = false;4647constructor(opts: Options) {48super();49const { project_id, query, options, client, throttle_changes } = opts;50if (query == null) {51throw Error("query must be defined");52}53if (options == null) {54throw Error("options must be defined");55}56this.key = key(opts);57this.synctable = synctable_no_database(58query,59options,60client,61throttle_changes,62[],63project_id,64);65(this.synctable as any).channel = this; // for debugging66this.synctable.setOnDisconnect = (changes, merge) => {67this.send_mesg_to_project({ event: "set-on-disconnect", changes, merge });68};69this.synctable.sendMessageToProject = (data) => {70this.send_mesg_to_project({ event: "message", data });71};72this.project_id = project_id;73this.client = client;74this.query = query;75this.options = options;76this.init_synctable_handlers();7778this.connect = reuseInFlight(this.connect.bind(this));79this.log = this.log.bind(this);80this.connect();81}8283public is_connected(): boolean {84return this.connected;85}8687private log = (..._args) => {88//console.log("SyncTableChannel", this.query, ..._args);89};9091private async connect(): Promise<void> {92this.log("connect...");93if (this.synctable == null) return;94this.set_connected(false);95this.clean_up_sockets();9697const time_since_last_connect = Date.now() - this.last_connect;98if (time_since_last_connect < MIN_CONNECT_WAIT_MS) {99// Last attempt to connect was very recent, so we wait a little before100// trying again.101await delay(MIN_CONNECT_WAIT_MS - time_since_last_connect);102}103104await retry_until_success({105max_delay: 5000,106f: this.attempt_to_connect.bind(this),107desc: "webapp-synctable-connect",108log: this.log,109});110111this.last_connect = Date.now();112}113114private set_connected(connected: boolean): void {115if (this.synctable == null) return;116this.log("set_connected", connected);117this.connected = connected;118if (this.synctable.client.set_connected != null) {119this.synctable.client.set_connected(connected);120}121if (connected) {122this.emit("connected");123} else {124this.emit("disconnected");125}126}127// Various things could go wrong, e.g., the websocket breaking128// while trying to get the api synctable_channel, touch129// project might time out, etc.130private async attempt_to_connect(): Promise<void> {131// Start with fresh websocket and channel -- old one may be dead.132this.clean_up_sockets();133// touch_project mainly makes sure that some hub is connected to134// the project, so the project can do DB queries. Also135// starts the project.136this.client.touch_project(this.project_id);137// Get a websocket.138this.websocket = await this.client.project_client.websocket(139this.project_id,140);141if (this.websocket.state != "online") {142// give websocket state one chance to change.143// It could change to destroyed or online.144this.log(145"wait for websocket to connect since state is",146this.websocket.state,147);148await once(this.websocket, "state");149}150if (this.websocket.state != "online") {151// Already offline... let's try again from the top.152this.log("websocket failed");153throw Error("websocket went offline already");154}155156this.log("Get a channel");157const api = await this.client.project_client.api(this.project_id);158this.channel = await api.synctable_channel(this.query, this.options);159160if (this.websocket.state != "online") {161// Already offline... let's try again from the top.162throw Error("websocket went offline already");163}164165this.channel.on("data", this.handle_mesg_from_project.bind(this));166this.websocket.on("offline", this.connect);167this.channel.on("close", this.connect);168}169170private init_synctable_handlers(): void {171assertDefined(this.synctable);172this.synctable.on("timed-changes", (timed_changes) => {173this.send_mesg_to_project({ timed_changes });174});175this.synctable.once("closed", this.close.bind(this));176}177178private clean_up_sockets(): void {179if (this.channel != null) {180this.channel.removeListener("close", this.connect);181182// Explicitly emit end -- this is a hack,183// since this is the only way to force the184// channel clean-up code to run in primus-multiplex,185// and it gets run async later if we don't do this.186// TODO: rewrite primus-multiplex from scratch.187this.channel.emit("end");188189try {190this.channel.end();191} catch (err) {192// no op -- this does happen if channel.conn is destroyed193}194delete this.channel;195}196197if (this.websocket != null) {198this.websocket.removeListener("offline", this.connect);199delete this.websocket;200}201}202203private async close(): Promise<void> {204delete cache[this.key];205this.clean_up_sockets();206if (this.synctable != null) {207const s = this.synctable;208delete this.synctable;209await s.close();210}211}212213private handle_mesg_from_project(mesg): void {214this.log("project --> client: ", mesg);215if (this.synctable == null) {216this.log("project --> client: NO SYNCTABLE");217return; // can happen during close218}219if (mesg == null) {220throw Error("mesg must not be null");221}222if (mesg.error != null) {223const { alert_message } = this.client;224const message = `Error opening file -- ${225mesg.error226} -- wait, restart your project or refresh your browser. Query=${JSON.stringify(227this.query,228)}`;229if (alert_message != null) {230alert_message({ type: "info", message, timeout: 10 });231} else {232console.warn(message);233}234}235if (mesg.event == "message") {236this.synctable.emit("message", mesg.data);237return;238}239if (mesg.init != null) {240this.log("project --> client: init_browser_client");241this.synctable.init_browser_client(mesg.init);242// after init message, we are now initialized243// and in the connected state.244this.set_connected(true);245}246if (mesg.versioned_changes != null) {247this.log("project --> client: versioned_changes");248this.synctable.apply_changes_to_browser_client(mesg.versioned_changes);249}250}251252private send_mesg_to_project(mesg): void {253this.log("project <-- client: ", mesg);254if (!this.connected) {255throw Error("must be connected");256}257if (this.websocket == null) {258throw Error("websocket must not be null");259}260if (this.channel == null) {261throw Error("channel must not be null");262}263if (this.websocket.state != "online") {264throw Error(265`websocket state must be online but it is '${this.websocket.state}'`,266);267}268this.channel.write(mesg);269}270}271272// We use a cache to ensure there is at most one synctable273// at a time with given defining parameters. This is just274// for efficiency and sanity, so we use JSON.stringify instead275// of a guranteed stable json.276const cache: { [key: string]: SyncTableChannel } = {};277278// ONLY uncomment when developing!279// (window as any).channel_cache = cache;280281// The id here is so that the synctables and channels are unique282// **for a given syncdoc**. There can be multiple syncdocs for283// the same underlying project_id/path, e.g.,284// - when timetravel and a document are both open at the same time,285// - when a document is closing (and saving offline changes) at the286// same time that it is being opened; to see this disconnect from287// the network, make changes, clocse the file tab, then open it288// again, and reconnect to the network.289// See https://github.com/sagemathinc/cocalc/issues/3595 for why this290// opts.id below is so important. I tried several different approaches,291// and this is the best by far.292function key(opts: Options): string {293return `${opts.id}-${opts.project_id}-${JSON.stringify(294opts.query,295)}-${JSON.stringify(opts.options)}`;296}297298// NOTE: This function can be called by a LOT of different things at once whenever299// waiting to connect to a project. The "await once" inside it creates300// a listener on SyncTableChannel, and there is a limit on the number of301// those you can create without raising a limit (that was appearing in the302// console log a lot). Thus our use of reuseInFlight to prevent this.303async function synctable_project0(opts: Options): Promise<SyncTable> {304const k = key(opts);305// console.log("key = ", k);306let t;307if (cache[k] !== undefined) {308t = cache[k];309} else {310t = new SyncTableChannel(opts);311cache[k] = t;312}313if (!t.is_connected()) {314await once(t, "connected");315}316return t.synctable;317}318319const synctable_project = reuseInFlight(synctable_project0, {320createKey: (args) =>321JSON.stringify([args[0].project_id, args[0].query, args[0].options]),322});323324export default synctable_project;325326327