Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/persist/server.ts
5819 views
1
/*
2
CONAT_SERVER=http://localhost:3000 node
3
4
// making a server from scratch
5
6
// initialize persist context
7
8
require('@cocalc/backend/conat/persist');
9
10
// a conat server and client
11
s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}}); client = s.client();
12
13
// persist server
14
p = require('@cocalc/conat/persist/server').server({client}); 0;
15
16
17
18
// a client for persist server
19
20
c = require('@cocalc/conat/persist/client').stream({client, user:{hub_id:'hub'}, storage:{path:'b.txt'}});
21
22
for await (x of await c.getAll()) { console.log(x) }
23
24
25
await c.set({messageData:client.message(123)})
26
27
for await (x of await c.getAll()) { console.log(x) }
28
29
[ { seq: 1, time: 1750218209211, encoding: 0, raw: <Buffer 7b> } ]
30
31
(await c.get({seq:5})).data
32
33
await c.set({key:'foo', messageData:client.message('bar')})
34
(await c.get({key:'foo'})).data
35
36
await c.delete({seq:6})
37
38
39
client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})
40
41
client = await require('@cocalc/backend/conat').conat(); s = require('@cocalc/backend/conat/sync').astream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'b.txt', client})
42
43
client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/backend/conat/sync').dstream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'ds2.txt', client})
44
45
46
client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})
47
48
49
client = await require('@cocalc/backend/conat').conat(); kv = await require('@cocalc/backend/conat/sync').dkv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a1', client})
50
51
52
client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/conat/sync/core-stream').cstream({name:'d.txt',client})
53
54
55
*/
56
57
import { type Client, ConatError } from "@cocalc/conat/core/client";
58
import {
59
type ConatSocketServer,
60
type ServerSocket,
61
} from "@cocalc/conat/socket";
62
import type {
63
StoredMessage,
64
PersistentStream,
65
StorageOptions,
66
} from "./storage";
67
import { getStream, SERVICE, MAX_PER_USER, MAX_GLOBAL, RESOURCE } from "./util";
68
import { throttle } from "lodash";
69
import { type SetOptions } from "./client";
70
import { once } from "@cocalc/util/async-utils";
71
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
72
import { getLogger } from "@cocalc/conat/client";
73
import { initLoadBalancer } from "./load-balancer";
74
75
const logger = getLogger("persist:server");

// Batch-size bound used when sending many messages at once (getAll or
// changefeed updates): messages are accumulated until their combined raw
// size reaches this threshold, and then the whole batch is sent together.
// The bound exists so that streaming a large saved database to a client
// does not use a huge amount of RAM.
// Note: a single message larger than this still gets sent, just on its own.
const DEFAULT_MESSAGES_THRESH = 20e6;
//const DEFAULT_MESSAGES_THRESH = 1e5;

// Experimental support for running arbitrary sqlite queries.  It is disabled
// because of the obvious DOS and security concerns.
const ENABLE_SQLITE_GENERAL_QUERIES = false;

// Throttle wait passed to lodash throttle for flushing queued changefeed
// messages to a socket.
const SEND_THROTTLE = 30;
92
93
/**
 * Create a persist server that listens on `${service}.*` and serves one
 * persistent stream per socket connection.
 *
 * For each connection, the first "data" message supplies the storage options
 * (and whether a changefeed is wanted); the stream is then opened with
 * getStream.  Subsequent "request" messages are dispatched on
 * `mesg.headers.cmd`: set, setMany, delete, config, inventory, get, keys,
 * sqlite (disabled), serverId, getAll and changefeed.  Failures are reported
 * back to the requester as `{ error, code }` headers.
 *
 * @param client - conat client used to create the listening socket server
 * @param messagesThresh - batch-size threshold passed to getAll/changefeed
 * @param service - service name; the server listens on `${service}.*`
 * @param id - id of this server, passed to `client.socket.listen`
 * @param clusterMode - if falsy, a load balancer that only knows about this
 *   server is started via initLoadBalancer
 * @returns the listening socket server
 * @throws Error if client is not specified
 */
export function server({
  client,
  messagesThresh = DEFAULT_MESSAGES_THRESH,
  service = SERVICE,
  id = "0",
  clusterMode,
}: {
  client: Client;
  messagesThresh?: number;
  service?: string;
  id?: string;
  // if false, runs its own internal load balancer that always returns this server
  clusterMode?: boolean;
}): ConatSocketServer {
  const log = (...args) => {
    logger.debug(id, service, ...args);
  };
  log("server: creating persist server");
  if (client == null) {
    throw Error("client must be specified");
  }
  const subject = `${service}.*`;
  const server: ConatSocketServer = client.socket.listen(subject, { id });
  log("server listening", { subject, id });
  if (!clusterMode) {
    log("persist server not in cluster mode, so starting own load balancer");
    initLoadBalancer({ service, ids: [id], client });
  }
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_USER,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      log(RESOURCE, ...args);
    },
  });

  server.on("close", () => {
    log("stopping persist server", { id, service });
    usage.close();
  });

  server.on("connection", (socket: ServerSocket) => {
    log("server: got new connection", {
      id,
      service,
      subject: socket.subject,
    });
    // console.log(new Date(), "persist server got connection", {
    //   persist: client.info.id,
    //   subject: socket.subject,
    // });

    // Per-connection state.  `error`/`errorCode` record a failed
    // initialization so that every later request fails with the same error.
    let error = "";
    let errorCode: any = undefined;
    let changefeed = false;
    let storage: undefined | StorageOptions = undefined;
    let stream: undefined | PersistentStream = undefined;
    let user = "";
    // true once usage.add(user) succeeded, so it is balanced by usage.delete
    let added = false;
    // true once the socket's "closed" event has fired
    let closed = false;

    socket.on("data", async (data) => {
      // log("server: got data ", data);
      if (stream == null) {
        storage = data.storage;
        changefeed = data.changefeed;
        try {
          user = socket.subject.split(".")[1];
          usage.add(user);
          added = true;
          const opened = await getStream({
            subject: socket.subject,
            storage,
            service,
          });
          if (closed) {
            // The socket closed while the stream was being opened, so the
            // "closed" handler had nothing to close.  Release the stream now
            // instead of leaking it (and never start a changefeed on it).
            opened.close();
            return;
          }
          stream = opened;
          if (changefeed) {
            startChangefeed({ socket, stream, messagesThresh });
          }
          // wakes up any request that is waiting for initialization
          socket.emit("stream-initialized");
        } catch (err) {
          error = `${err}`;
          errorCode = err.code;
          socket.write(null, { headers: { error, code: errorCode } });
        }
      }
    });

    socket.on("closed", () => {
      log("socket closed", id, socket.subject);
      closed = true;
      storage = undefined;
      stream?.close();
      stream = undefined;
      if (added) {
        usage.delete(user);
      }
    });

    socket.on("request", async (mesg) => {
      const request = mesg.headers;
      // log("got request", request);

      try {
        if (error) {
          throw new ConatError(error, { code: errorCode });
        }
        if (stream == null) {
          // request arrived before the stream finished opening
          await once(socket, "stream-initialized", request.timeout ?? 30000);
        }
        if (stream == null) {
          throw Error("bug");
        }
        if (request.cmd == "set") {
          mesg.respondSync(
            stream.set({
              key: request.key,
              previousSeq: request.previousSeq,
              raw: mesg.raw,
              ttl: request.ttl,
              encoding: mesg.encoding,
              headers: request.headers,
              msgID: request.msgID,
            }),
          );
        } else if (request.cmd == "setMany") {
          // just like set except the main data of the mesg
          // has an array of set operations
          const resp: (
            | { seq: number; time: number }
            | { error: string; code?: any }
          )[] = [];
          for (const {
            key,
            previousSeq,
            ttl,
            msgID,
            messageData,
          } of mesg.data as SetOptions[]) {
            try {
              resp.push(
                stream.set({
                  key,
                  previousSeq,
                  ttl,
                  headers: messageData.headers,
                  msgID,
                  raw: messageData.raw,
                  encoding: messageData.encoding,
                }),
              );
            } catch (err) {
              // one failed set does not abort the rest; its slot in the
              // response holds the error instead
              resp.push({ error: `${err}`, code: err.code });
            }
          }
          mesg.respondSync(resp);
        } else if (request.cmd == "delete") {
          mesg.respondSync(stream.delete(request));
        } else if (request.cmd == "config") {
          mesg.respondSync(stream.config(request.config));
        } else if (request.cmd == "inventory") {
          mesg.respondSync(stream.inventory());
        } else if (request.cmd == "get") {
          const resp = stream.get({ key: request.key, seq: request.seq });
          //console.log("got resp = ", resp);
          if (resp == null) {
            mesg.respondSync(null);
          } else {
            const { raw, encoding, headers, seq, time, key } = resp;
            mesg.respondSync(null, {
              raw,
              encoding,
              headers: { ...headers, seq, time, key },
            });
          }
        } else if (request.cmd == "keys") {
          const resp = stream.keys();
          mesg.respondSync(resp);
        } else if (request.cmd == "sqlite") {
          if (!ENABLE_SQLITE_GENERAL_QUERIES) {
            throw Error("sqlite command not currently supported");
          }
          const resp = stream.sqlite(request.statement, request.params);
          mesg.respondSync(resp);
        } else if (request.cmd == "serverId") {
          mesg.respondSync(server.id);
        } else if (request.cmd == "getAll") {
          log("getAll", { subject: socket.subject, request });
          // getAll uses requestMany which responds with all matching messages,
          // so no call to mesg.respond here.
          getAll({ stream, mesg, request, messagesThresh });
        } else if (request.cmd == "changefeed") {
          log("changefeed", changefeed);
          if (!changefeed) {
            changefeed = true;
            startChangefeed({ socket, stream, messagesThresh });
          }
          mesg.respondSync("created");
        } else {
          mesg.respondSync(null, {
            headers: { error: `unknown command ${request.cmd}`, code: 404 },
          });
        }
      } catch (err) {
        mesg.respondSync(null, {
          headers: { error: `${err}`, code: err.code },
        });
      }
    });
  });

  return server;
}
301
302
/**
 * Respond to a getAll request by sending the messages returned by
 * `stream.getAll({ start_seq, end_seq })` in batches.
 *
 * Messages are accumulated until their combined raw size reaches
 * messagesThresh and are then sent as one response.  Every response carries
 * `{ error, seq, code }` headers, where seq counts responses (0, 1, 2, ...)
 * and is unrelated to the seq of any stored message.  On success a final
 * response with no messages marks the end; on failure a single response
 * with the stringified error and the error's `code` (if any) is sent.
 *
 * @param stream - object whose getAll(...) returns an iterable of messages
 * @param mesg - request message; responses go out through mesg.respondSync
 * @param request - request headers supplying start_seq and end_seq
 * @param messagesThresh - combined raw size at which a batch is flushed
 */
async function getAll({ stream, mesg, request, messagesThresh }) {
  let seq = 0;
  const respond = (
    error?: string,
    messages?: StoredMessage[],
    code?: unknown,
  ) => {
    mesg.respondSync(messages, { headers: { error, seq, code } });
    seq += 1;
  };

  try {
    let messages: StoredMessage[] = [];
    let size = 0;
    for (const message of stream.getAll({
      start_seq: request.start_seq,
      end_seq: request.end_seq,
    })) {
      messages.push(message);
      // a message with no raw field counts as size 0
      size += message.raw?.length ?? 0;
      if (size >= messagesThresh) {
        respond(undefined, messages);
        // start a fresh batch rather than clearing the array that was
        // just handed to respondSync
        messages = [];
        size = 0;
      }
    }

    if (messages.length > 0) {
      respond(undefined, messages);
    }
    // successful finish
    respond();
  } catch (err) {
    // `error` is sent as a string, so the code must be forwarded separately
    respond(`${err}`, undefined, (err as { code?: unknown } | undefined)?.code);
  }
}
334
335
/**
 * Forward every "change" event of the stream to the socket.
 *
 * Changes are queued and flushed by a throttled function (SEND_THROTTLE,
 * leading and trailing edge).  A flush groups queued messages into batches
 * whose combined raw size reaches messagesThresh, and writes each batch to
 * the socket with `{ error, seq }` headers.  Nothing is queued or written
 * once the socket state is "closed".
 */
function startChangefeed({ socket, stream, messagesThresh }) {
  logger.debug("startChangefeed", { subject: socket.subject });

  // Counts writes made by this changefeed; it has nothing to do with
  // the seq of a StoredMessage.
  let writeCount = 0;
  const writeBatch = (error?, batch?: StoredMessage[]) => {
    if (socket.state == "closed") {
      return;
    }
    //logger.debug("changefeed: writing messages to socket", { seq, messages });
    socket.write(batch, { headers: { error, seq: writeCount } });
    writeCount += 1;
  };

  const pending: StoredMessage[] = [];

  // Move queued messages into size-bounded batches and write them out,
  // until the queue is empty or the socket closes.
  const drainPending = () => {
    const batch: StoredMessage[] = [];
    let bytes = 0;
    while (pending.length > 0 && socket.state != "closed") {
      const next = pending.shift();
      // e.g. op:'delete' messages have length 0 and no raw field
      bytes += next?.raw?.length ?? 0;
      batch.push(next!);
      if (bytes >= messagesThresh) {
        writeBatch(undefined, batch);
        bytes = 0;
        batch.length = 0;
      }
    }
    // whatever did not reach the threshold still goes out
    if (batch.length > 0) {
      writeBatch(undefined, batch);
    }
  };

  const flush = throttle(
    () => {
      while (socket.state != "closed" && pending.length > 0) {
        drainPending();
      }
    },
    SEND_THROTTLE,
    { leading: true, trailing: true },
  );

  stream.on("change", (message) => {
    if (socket.state == "closed") {
      return;
    }
    //console.log("stream change event", message);
    // logger.debug("changefeed got message", message, socket.state);
    pending.push(message);
    flush();
  });
}
384
385