CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/tcp/enable-messaging-protocol.ts
Views: 923
1
/*
 * This file is part of CoCalc: Copyright © 2022 Sagemath, Inc.
 * License: MS-RSL – see LICENSE.md for details
 */
5
6
/*
Enable two new functions, write_mesg and recv_mesg, on a TCP socket.
*/
11
12
import { Buffer } from "node:buffer";
13
import { Socket } from "node:net";
14
15
import getLogger from "@cocalc/backend/logger";
16
import { error } from "@cocalc/util/message";
17
import { from_json_socket, to_json_socket, trunc } from "@cocalc/util/misc";
18
19
const winston = getLogger("tcp.enable");
20
21
/**
 * Kind of message carried by one frame on a messaging-enabled socket.
 * On the wire "json" frames are tagged with the byte "j" and "blob"
 * frames with the byte "b".
 */
export type Type = "json" | "blob";
22
23
/**
 * A message passed to write_mesg or delivered through the socket's "mesg"
 * event. Every field is optional; which ones are meaningful depends on the
 * message type.
 */
export interface Message {
  /** Compared against the requested id by recv_mesg for "json" messages. */
  id?: string;
  /**
   * Tag of a blob. write_mesg refuses to send a "blob" message without it,
   * and recv_mesg compares it against the requested id for "blob" messages.
   */
  uuid?: string;
  /** Blob contents. write_mesg refuses to send a "blob" message without it. */
  blob?: Buffer | string;
  // NOTE(review): the remaining fields are not read by the socket framing
  // code; presumably they are interpreted by higher-level callers -- confirm.
  ttlSeconds?: number;
  event?: "sage_raw_input" | "hello";
  value?: any;
  done?: boolean;
}
32
33
/** Options accepted by CoCalcSocket.recv_mesg. */
interface RecvMesgOpts {
  /** Kind of message to wait for. */
  type: Type;
  /**
   * Value to match: compared with the message's `id` when type is "json"
   * and with its `uuid` when type is "blob".
   */
  id: string;
  /**
   * Called exactly once, either with the first matching message or, if the
   * timeout fires first, with the object produced by `error(...)`.
   */
  cb: (message: object) => void;
  /** Maximum time to wait, in **seconds** (NOT ms!). No timer is set if omitted. */
  timeout?: number;
}
39
40
/**
 * A TCP socket extended with the message framing helpers write_mesg and
 * recv_mesg.
 */
export interface CoCalcSocket extends Socket {
  // NOTE(review): id, pid and heartbeat are not assigned by the framing code
  // itself; presumably the code that creates the socket maintains them --
  // confirm against callers.
  id: string;
  pid?: number;
  heartbeat?: Date;
  /**
   * Send one message of the given type. Validation and writability problems
   * are reported through `cb` rather than thrown; otherwise `cb` is handed to
   * the underlying socket write of the message body.
   */
  write_mesg: (
    type: Type,
    mesg: Message,
    cb?: (err?: string | Error) => void,
  ) => void;
  /**
   * Wait for exactly one incoming message of the given type and id (uuid for
   * blobs), then call `opts.cb` with it.
   */
  recv_mesg: (opts: RecvMesgOpts) => void;
}
51
52
/**
 * Add length-prefixed message framing to a TCP socket.
 *
 * After this call the socket
 *   - emits a "mesg" event for every complete incoming message, either as
 *     ("json", obj) or as ("blob", { uuid, blob }),
 *   - has write_mesg(type, mesg, cb?) to send one message, and
 *   - has recv_mesg({ type, id, cb, timeout? }) to wait for one message.
 *
 * Wire format of one frame:
 *   4 bytes    big-endian unsigned length N of the rest of the frame
 *   1 byte     type tag: "j" (JSON) or "b" (blob)
 *   N-1 bytes  body: JSON text, or a 36-byte uuid followed by the blob bytes
 *
 * @param socket - socket to extend in place
 * @param desc - label included in the warning logged for unparseable JSON
 */
export default function enable(socket: CoCalcSocket, desc: string = "") {
  socket.setMaxListeners(500); // we use a lot of listeners for listening for messages

  const HEADER_LENGTH = 4; // size of the big-endian length prefix
  const UUID_LENGTH = 36; // blob bodies start with a 36-byte uuid
  const MAX_PAYLOAD_LENGTH = 0xffffffff; // largest length the prefix can encode
  const textDecoder = new TextDecoder();

  // Bytes received but not yet consumed. They are kept as a list of chunks so
  // that a large message arriving in many pieces is copied only once, when
  // enough bytes have accumulated to make progress.
  let chunks: Uint8Array[] = [];
  let buffered = 0; // total number of bytes in `chunks`
  // Total size (prefix included) of the frame being read, or -1 if its length
  // prefix has not been read yet.
  let frameLength = -1;

  // Decode one complete frame (type tag + body, without the length prefix)
  // and emit the corresponding "mesg" event. Malformed frames are logged and
  // dropped rather than thrown, since we don't want a malicious peer to be
  // able to crash anything.
  const dispatch = (frame: Uint8Array): void => {
    const type = frame.length > 0 ? String.fromCharCode(frame[0]) : "";
    const body = frame.subarray(1);
    switch (type) {
      case "j": {
        // JSON
        const s = textDecoder.decode(body);
        try {
          // Do not use "obj = JSON.parse(s)"
          const obj = from_json_socket(s); // this properly parses Date objects
          // The emit stays inside the try (as before), so an exception thrown
          // by a "mesg" listener is also logged here instead of propagating.
          socket.emit("mesg", "json", obj);
        } catch (err) {
          winston.debug(
            `WARNING: failed to parse JSON message='${trunc(
              s,
              512,
            )}' on socket ${desc} - ${err}`,
          );
          // skip it -- the frame has already been removed from the buffer.
        }
        return;
      }
      case "b": // BLOB (tagged by a uuid)
        socket.emit("mesg", "blob", {
          uuid: textDecoder.decode(body.subarray(0, UUID_LENGTH)),
          // Copy into a Buffer of its own so the blob does not alias (and
          // keep alive) the receive buffer.
          blob: Buffer.from(body.subarray(UUID_LENGTH)),
        });
        return;
      default:
        // NOTE -- better to show warning than throw possibly uncaught exception, since
        // don't want malicious user to cause anything to crash.
        winston.debug(`WARNING: unknown message type: '${type}'`);
    }
  };

  const listenForMesg = (data: Uint8Array): void => {
    chunks.push(data);
    buffered += data.length;

    // Keep going for as long as there are enough bytes for the next step:
    // the length prefix if we don't know the frame size yet, otherwise the
    // whole frame.
    while (buffered >= (frameLength === -1 ? HEADER_LENGTH : frameLength)) {
      const buf =
        chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, buffered);
      chunks = [buf];

      if (frameLength === -1) {
        // starting to read a new message
        const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
        frameLength = dv.getUint32(0) + HEADER_LENGTH;
        continue; // re-check whether the whole frame has arrived
      }

      const frame = buf.subarray(HEADER_LENGTH, frameLength);
      const rest = buf.subarray(frameLength);

      // Consume the frame *before* dispatching it. A bad frame is then really
      // skipped instead of being parsed again on every later "data" event,
      // and a throwing listener cannot leave the parser stuck on this frame.
      chunks = rest.length > 0 ? [rest] : [];
      buffered = rest.length;
      frameLength = -1;

      dispatch(frame);
    }
  };

  socket.on("data", listenForMesg);

  socket.write_mesg = (
    type: Type,
    data: Message,
    cb?: (err?: string | Error) => void,
  ): void => {
    if (data == null) {
      // uncomment this to get a traceback to see what might be causing this...
      //throw Error(`write_mesg(type='${type}': data must be defined`);
      cb?.(`write_mesg(type='${type}': data must be defined`);
      return;
    }

    // It is absolutely *critical* to turn the (possibly string) message into
    // bytes before computing its length: the prefix counts bytes, not
    // characters, and they differ as soon as the text is not pure ASCII.
    let payload: Buffer;
    switch (type) {
      case "json":
        payload = Buffer.from("j" + to_json_socket(data));
        break;
      case "blob":
        if (data.uuid == null) {
          cb?.("data object *must* have a uuid attribute");
          return;
        }
        if (data.blob == null) {
          cb?.("data object *must* have a blob attribute");
          return;
        }
        // NOTE: the receiving side treats the first 36 bytes after the type
        // tag as the uuid, so data.uuid is expected to be exactly that long.
        payload = Buffer.concat([
          Buffer.from("b"),
          Buffer.from(data.uuid),
          Buffer.isBuffer(data.blob) ? data.blob : Buffer.from(data.blob),
        ]);
        break;
      default:
        cb?.(`unknown message type '${type}'`);
        return;
    }

    if (payload.length > MAX_PAYLOAD_LENGTH) {
      // The length would not fit in the 4-byte prefix and would corrupt the stream.
      cb?.("message too large to send");
      return;
    }
    if (!socket.writable) {
      cb?.("socket not writable");
      return;
    }

    const header = Buffer.alloc(HEADER_LENGTH);
    header.writeUInt32BE(payload.length, 0); // big endian, matching the reader
    socket.write(header);
    socket.write(payload, cb);
  };

  // Wait until we receive exactly *one* message of the given type
  // with the given id, then call the callback with that message.
  // (If the type is 'blob', with the given uuid.)
  // If a timeout (in seconds) is given and expires first, the callback is
  // instead called once with an error message.
  socket.recv_mesg = ({ type, id, cb, timeout }: RecvMesgOpts): void => {
    let settled = false;
    let timer: ReturnType<typeof setTimeout> | null = null;

    // Deliver the final result at most once and release listener and timer.
    const settle = (result: object): void => {
      if (settled) return;
      settled = true;
      socket.removeListener("mesg", onMesg);
      if (timer != null) {
        clearTimeout(timer);
      }
      cb(result);
    };

    const onMesg = (mesgType: Type, mesg: Readonly<Message>): void => {
      if (mesgType !== type) return;
      const matches =
        (type === "json" && mesg.id === id) ||
        (type === "blob" && mesg.uuid === id);
      if (matches) {
        settle(mesg);
      }
    };

    socket.on("mesg", onMesg);

    if (timeout != null) {
      timer = setTimeout(() => {
        settle(error({ error: `Timed out after ${timeout} seconds.` }));
      }, timeout * 1000);
    }
  };
}
218
219