Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/backend/tcp/enable-messaging-protocol.ts
Views: 687
/*1* This file is part of CoCalc: Copyright © 2022 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45/*67Enable two new functions write_mesg and recv_mesg on a TCP socket.89*/1011import { Socket } from "node:net";1213import getLogger from "@cocalc/backend/logger";14import { error } from "@cocalc/util/message";15import { from_json_socket, to_json_socket, trunc } from "@cocalc/util/misc";1617const winston = getLogger("tcp.enable");1819export type Type = "json" | "blob";2021export interface Message {22id?: string;23uuid?: string;24blob?: Buffer | string;25ttlSeconds?: number;26event?: "sage_raw_input" | "hello";27value?: any;28done?: boolean;29}3031interface RecvMesgOpts {32type: Type;33id: string; // or uuid34cb: (message: object) => void; // called with cb(mesg)35timeout?: number; // units of **seconds** (NOT ms!).36}3738export interface CoCalcSocket extends Socket {39id: string;40pid?: number;41heartbeat?: Date;42write_mesg: (43type: Type,44mesg: Message,45cb?: (err?: string | Error) => void46) => void;47recv_mesg: (opts: RecvMesgOpts) => void;48}4950export default function enable(socket: CoCalcSocket, desc: string = "") {51socket.setMaxListeners(500); // we use a lot of listeners for listening for messages5253let buf: Buffer | null = null;54let bufTargetLength = -1;5556const listenForMesg = (data: Buffer) => {57buf = buf == null ? data : Buffer.concat([buf, data]);58while (true) {59if (bufTargetLength === -1) {60// starting to read a new message61if (buf.length >= 4) {62bufTargetLength = buf.readUInt32BE(0) + 4;63} else {64return; // have to wait for more data to find out message length65}66}67if (bufTargetLength <= buf.length) {68// read a new message from our buffer69const type = buf.slice(4, 5).toString();70const mesg = buf.slice(5, bufTargetLength);71switch (type) {72case "j": // JSON73const s = mesg.toString();74let obj;75try {76// Do not use "obj = JSON.parse(s)"77obj = from_json_socket(s); // this properly parses Date objects78} catch (err) {79winston.debug(80`WARNING: failed to parse JSON message='${trunc(81s,8251283)}' on socket ${desc} - ${err}`84);85// skip it.86return;87}88socket.emit("mesg", "json", obj);89break;90case "b": // BLOB (tagged by a uuid)91socket.emit("mesg", "blob", {92uuid: mesg.slice(0, 36).toString(),93blob: mesg.slice(36),94});95break;96default:97// NOTE -- better to show warning than throw possibly uncaught exception, since98// don't want malicious user to cause anything to crash.99winston.debug(`WARNING: unknown message type: '${type}'`);100return;101}102buf = buf.slice(bufTargetLength);103bufTargetLength = -1;104if (buf.length === 0) {105return;106}107} else {108// nothing to do but wait for more data109return;110}111}112};113114socket.on("data", listenForMesg);115116socket.write_mesg = (117type: Type,118data: Message,119cb?: (err?: string | Error) => void120): void => {121if (data == null) {122// uncomment this to get a traceback to see what might be causing this...123//throw Error(`write_mesg(type='${type}': data must be defined`);124cb?.(`write_mesg(type='${type}': data must be defined`);125return;126}127const send = function (s: string | Buffer): void {128const buf = Buffer.alloc(4);129// This line was 4 hours of work. It is absolutely130// *critical* to change the (possibly a string) s into a131// buffer before computing its length and sending it!!132// Otherwise unicode characters will cause trouble.133if (typeof s === "string") {134s = Buffer.from(s);135}136buf.writeInt32BE(s.length, 0);137if (!socket.writable) {138cb?.("socket not writable");139return;140} else {141socket.write(buf);142}143144if (!socket.writable) {145cb?.("socket not writable");146return;147} else {148socket.write(s, cb);149}150};151152switch (type) {153case "json":154send("j" + to_json_socket(data));155return;156case "blob":157if (data.uuid == null) {158cb?.("data object *must* have a uuid attribute");159return;160}161if (data.blob == null) {162cb?.("data object *must* have a blob attribute");163return;164}165send(166Buffer.concat([167Buffer.from("b"),168Buffer.from(data.uuid),169Buffer.from(data.blob),170])171);172return;173default:174cb?.(`unknown message type '${type}'`);175return;176}177};178179// Wait until we receive exactly *one* message of the given type180// with the given id, then call the callback with that message.181// (If the type is 'blob', with the given uuid.)182socket.recv_mesg = ({ type, id, cb, timeout }: RecvMesgOpts): void => {183let done: boolean = false;184let timeoutId: ReturnType<typeof setTimeout> | null = null;185186const f = (mesgType: Type, mesg: Readonly<Message>) => {187if (188type === mesgType &&189((type === "json" && mesg.id === id) ||190(type === "blob" && mesg.uuid === id))191) {192if (done) return;193socket.removeListener("mesg", f);194done = true;195if (timeoutId != null) {196clearTimeout(timeoutId);197}198cb(mesg);199}200};201202socket.on("mesg", f);203204if (timeout != null) {205timeoutId = setTimeout(() => {206if (done) return;207done = true;208socket.removeListener("mesg", f);209cb(error({ error: `Timed out after ${timeout} seconds.` }));210}, timeout * 1000);211}212};213}214215216