Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/project/client.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2023 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
client.ts -- A project viewed as a client for a hub.

For security reasons, a project does NOT initiate a TCP connection to a hub;
rather, hubs initiate TCP connections to projects:

* MINUS: This makes various things more complicated, e.g., a project
  might not have any open connection to a hub, but still "want" to write
  something to the database; in such a case it is simply out of luck
  and must wait.

* PLUS: Security is simpler since a hub initiates the connection to
  a project. A hub doesn't have to receive TCP connections and decide
  whether or not to trust what is on the other end of those connections.

That said, this architecture could change, and very little code would change
as a result.
*/
import EventEmitter from "node:events";
import fs from "node:fs";
import { join } from "node:path";
import { FileSystemClient } from "@cocalc/sync-client/lib/client-fs";
import { execute_code, uuidsha1 } from "@cocalc/backend/misc_node";
import { CoCalcSocket } from "@cocalc/backend/tcp/enable-messaging-protocol";
import { SyncDoc } from "@cocalc/sync/editor/generic/sync-doc";
import type { ProjectClient as ProjectClientInterface } from "@cocalc/sync/editor/generic/types";
import { SyncString } from "@cocalc/sync/editor/string/sync";
import * as synctable2 from "@cocalc/sync/table";
import { callback2, once } from "@cocalc/util/async-utils";
import { PROJECT_HUB_HEARTBEAT_INTERVAL_S } from "@cocalc/util/heartbeat";
import * as message from "@cocalc/util/message";
import * as misc from "@cocalc/util/misc";
import type { CB } from "@cocalc/util/types/callback";
import type { ExecuteCodeOptionsWithCallback } from "@cocalc/util/types/execute-code";
import * as blobs from "./blobs";
import { symmetric_channel } from "./browser-websocket/symmetric_channel";
import { json } from "./common";
import * as data from "./data";
import initJupyter from "./jupyter/init";
import * as kucalc from "./kucalc";
import { getLogger } from "./logger";
import * as sage_session from "./sage_session";
import { getListingsTable } from "@cocalc/project/sync/listings";
import { get_synctable } from "./sync/open-synctables";
import { get_syncdoc } from "./sync/sync-doc";

const winston = getLogger("client");

const HOME = process.env.HOME ?? "/home/user";

let DEBUG = false;
// Easy way to enable debugging in any project anywhere.
const DEBUG_FILE = join(HOME, ".smc-DEBUG");
if (fs.existsSync(DEBUG_FILE)) {
  DEBUG = true;
} else if (kucalc.IN_KUCALC) {
  // always make verbose in kucalc, since logs are taken care of by the k8s
  // logging infrastructure...
  DEBUG = true;
} else {
  winston.info(
    "create this file to enable very verbose debugging:",
    DEBUG_FILE,
  );
}

winston.info(`DEBUG = ${DEBUG}`);
if (!DEBUG) {
  winston.info(`create ${DEBUG_FILE} for much more verbose logging`);
}

let client: Client;

/**
 * Create the (single) project client.
 * @throws if the client was already initialized.
 */
export function init() {
  if (client != null) {
    throw Error("BUG: Client already initialized!");
  }
  client = new Client();
  return client;
}

/**
 * Return the client created by init().
 * @throws if init() has not been called yet.
 */
export function getClient(): Client {
  if (client == null) {
    throw Error("BUG: Client not initialized!");
  }
  return client;
}

let ALREADY_CREATED = false;

// Callback waiting for a hub's response message. An error is signalled by
// passing a message of the form {event:"error", error}.
type HubCB = CB<any, { event: "error"; error?: string }>;

export class Client extends EventEmitter implements ProjectClientInterface {
  private project_id: string;
  private _connected: boolean;

  // Callbacks waiting for a response from some hub, keyed by message id.
  private _hub_callbacks: {
    [key: string]: HubCB;
  };
  // Sockets that hubs opened to this project, keyed by socket id. Each entry
  // also tracks the callbacks waiting on that particular socket, so they can
  // be failed when the socket goes away.
  private _hub_client_sockets: {
    [id: string]: {
      socket: CoCalcSocket;
      callbacks?: { [id: string]: HubCB };
      activity: Date;
    };
  };
  // Socket used by each changefeed query, keyed by query (message) id.
  private _changefeed_sockets: any;
  private _open_syncstrings?: { [key: string]: SyncString };

  // use to define a logging function that is cleanly used internally
  dbg = (f: string) => {
    if (DEBUG && winston) {
      return (...m) => {
        return winston.debug(`Client.${f}`, ...m);
      };
    } else {
      return function (..._) {};
    }
  };

  private filesystemClient = new FileSystemClient();
  write_file = this.filesystemClient.write_file;
  path_read = this.filesystemClient.path_read;
  path_stat = this.filesystemClient.path_stat;
  file_size_async = this.filesystemClient.file_size_async;
  file_stat_async = this.filesystemClient.file_stat_async;
  watch_file = this.filesystemClient.watch_file;

  constructor() {
    super();
    if (ALREADY_CREATED) {
      throw Error("BUG: Client already created!");
    }
    ALREADY_CREATED = true;
    this.project_id = data.project_id;
    this.dbg("constructor")();
    this.setMaxListeners(300); // every open file/table/sync db listens for connect event, which adds up.
    // initialize the caches
    this._hub_callbacks = {};
    this._hub_client_sockets = {};
    this._changefeed_sockets = {};
    this._connected = false;

    // Start listening for syncstrings that have been recently modified, so that we
    // can open them and provide filesystem and computational support.
    // TODO: delete this code.
    //# @_init_recent_syncstrings_table()

    if (kucalc.IN_KUCALC) {
      kucalc.init(this);
    }

    misc.bind_methods(this);

    initJupyter();
  }

  // Alerts are only logged (when DEBUG is on); a project has no UI to show them in.
  public alert_message({
    type = "default",
    title,
    message,
  }: {
    type?: "default";
    title?: string;
    message: string;
    block?: boolean;
    timeout?: number; // time in seconds
  }): void {
    this.dbg("alert_message")(type, title, message);
  }

  // Close all open syncstrings. todo: more could be closed...
  public close(): void {
    if (this._open_syncstrings != null) {
      // Iterate over the values themselves; iterating the key array with
      // for..in would yield array indices ("0", "1", ...) instead of keys.
      for (const s of Object.values(this._open_syncstrings)) {
        s.close();
      }
      delete this._open_syncstrings;
    }
  }

  // account_id or project_id of this client
  public client_id(): string {
    return this.project_id;
  }

  public get_project_id(): string {
    return this.project_id;
  }

  // true since this client is a project
  public is_project(): boolean {
    return true;
  }

  public is_browser(): boolean {
    return false;
  }

  public is_compute_server(): boolean {
    return false;
  }

  // false since this client is not a user
  public is_user(): boolean {
    return false;
  }

  public is_signed_in(): boolean {
    return true;
  }

  public is_connected(): boolean {
    return this._connected;
  }

  // We trust the time on our own compute servers (unlike random user's browser).
  public server_time(): Date {
    return new Date();
  }

  /**
   * Declare that the given socket is active right now and can be used for
   * communication with some hub (the one the socket is connected to).
   *
   * The first time a socket is seen, this registers it and installs
   * end/error handlers and a heartbeat check. When the socket goes away, every
   * call still waiting for a response on it is failed with "socket closed".
   */
  public active_socket(socket: CoCalcSocket): void {
    const dbg = this.dbg(
      `active_socket(id=${socket.id},ip='${socket.remoteAddress}')`,
    );
    const existing = this._hub_client_sockets[socket.id];
    if (existing != null) {
      existing.activity = new Date();
      return;
    }

    dbg();
    const entry = (this._hub_client_sockets[socket.id] = {
      socket,
      callbacks: {} as { [id: string]: HubCB },
      activity: new Date(),
    });
    let heartbeat_interval: ReturnType<typeof setInterval> | undefined =
      undefined;

    const socket_end = (): void => {
      if (heartbeat_interval == null) {
        // already destroyed it
        return;
      }
      dbg("ending socket");
      clearInterval(heartbeat_interval);
      heartbeat_interval = undefined;

      // Detach the pending callbacks (so an additional trigger of end doesn't
      // do anything) and unregister the socket *before* running them. That way
      // a caller that retries immediately cannot pick this dead socket again.
      const pending = entry.callbacks;
      delete entry.callbacks;
      delete this._hub_client_sockets[socket.id];
      if (pending != null) {
        for (const [id, cb] of Object.entries(pending)) {
          delete this._hub_callbacks[id];
          try {
            // These are the wrappers installed by call(). They expect a hub
            // message, so signal the failure as an error message; the wrapper
            // then reports "socket closed" to the caller as an *error*.
            cb?.({ event: "error", error: "socket closed" });
          } catch (err) {
            dbg(`WARNING: error in callback for closed socket -- ${err}`);
          }
        }
      }

      dbg(
        `number of active sockets now equals ${misc.len(
          this._hub_client_sockets,
        )}`,
      );
      if (misc.len(this._hub_client_sockets) === 0) {
        this._connected = false;
        dbg("lost all active sockets");
        this.emit("disconnected");
      }
      socket.end();
      socket.destroy();
    };

    socket.on("end", socket_end);
    socket.on("error", socket_end);

    const heartbeat_timeout_ms = 1.5 * PROJECT_HUB_HEARTBEAT_INTERVAL_S * 1000;
    const check_heartbeat = (): void => {
      if (
        socket.heartbeat == null ||
        Date.now() - socket.heartbeat.getTime() >= heartbeat_timeout_ms
      ) {
        dbg("heartbeat failed");
        socket_end();
      } else {
        dbg("heartbeat -- socket is working");
      }
    };

    heartbeat_interval = setInterval(check_heartbeat, heartbeat_timeout_ms);

    // We just registered a socket, so there is at least one.
    dbg("CONNECTED!");
    this._connected = true;
    this.emit("connected");
  }

  /**
   * Handle a mesg coming back from some hub. If we have a callback for it, call
   * it and return true. Otherwise return false, meaning something else should
   * try to handle this message.
   */
  public handle_mesg(mesg, socket) {
    const dbg = this.dbg(`handle_mesg(${misc.trunc_middle(json(mesg), 512)})`);
    const f = this._hub_callbacks[mesg.id];
    if (f == null) {
      dbg("no callback");
      return false;
    }
    dbg("calling callback");
    if (!mesg.multi_response) {
      delete this._hub_callbacks[mesg.id];
      // The socket may already have been unregistered (e.g., it just ended).
      const callbacks = this._hub_client_sockets[socket.id]?.callbacks;
      if (callbacks != null) {
        delete callbacks[mesg.id];
      }
    }
    try {
      f(mesg);
    } catch (err) {
      dbg(`WARNING: error handling message from client. -- ${err}`);
    }
    return true;
  }

  // Get a socket connection to the hub from one in our cache; choose one at random.
  // There is obviously no guarantee to get the same hub if you call this twice!
  // Returns undefined if there are currently no connections from any hub to us
  // (in which case, the project must wait).
  public get_hub_socket() {
    const socket_ids = misc.keys(this._hub_client_sockets);
    this.dbg("get_hub_socket")(
      `there are ${socket_ids.length} sockets -- ${JSON.stringify(socket_ids)}`,
    );
    if (socket_ids.length === 0) {
      return;
    }
    return this._hub_client_sockets[misc.random_choice(socket_ids)].socket;
  }

  /**
   * Send a message to some hub server and await a response (if cb defined).
   *
   * If no socket is given, one is picked at random and stored in opts.socket.
   * If no hub is connected, cb is called with an error and nothing is sent.
   * A message id is generated when the message has none. If the socket closes
   * before a response arrives, cb gets the error "socket closed".
   */
  public call(opts: {
    message: any;
    timeout?: number; // timeout in seconds; if specified call will error out after this much time
    socket?: CoCalcSocket; // if specified, use this socket
    cb?: CB<any, string>; // awaits response if given
  }) {
    const dbg = this.dbg(`call(message=${json(opts.message)})`);
    dbg();
    if (opts.socket == null) {
      // set socket to best one if no socket specified
      opts.socket = this.get_hub_socket();
    }
    const socket = opts.socket;
    if (socket == null) {
      dbg("no sockets");
      // currently, due to the security model, there's no way out of this; that will change...
      opts.cb?.("no hubs currently connected to this project");
      return;
    }
    if (opts.cb != null) {
      if (opts.message.id == null) {
        opts.message.id = misc.uuid();
      }
      const id: string = opts.message.id;
      // May be undefined if the caller passed a socket that is no longer registered.
      const socket_callbacks = this._hub_client_sockets[socket.id]?.callbacks;
      let timer: ReturnType<typeof setTimeout> | undefined = undefined;
      if (opts.timeout) {
        dbg("configure timeout");
        const fail = () => {
          dbg("failed");
          delete this._hub_callbacks[id];
          if (socket_callbacks != null) {
            delete socket_callbacks[id];
          }
          opts.cb?.(`timeout after ${opts.timeout}s`);
          delete opts.cb;
        };
        timer = setTimeout(fail, opts.timeout * 1000);
      }
      const cb = (this._hub_callbacks[id] = (resp) => {
        if (timer != null) {
          clearTimeout(timer);
          timer = undefined;
        }
        if (resp?.event === "error") {
          opts.cb?.(resp.error ? resp.error : "error");
        } else {
          opts.cb?.(undefined, resp);
        }
      });
      if (socket_callbacks != null) {
        socket_callbacks[id] = cb;
      }
    }
    // Finally, send the message
    return socket.write_mesg("json", opts.message);
  }

  /**
   * Do a project_query.
   *
   * With changes=true this creates a changefeed: cb may be called many times,
   * and is called with "socket-end" when the socket carrying the feed errors
   * or ends.
   * @throws if options is given but is not an array.
   */
  public query({
    query,
    options,
    changes,
    //standby = false, // **IGNORED**
    timeout = 30,
    cb,
  }: {
    query: any; // a query (see schema.js)
    options?: { [key: string]: any }[]; // options to the query, e.g., [{limit:5}] )
    changes?: boolean; // whether or not to create a changefeed
    //standby: boolean; // **IGNORED**
    timeout: number; // how long in *seconds* wait for initial result from hub database call
    cb: CB<any, string>;
  }) {
    if (options != null && !misc.is_array(options)) {
      throw Error("options must be an array");
    }
    const mesg = message.query({
      id: misc.uuid(),
      query,
      options,
      changes,
      multi_response: changes,
    });
    const socket = this.get_hub_socket();
    if (socket == null) {
      // It will try later when one is available...
      cb("no hub socket available");
      return;
    }
    if (changes) {
      // Record socket for this changefeed in _changefeed_sockets
      this._changefeed_sockets[mesg.id] = socket;
      // CRITICAL: On error or end, send an end error to the synctable, so that it will
      // attempt to reconnect (and also stop writing to the socket).
      // This is important, since for project clients
      // the disconnected event is only emitted when *all* connections from
      // hubs to the local_hub end. If two connections s1 and s2 are open,
      // and s1 is used for a sync table, and s1 closes (e.g., hub1 is restarted),
      // then s2 is still open and no 'disconnected' event is emitted. Nonetheless,
      // it's important for the project to consider the synctable broken and
      // try to reconnect it, which in this case it would do using s2.
      // NOTE(review): these listeners are never removed, even after the
      // changefeed is cancelled.
      socket.on("error", () => {
        cb("socket-end");
      });
      socket.on("end", () => {
        cb("socket-end");
      });
    }
    return this.call({
      message: mesg,
      timeout,
      socket,
      cb,
    });
  }

  // Cancel an outstanding changefeed query.
  private _query_cancel(opts: { id: string; cb?: CB }) {
    const socket = this._changefeed_sockets[opts.id];
    if (socket == null) {
      // nothing to do
      return opts.cb?.();
    }
    // The changefeed is going away either way, so stop tracking its socket.
    delete this._changefeed_sockets[opts.id];
    return this.call({
      message: message.query_cancel({ id: opts.id }),
      timeout: 30,
      socket,
      cb: opts.cb,
    });
  }

  // ASYNC version
  public async query_cancel(id) {
    return await callback2(this._query_cancel, { id });
  }

  public sync_table(query, options?: any, throttle_changes = undefined) {
    return synctable2.synctable(query, options, this, throttle_changes);
  }

  // We leave in the project_id for consistency with the browser UI.
  // And maybe someday we'll have tables managed across projects (?).
  public async synctable_project(_project_id: string, query, _options) {
    // TODO: this is ONLY for syncstring tables (syncstrings, patches, cursors).
    // Also, options are ignored -- since we use whatever was selected by the frontend.
    const the_synctable = await get_synctable(query, this);
    // To provide same API, must also wait until done initializing.
    if (the_synctable.get_state() !== "connected") {
      await once(the_synctable, "connected");
    }
    if (the_synctable.get_state() !== "connected") {
      throw Error(
        "Bug -- state of synctable must be connected " + JSON.stringify(query),
      );
    }
    return the_synctable;
  }

  // WARNING: making two of the exact same sync_string or sync_db will definitely
  // lead to corruption!

  // Get the synchronized doc with the given path. Returns undefined
  // if currently no such sync-doc.
  public syncdoc({ path }: { path: string }): SyncDoc | undefined {
    return get_syncdoc(path);
  }

  public symmetric_channel(name) {
    return symmetric_channel(name);
  }

  /**
   * Check access to a path.
   *
   * mode: sub-sequence of 'rwxf' (case-insensitive) -- see
   * https://nodejs.org/api/fs.html#fsaccesspath-mode-callback
   * cb(err): err is set if any access fails, or if mode contains a character
   * other than r/w/x/f; err=undefined if all access is OK.
   */
  public path_access(opts: { path: string; mode: string; cb: CB }): void {
    const flags: { [c: string]: number } = {
      r: fs.constants.R_OK,
      w: fs.constants.W_OK,
      x: fs.constants.X_OK,
      f: fs.constants.F_OK,
    };
    let access = fs.constants.F_OK;
    for (const s of opts.mode) {
      const flag = flags[s.toLowerCase()];
      if (flag == null) {
        opts.cb(`invalid access mode '${s}' in '${opts.mode}'`);
        return;
      }
      access |= flag;
    }
    fs.access(opts.path, access, opts.cb);
  }

  // Calls cb(undefined, exists) -- never with an error, for API consistency.
  // Uses fs.access(F_OK), which is what the deprecated fs.exists did internally.
  public path_exists(opts: { path: string; cb: CB }) {
    const dbg = this.dbg(`checking if path (='${opts.path}') exists`);
    dbg();
    fs.access(opts.path, fs.constants.F_OK, (err) => {
      const exists = err == null;
      dbg(`returned ${exists}`);
      opts.cb(undefined, exists);
    });
  }

  // Size of file in bytes (divide by 1000 for K, by 10^6 for MB.)
  public file_size(opts: { filename: string; cb: CB }): void {
    this.path_stat({
      path: opts.filename,
      cb: (err, stat) => {
        opts.cb(err, stat?.size);
      },
    });
  }

  // execute a command using the shell or a subprocess -- see docs for execute_code in misc_node.
  public shell(opts: ExecuteCodeOptionsWithCallback): void {
    execute_code(opts);
  }

  // return new sage session -- the code that actually calls this is in the @cocalc/sync package
  // in "packages/sync/editor/generic/evaluator.ts"
  public sage_session({
    path,
  }: {
    path: string; // the path to the *worksheet* file
  }): sage_session.SageSessionType {
    return sage_session.sage_session({ path, client: this });
  }

  // Save a blob to the central db blobstore.
  // The sha1 is optional.
  public save_blob({
    blob,
    sha1,
    uuid: optsUUID,
    cb,
  }: {
    blob: Buffer; // Buffer of data
    sha1?: string;
    uuid?: string; // if given then uuid must be derived from sha1 hash
    cb?: (err: string | undefined, resp?: any) => void;
  }) {
    const uuid = optsUUID ?? uuidsha1(blob, sha1);
    const dbg = this.dbg(`save_blob(uuid='${uuid}')`);
    const hub = this.get_hub_socket();
    if (hub == null) {
      dbg("fail -- no global hubs");
      cb?.(
        "no global hubs are connected to the local hub, so nowhere to send file",
      );
      return;
    }
    dbg("sending blob mesg");
    hub.write_mesg("blob", { uuid, blob });
    dbg("waiting for response");
    blobs.receive_save_blob_message({
      sha1: uuid,
      cb: (resp): void => {
        if (resp?.error) {
          dbg(`fail -- '${resp.error}'`);
          cb?.(resp.error, resp);
        } else {
          dbg("success");
          cb?.(undefined, resp);
        }
      },
    });
  }

  // Not implemented: always calls cb with an error.
  public get_blob(opts: {
    blob: Buffer; // Buffer of data
    sha1?: string;
    uuid?: string; // if given is uuid derived from sha1
    cb?: (err: string) => void; // (err, resp)
  }) {
    const dbg = this.dbg("get_blob");
    dbg(opts.sha1);
    opts.cb?.("get_blob: not implemented");
  }

  // no-op; assumed async api
  touch_project(_project_id: string) {}

  // Ask a hub for the history of the given syncdoc (optionally with patches).
  async get_syncdoc_history(string_id: string, patches = false) {
    const dbg = this.dbg("get_syncdoc_history");
    dbg(string_id, patches);
    const mesg = message.get_syncdoc_history({
      string_id,
      patches,
    });
    return await callback2(this.call, { message: mesg });
  }

  // Return true if the file was explicitly deleted.
  // Returns unknown if don't know
  // Returns false if definitely not.
  public is_deleted(filename: string, _project_id: string) {
    return getListingsTable()?.isDeleted(filename);
  }

  // Mark the file as deleted in the listings table (if there is one).
  public async set_deleted(
    filename: string,
    _project_id?: string,
  ): Promise<void> {
    // project_id is ignored
    const listings = getListingsTable();
    return await listings?.setDeleted(filename);
  }
}