Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/frontend/client/hub.ts
Views: 687
/*
 * This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 * License: MS-RSL – see LICENSE.md for details
 */

import { callback, delay } from "awaiting";
import { throttle } from "lodash";
import type { WebappClient } from "./client";
import { delete_cookie } from "../misc/cookies";
import {
  copy_without,
  from_json_socket,
  to_json_socket,
  defaults,
  required,
  uuid,
} from "@cocalc/util/misc";
import * as message from "@cocalc/util/message";
import {
  do_anonymous_setup,
  should_do_anonymous_setup,
} from "./anonymous-setup";
import {
  deleteRememberMe,
  setRememberMe,
} from "@cocalc/frontend/misc/remember-me";
import { appBasePath } from "@cocalc/frontend/customize/app-base-path";

// Maximum number of outstanding concurrent messages (that have responses)
// to send at once to hub-websocket.
const MAX_CONCURRENT: number = 17;

// just define what we need for code sanity
interface PrimusConnection {
  write: (arg0: string) => void;
  open: () => void;
  end: () => void;
  latency?: number;
}

/** Snapshot of the message counters, emitted to the client as "mesg_info". */
export interface MessageInfo {
  count: number;
  sent: number;
  sent_length: number;
  recv: number;
  recv_length: number;
  enqueued: number;
  max_concurrent: number;
}

/**
 * Manages the websocket connection to the hub: connecting, tracking
 * signed-in state, sending messages, and matching responses (by message id)
 * to the callbacks of outstanding calls.  At most MAX_CONCURRENT calls are
 * in flight at once; the rest wait in a queue.
 */
export class HubClient {
  private client: WebappClient;
  private conn?: PrimusConnection;

  private connected: boolean = false;
  private connection_is_totally_dead: boolean = false;
  private num_attempts: number = 0;
  private signed_in: boolean = false;
  private signed_in_time: number = 0;
  private signed_in_mesg: object;

  // Outstanding calls that are waiting for a response, indexed by message id.
  private call_callbacks: {
    [id: string]: {
      timeout?: any;
      error_event: boolean;
      first: boolean; // true until the first response for this call arrives
      cb: Function;
    };
  } = {};

  private mesg_data: {
    queue: any[];
    count: number;
    sent: number;
    sent_length: number;
    recv: number;
    recv_length: number;
  } = {
    queue: [], // messages in the queue to send
    count: 0, // number of message currently outstanding
    sent: 0, // total number of messages sent to backend.
    sent_length: 0, // total amount of data sent
    recv: 0, // number of messages received from backend
    recv_length: 0,
  };

  constructor(client: WebappClient) {
    this.client = client;

    /* We heavily throttle this, since it's ONLY used for the connections
       dialog, which users never look at, and it could waste cpu trying to
       update things for no reason.  It also impacts the color of the
       connection indicator, so throttling will make that color change a
       bit more laggy.  That's probably worth it. */
    this.emit_mesg_data = throttle(this.emit_mesg_data.bind(this), 2000);

    // never attempt to reconnect more than once per 10s, no matter what.
    this.reconnect = throttle(this.reconnect.bind(this), 10000);

    // Start attempting to connect to a hub.
    this.init_hub_websocket();
  }

  // Emit the current message counters (everything in mesg_data except the
  // queue itself, plus the queue length and concurrency limit).
  private emit_mesg_data(): void {
    const info: MessageInfo = copy_without(this.mesg_data, ["queue"]) as any;
    info.enqueued = this.mesg_data.queue.length;
    info.max_concurrent = MAX_CONCURRENT;
    this.client.emit("mesg_info", info);
  }

  /** Reconnect attempt number most recently reported by the connection. */
  public get_num_attempts(): number {
    return this.num_attempts;
  }

  /** Serialize mesg and write it to the connection (no response tracking). */
  public send(mesg: object): void {
    //console.log("send at #{misc.mswalltime()}", mesg)
    const data = to_json_socket(mesg);
    this.mesg_data.sent_length += data.length;
    this.emit_mesg_data();
    this.write_data(data);
  }

  // Write raw data to the connection; warns (never throws) if there is no
  // connection object yet or the write fails.
  private write_data(data: string): void {
    if (this.conn == null) {
      console.warn(
        "HubClient.write_data: can't write data since not connected",
      );
      return;
    }
    try {
      this.conn.write(data);
    } catch (err) {
      console.warn("HubClient.write_data", err);
    }
  }

  private delete_websocket_cookie(): void {
    delete_cookie("SMCSERVERID3");
  }

  /** True only if connected AND signed in. */
  public is_signed_in(): boolean {
    return this.is_connected() && !!this.signed_in;
  }

  public set_signed_in(): void {
    this.signed_in = true;
  }

  public set_signed_out(): void {
    this.signed_in = false;
  }

  /** Time (ms since epoch) of the last "signed_in" message, or 0 if none. */
  public get_signed_in_time(): number {
    return this.signed_in_time;
  }

  /** The most recent "signed_in" message received from the hub. */
  public get_signed_in_mesg(): object {
    return this.signed_in_mesg;
  }

  public is_connected(): boolean {
    return !!this.connected;
  }

  /**
   * Re-open the connection, but only if it has ended for good (the "end"
   * event fired).  Throttled in the constructor to at most once per 10s.
   */
  public reconnect(): void {
    if (this.connection_is_totally_dead) {
      // CRITICAL: See https://github.com/primus/primus#primusopen !
      this.conn?.open();
    }
  }

  /** End the connection if currently connected. */
  public disconnect(): void {
    if (this.connected) {
      this.conn?.end();
    }
  }

  // Entry point for raw data arriving from the hub.
  private ondata(data: string): void {
    //console.log("got #{data.length} of data")
    this.mesg_data.recv += 1;
    this.mesg_data.recv_length += data.length;
    this.emit_mesg_data();
    // handle_json_data is async; catch failures (e.g. malformed data or a
    // throwing callback) so they don't surface as unhandled rejections.
    this.handle_json_data(data).catch((err) => {
      console.warn("HubClient.ondata: error handling message", err);
    });
  }

  // Parse one message, handle the events this class knows about, then
  // dispatch it to the callback of the outstanding call with the same id
  // (if there is one).
  private async handle_json_data(data: string): Promise<void> {
    this.emit_mesg_data();
    const mesg = from_json_socket(data);
    // console.log(`handle_json_data: ${data}`);
    switch (mesg.event) {
      case "cookies":
        try {
          await this.client.account_client.cookies(mesg);
        } catch (err) {
          console.warn("Error handling cookie ", mesg, err);
        }
        break;

      case "signed_in":
        this.client.account_id = mesg.account_id;
        this.set_signed_in();
        this.signed_in_time = Date.now();
        setRememberMe(appBasePath);
        this.signed_in_mesg = mesg;
        this.client.emit("signed_in", mesg);
        break;

      case "remember_me_failed":
        deleteRememberMe(appBasePath);
        this.client.emit(mesg.event, mesg);
        break;

      case "version":
        this.client.emit("new_version", {
          version: mesg.version,
          min_version: mesg.min_version,
        });
        break;

      case "error":
        // An error that isn't tagged with an id -- some sort of general problem.
        if (mesg.id == null) {
          console.log(`WARNING: ${JSON.stringify(mesg.error)}`);
          return;
        }
        break;

      case "start_metrics":
        this.client.emit("start_metrics", mesg.interval_s);
        break;
    }

    // the call cb(undefined, mesg) below can mutate mesg (!), so we better save the id here.
    const { id } = mesg;
    const v = this.call_callbacks[id];
    if (v == null) {
      return;
    }
    const { cb, error_event } = v;
    v.first = false;
    if (error_event && mesg.event === "error") {
      if (!mesg.error) {
        // make sure mesg.error is set to something.
        mesg.error = "error";
      }
      cb(mesg.error);
    } else {
      cb(undefined, mesg);
    }
    if (!mesg.multi_response) {
      // The call is complete: cancel its pending timer so that it cannot
      // fire later (possibly against a new call that reuses this id).
      if (v.timeout) {
        clearTimeout(v.timeout);
        delete v.timeout;
      }
      // Only remove the entry if it is still ours; the callback may have
      // registered a new call under the same id.
      if (this.call_callbacks[id] === v) {
        delete this.call_callbacks[id];
      }
    }
  }

  // Send opts.message and register opts.cb to receive the response(s).
  // cb is called (with no arguments, exactly once) when this call should no
  // longer count against MAX_CONCURRENT.
  private do_call(opts: any, cb: Function): void {
    if (opts.cb == null) {
      // console.log("no opts.cb", opts.message)
      // A call to the backend, but where we do not wait for a response.
      // In order to maintain at least roughly our limit on MAX_CONCURRENT,
      // we simply pretend that this message takes about 150ms
      // to complete.  This helps space things out so the server can
      // handle requests properly, instead of just discarding them (be nice
      // to the backend and it will be nice to you).
      this.send(opts.message);
      setTimeout(cb, 150);
      return;
    }
    if (opts.message.id == null) {
      // Assign a uuid (usually we do this)
      opts.message.id = uuid();
    }
    const { id } = opts.message;
    let called_cb: boolean = false;
    // Call cb at most once, no matter how many paths try to.
    const release_slot = (): void => {
      if (!called_cb) {
        called_cb = true;
        cb();
      }
    };
    if (this.call_callbacks[id] != null) {
      // User is requesting to send a message with the same id as
      // a currently outstanding message.  This typically happens
      // when disconnecting and reconnecting.  It's critical to
      // clear up the existing call before overwriting
      // call_callbacks[id].  The point is the message id's are
      // NOT at all guaranteed to be random.
      this.clear_call(id);
    }

    this.call_callbacks[id] = {
      cb: (...args) => {
        release_slot();
        // NOTE: opts.cb is always defined since otherwise
        // we would have exited above.
        if (opts.cb != null) {
          opts.cb(...args);
        }
      },
      error_event: !!opts.error_event,
      first: true,
    };
    const entry = this.call_callbacks[id];

    this.send(opts.message);

    if (opts.timeout) {
      entry.timeout = setTimeout(() => {
        // Only report a timeout if this exact call is still registered and
        // no response has arrived yet.  If the entry is gone, the call was
        // already answered or cleared, and opts.cb has already been called.
        if (this.call_callbacks[id] !== entry || !entry.first) {
          return;
        }
        const error = "Timeout after " + opts.timeout + " seconds";
        delete this.call_callbacks[id];
        release_slot();
        if (opts.cb != null) {
          opts.cb(error, message.error({ id, error }));
        }
      }, opts.timeout * 1000);
    } else {
      // IMPORTANT: No matter what, we call cb within 60s; if we don't do this then
      // in case opts.timeout isn't set but opts.cb is, but user disconnects,
      // then cb would never get called, which throws off our call counter.
      // Note that the input to cb doesn't matter.
      entry.timeout = setTimeout(release_slot, 60 * 1000);
    }
  }

  public call(opts: any): void {
    // This function:
    //    * Modifies the message by adding an id attribute with a random uuid value
    //    * Sends the message to the hub
    //    * When message comes back with that id, call the callback and delete it (if cb opts.cb is defined)
    //      The message will not be seen by @handle_message.
    //    * If the timeout is reached before any messages come back, delete the callback and stop listening.
    //      However, if the message later arrives it may still be handled by @handle_message.
    opts = defaults(opts, {
      message: required,
      timeout: undefined,
      error_event: false, // if true, turn error events into just a normal err
      allow_post: undefined, // TODO: deprecated -- completely ignored and not used in any way.
      cb: undefined,
    });
    if (!this.is_connected()) {
      if (opts.cb != null) {
        opts.cb("not connected");
      }
      return;
    }
    this.mesg_data.queue.push(opts);
    this.mesg_data.sent += 1;
    this.update_calls();
  }

  // like call above, but async and error_event defaults to TRUE,
  // so an exception is raised on resp messages that have event='error'.

  public async async_call(opts: any): Promise<any> {
    const f = (cb) => {
      opts.cb = cb;
      this.call(opts);
    };
    if (opts.error_event == null) {
      opts.error_event = true;
    }
    return await callback(f);
  }

  // Start queued calls until the queue is empty or MAX_CONCURRENT calls
  // are outstanding.
  private update_calls(): void {
    while (
      this.mesg_data.queue.length > 0 &&
      this.mesg_data.count < MAX_CONCURRENT
    ) {
      this.process_next_call();
    }
  }

  // Take the next call off the queue and start it; when it finishes, free
  // its slot and try to start more.
  private process_next_call(): void {
    if (this.mesg_data.queue.length === 0) {
      return;
    }
    this.mesg_data.count += 1;
    const opts = this.mesg_data.queue.shift();
    this.emit_mesg_data();
    this.do_call(opts, () => {
      this.mesg_data.count -= 1;
      this.emit_mesg_data();
      this.update_calls();
    });
  }

  // Abort the outstanding call with the given id (if any): unregister it,
  // cancel its timer, and call its callback with the error "disconnect".
  private clear_call(id: string): void {
    const obj = this.call_callbacks[id];
    if (obj == null) return;
    delete this.call_callbacks[id];
    // Cancel the timer first, so it is released even if the callback throws.
    if (obj.timeout) {
      clearTimeout(obj.timeout);
      delete obj.timeout;
    }
    obj.cb("disconnect");
  }

  // Abort every outstanding call.
  private clear_call_queue(): void {
    for (const id in this.call_callbacks) {
      this.clear_call(id);
    }
    this.emit_mesg_data();
  }

  // Wait for the global Primus library, create the connection, and wire up
  // all connection event handlers.
  private async init_hub_websocket(): Promise<void> {
    const log = (...mesg) => console.log("hub_websocket -", ...mesg);
    log("connect");
    this.client.emit("connecting");

    this.client.on("connected", () => {
      this.send_version();
      // Any outstanding calls made before connecting happened
      // can't possibly succeed, so we clear all outstanding messages.
      this.clear_call_queue();
    });

    this.delete_websocket_cookie();
    // Important: window.Primus is usually defined when we get to the point
    // of running this code.  However, sometimes it isn't -- timing is random
    // and whether it is defined here depends on a hub being available to
    // serve it up.  So we just keep trying until it is defined.
    // There is no need to back off or delay, since we aren't
    // actually doing anything at all here in terms of work!
    log("Loading global websocket client library from hub-websocket...");
    while (window.Primus == null) {
      await delay(200);
    }
    log(
      "Loaded global websocket library!  Now creating websocket connection to hub-websocket...",
    );
    const conn = (this.conn = new window.Primus({
      reconnect: {
        max: 30000,
        min: 3000,
        retries: 1000,
      },
    }));

    conn.on("open", async () => {
      this.connected = true;
      this.connection_is_totally_dead = false;
      this.client.emit("connected");
      log("connected");
      this.num_attempts = 0;

      // Replace (rather than add to) the data listener on every open, so
      // there is never more than one.
      conn.removeAllListeners("data");
      conn.on("data", this.ondata.bind(this));

      if (should_do_anonymous_setup()) {
        do_anonymous_setup(this.client);
      }
    });

    conn.on("outgoing::open", () => {
      log("connecting");
      this.client.emit("connecting");
    });

    conn.on("offline", () => {
      log("offline");
      this.connected = this.signed_in = false;
      this.client.emit("disconnected", "offline");
    });

    conn.on("online", () => {
      log("online");
    });

    conn.on("message", (evt) => {
      this.ondata(evt.data);
    });

    conn.on("error", (err) => {
      log("error: ", err);
    });
    // NOTE: we do NOT emit an error event in this case!  See
    // https://github.com/sagemathinc/cocalc/issues/1819
    // for extensive discussion.

    conn.on("close", () => {
      log("closed");
      this.connected = this.signed_in = false;
      this.client.emit("disconnected", "close");
    });

    conn.on("end", () => {
      this.connection_is_totally_dead = true;
    });

    conn.on("reconnect scheduled", (opts) => {
      this.num_attempts = opts.attempt;
      // This just informs everybody that we *are* disconnected.
      this.client.emit("disconnected", "close");
      conn.removeAllListeners("data");
      this.delete_websocket_cookie();
      log(
        `reconnect scheduled (attempt ${opts.attempt} out of ${opts.retries})`,
      );
    });

    conn.on("reconnect", () => {
      this.client.emit("connecting");
    });
  }

  private send_version(): void {
    this.send(message.version({ version: this.client.version() }));
  }

  /** Force a fresh connection: drop the server cookie, end, and re-open. */
  public fix_connection(): void {
    this.delete_websocket_cookie();
    this.conn?.end();
    this.conn?.open();
  }

  /** The connection's reported latency, or undefined when not connected. */
  public latency(): number | void {
    if (this.connected) {
      return this.conn?.latency;
    }
  }
}