Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Path: blob/master/src/packages/backend/tcp/enable-messaging-protocol.ts
Views: 923
/*1* This file is part of CoCalc: Copyright © 2022 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45/*67Enable two new functions write_mesg and recv_mesg on a TCP socket.89*/1011import { Buffer } from "node:buffer";12import { Socket } from "node:net";1314import getLogger from "@cocalc/backend/logger";15import { error } from "@cocalc/util/message";16import { from_json_socket, to_json_socket, trunc } from "@cocalc/util/misc";1718const winston = getLogger("tcp.enable");1920export type Type = "json" | "blob";2122export interface Message {23id?: string;24uuid?: string;25blob?: Buffer | string;26ttlSeconds?: number;27event?: "sage_raw_input" | "hello";28value?: any;29done?: boolean;30}3132interface RecvMesgOpts {33type: Type;34id: string; // or uuid35cb: (message: object) => void; // called with cb(mesg)36timeout?: number; // units of **seconds** (NOT ms!).37}3839export interface CoCalcSocket extends Socket {40id: string;41pid?: number;42heartbeat?: Date;43write_mesg: (44type: Type,45mesg: Message,46cb?: (err?: string | Error) => void,47) => void;48recv_mesg: (opts: RecvMesgOpts) => void;49}5051export default function enable(socket: CoCalcSocket, desc: string = "") {52socket.setMaxListeners(500); // we use a lot of listeners for listening for messages5354let buf: Uint8Array | null = null;55let bufTargetLength = -1;5657const listenForMesg = (data: Uint8Array) => {58buf = buf == null ? data : new Uint8Array([...buf, ...data]);59while (true) {60if (bufTargetLength === -1) {61// starting to read a new message62if (buf.length >= 4) {63const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);64bufTargetLength = dv.getUint32(0) + 4;65} else {66return; // have to wait for more data to find out message length67}68}69if (bufTargetLength <= buf.length) {70// read a new message from our buffer71const type = String.fromCharCode(buf[4]);72const mesg = buf.slice(5, bufTargetLength);7374const textDecoder = new TextDecoder();75switch (type) {76case "j": // JSON77const s = textDecoder.decode(mesg);78try {79// Do not use "obj = JSON.parse(s)"80const obj = from_json_socket(s); // this properly parses Date objects81socket.emit("mesg", "json", obj);82} catch (err) {83winston.debug(84`WARNING: failed to parse JSON message='${trunc(85s,86512,87)}' on socket ${desc} - ${err}`,88);89// skip it.90return;91}92break;93case "b": // BLOB (tagged by a uuid)94socket.emit("mesg", "blob", {95uuid: textDecoder.decode(mesg.slice(0, 36)),96blob: mesg.slice(36),97});98break;99default:100// NOTE -- better to show warning than throw possibly uncaught exception, since101// don't want malicious user to cause anything to crash.102winston.debug(`WARNING: unknown message type: '${type}'`);103return;104}105buf = buf.slice(bufTargetLength);106bufTargetLength = -1;107if (buf.length === 0) {108return;109}110} else {111// nothing to do but wait for more data112return;113}114}115};116117socket.on("data", listenForMesg);118119socket.write_mesg = (120type: Type,121data: Message,122cb?: (err?: string | Error) => void,123): void => {124if (data == null) {125// uncomment this to get a traceback to see what might be causing this...126//throw Error(`write_mesg(type='${type}': data must be defined`);127cb?.(`write_mesg(type='${type}': data must be defined`);128return;129}130const send = function (s: string | ArrayBuffer): void {131const length: Uint8Array = new Uint8Array(4);132// This line was 4 hours of work. It is absolutely133// *critical* to change the (possibly a string) s into a134// buffer before computing its length and sending it!!135// Otherwise unicode characters will cause trouble.136const data: Uint8Array = new Uint8Array(137typeof s === "string" ? Buffer.from(s) : s,138);139140const lengthView = new DataView(length.buffer);141// this was buf.writeInt32BE, i.e. big endian142lengthView.setInt32(0, data.byteLength, false); // false for big-endian143144if (!socket.writable) {145cb?.("socket not writable");146return;147} else {148socket.write(length);149socket.write(data, cb);150}151};152153switch (type) {154case "json":155send("j" + to_json_socket(data));156return;157case "blob":158if (data.uuid == null) {159cb?.("data object *must* have a uuid attribute");160return;161}162if (data.blob == null) {163cb?.("data object *must* have a blob attribute");164return;165}166send(167new Uint8Array([168...Buffer.from("b"),169...Buffer.from(data.uuid),170...(Buffer.isBuffer(data.blob)171? data.blob172: Buffer.from(data.blob)),173]).buffer,174);175return;176default:177cb?.(`unknown message type '${type}'`);178return;179}180};181182// Wait until we receive exactly *one* message of the given type183// with the given id, then call the callback with that message.184// (If the type is 'blob', with the given uuid.)185socket.recv_mesg = ({ type, id, cb, timeout }: RecvMesgOpts): void => {186let done: boolean = false;187let timeoutId: ReturnType<typeof setTimeout> | null = null;188189const f = (mesgType: Type, mesg: Readonly<Message>) => {190if (191type === mesgType &&192((type === "json" && mesg.id === id) ||193(type === "blob" && mesg.uuid === id))194) {195if (done) return;196socket.removeListener("mesg", f);197done = true;198if (timeoutId != null) {199clearTimeout(timeoutId);200}201cb(mesg);202}203};204205socket.on("mesg", f);206207if (timeout != null) {208timeoutId = setTimeout(() => {209if (done) return;210done = true;211socket.removeListener("mesg", f);212cb(error({ error: `Timed out after ${timeout} seconds.` }));213}, timeout * 1000);214}215};216}217218219