CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/tcp/enable-messaging-protocol.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2022 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
8
Enable two new functions write_mesg and recv_mesg on a TCP socket.
9
10
*/
11
12
import { Socket } from "node:net";
13
14
import getLogger from "@cocalc/backend/logger";
15
import { error } from "@cocalc/util/message";
16
import { from_json_socket, to_json_socket, trunc } from "@cocalc/util/misc";
17
18
const winston = getLogger("tcp.enable");
19
20
export type Type = "json" | "blob";
21
22
export interface Message {
23
id?: string;
24
uuid?: string;
25
blob?: Buffer | string;
26
ttlSeconds?: number;
27
event?: "sage_raw_input" | "hello";
28
value?: any;
29
done?: boolean;
30
}
31
32
interface RecvMesgOpts {
33
type: Type;
34
id: string; // or uuid
35
cb: (message: object) => void; // called with cb(mesg)
36
timeout?: number; // units of **seconds** (NOT ms!).
37
}
38
39
export interface CoCalcSocket extends Socket {
40
id: string;
41
pid?: number;
42
heartbeat?: Date;
43
write_mesg: (
44
type: Type,
45
mesg: Message,
46
cb?: (err?: string | Error) => void
47
) => void;
48
recv_mesg: (opts: RecvMesgOpts) => void;
49
}
50
51
export default function enable(socket: CoCalcSocket, desc: string = "") {
52
socket.setMaxListeners(500); // we use a lot of listeners for listening for messages
53
54
let buf: Buffer | null = null;
55
let bufTargetLength = -1;
56
57
const listenForMesg = (data: Buffer) => {
58
buf = buf == null ? data : Buffer.concat([buf, data]);
59
while (true) {
60
if (bufTargetLength === -1) {
61
// starting to read a new message
62
if (buf.length >= 4) {
63
bufTargetLength = buf.readUInt32BE(0) + 4;
64
} else {
65
return; // have to wait for more data to find out message length
66
}
67
}
68
if (bufTargetLength <= buf.length) {
69
// read a new message from our buffer
70
const type = buf.slice(4, 5).toString();
71
const mesg = buf.slice(5, bufTargetLength);
72
switch (type) {
73
case "j": // JSON
74
const s = mesg.toString();
75
let obj;
76
try {
77
// Do not use "obj = JSON.parse(s)"
78
obj = from_json_socket(s); // this properly parses Date objects
79
} catch (err) {
80
winston.debug(
81
`WARNING: failed to parse JSON message='${trunc(
82
s,
83
512
84
)}' on socket ${desc} - ${err}`
85
);
86
// skip it.
87
return;
88
}
89
socket.emit("mesg", "json", obj);
90
break;
91
case "b": // BLOB (tagged by a uuid)
92
socket.emit("mesg", "blob", {
93
uuid: mesg.slice(0, 36).toString(),
94
blob: mesg.slice(36),
95
});
96
break;
97
default:
98
// NOTE -- better to show warning than throw possibly uncaught exception, since
99
// don't want malicious user to cause anything to crash.
100
winston.debug(`WARNING: unknown message type: '${type}'`);
101
return;
102
}
103
buf = buf.slice(bufTargetLength);
104
bufTargetLength = -1;
105
if (buf.length === 0) {
106
return;
107
}
108
} else {
109
// nothing to do but wait for more data
110
return;
111
}
112
}
113
};
114
115
socket.on("data", listenForMesg);
116
117
socket.write_mesg = (
118
type: Type,
119
data: Message,
120
cb?: (err?: string | Error) => void
121
): void => {
122
if (data == null) {
123
// uncomment this to get a traceback to see what might be causing this...
124
//throw Error(`write_mesg(type='${type}': data must be defined`);
125
cb?.(`write_mesg(type='${type}': data must be defined`);
126
return;
127
}
128
const send = function (s: string | Buffer): void {
129
const buf = Buffer.alloc(4);
130
// This line was 4 hours of work. It is absolutely
131
// *critical* to change the (possibly a string) s into a
132
// buffer before computing its length and sending it!!
133
// Otherwise unicode characters will cause trouble.
134
if (typeof s === "string") {
135
s = Buffer.from(s);
136
}
137
buf.writeInt32BE(s.length, 0);
138
if (!socket.writable) {
139
cb?.("socket not writable");
140
return;
141
} else {
142
socket.write(buf);
143
}
144
145
if (!socket.writable) {
146
cb?.("socket not writable");
147
return;
148
} else {
149
socket.write(s, cb);
150
}
151
};
152
153
switch (type) {
154
case "json":
155
send("j" + to_json_socket(data));
156
return;
157
case "blob":
158
if (data.uuid == null) {
159
cb?.("data object *must* have a uuid attribute");
160
return;
161
}
162
if (data.blob == null) {
163
cb?.("data object *must* have a blob attribute");
164
return;
165
}
166
send(
167
Buffer.concat([
168
Buffer.from("b"),
169
Buffer.from(data.uuid),
170
Buffer.from(data.blob),
171
])
172
);
173
return;
174
default:
175
cb?.(`unknown message type '${type}'`);
176
return;
177
}
178
};
179
180
// Wait until we receive exactly *one* message of the given type
181
// with the given id, then call the callback with that message.
182
// (If the type is 'blob', with the given uuid.)
183
socket.recv_mesg = ({ type, id, cb, timeout }: RecvMesgOpts): void => {
184
let done: boolean = false;
185
let timeoutId: ReturnType<typeof setTimeout> | null = null;
186
187
const f = (mesgType: Type, mesg: Readonly<Message>) => {
188
if (
189
type === mesgType &&
190
((type === "json" && mesg.id === id) ||
191
(type === "blob" && mesg.uuid === id))
192
) {
193
if (done) return;
194
socket.removeListener("mesg", f);
195
done = true;
196
if (timeoutId != null) {
197
clearTimeout(timeoutId);
198
}
199
cb(mesg);
200
}
201
};
202
203
socket.on("mesg", f);
204
205
if (timeout != null) {
206
timeoutId = setTimeout(() => {
207
if (done) return;
208
done = true;
209
socket.removeListener("mesg", f);
210
cb(error({ error: `Timed out after ${timeout} seconds.` }));
211
}, timeout * 1000);
212
}
213
};
214
}
215
216