Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/persist/server.ts
1710 views
1
/*
2
CONAT_SERVER=http://localhost:3000 node
3
4
// making a server from scratch
5
6
// initialize persist context
7
8
require('@cocalc/backend/conat/persist');
9
10
// a conat server and client
11
s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}}); client = s.client();
12
13
// persist server
14
p = require('@cocalc/conat/persist/server').server({client}); 0;
15
16
17
18
// a client for persist server
19
20
c = require('@cocalc/conat/persist/client').stream({client, user:{hub_id:'hub'}, storage:{path:'b.txt'}});
21
22
for await (x of await c.getAll()) { console.log(x) }
23
24
25
await c.set({messageData:client.message(123)})
26
27
for await (x of await c.getAll()) { console.log(x) }
28
29
[ { seq: 1, time: 1750218209211, encoding: 0, raw: <Buffer 7b> } ]
30
31
(await c.get({seq:5})).data
32
33
await c.set({key:'foo', messageData:client.message('bar')})
34
(await c.get({key:'foo'})).data
35
36
await c.delete({seq:6})
37
38
39
client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})
40
41
client = await require('@cocalc/backend/conat').conat(); s = require('@cocalc/backend/conat/sync').astream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'b.txt', client})
42
43
client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/backend/conat/sync').dstream({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'ds2.txt', client})
44
45
46
client = await require('@cocalc/backend/conat').conat(); kv = require('@cocalc/backend/conat/sync').akv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a.txt', client})
47
48
49
client = await require('@cocalc/backend/conat').conat(); kv = await require('@cocalc/backend/conat/sync').dkv({project_id:'3fa218e5-7196-4020-8b30-e2127847cc4f', name:'a1', client})
50
51
52
client = await require('@cocalc/backend/conat').conat(); s = await require('@cocalc/conat/sync/core-stream').cstream({name:'d.txt',client})
53
54
55
*/
56
57
import { type Client, ConatError } from "@cocalc/conat/core/client";
58
import {
59
type ConatSocketServer,
60
type ServerSocket,
61
} from "@cocalc/conat/socket";
62
import { getLogger } from "@cocalc/conat/client";
63
import type {
64
StoredMessage,
65
PersistentStream,
66
StorageOptions,
67
} from "./storage";
68
import { getStream, SERVICE, MAX_PER_USER, MAX_GLOBAL, RESOURCE } from "./util";
69
import { throttle } from "lodash";
70
import { type SetOptions } from "./client";
71
import { once } from "@cocalc/util/async-utils";
72
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
73
74
// Logger for this module, created under the name "persist:server".
const logger = getLogger("persist:server");
75
76
/**
 * Default batching cutoff used by getAll and by changefeed updates.
 *
 * When many messages have to be sent, they are accumulated into a batch and
 * the batch is flushed as soon as the summed `raw.length` of its messages
 * reaches this value.  The bound keeps RAM usage in check while streaming a
 * large saved database to a client.  A single message bigger than the
 * cutoff is still sent; it simply ends up in a batch by itself.
 */
const DEFAULT_MESSAGES_THRESH: number = 20e6;
// Smaller alternative value, left commented out:
//const DEFAULT_MESSAGES_THRESH = 1e5;

/**
 * Gate for the experimental "sqlite" command, which runs an arbitrary sqlite
 * query against a stream.  It is kept off because of the obvious DOS and
 * security concerns.
 */
const ENABLE_SQLITE_GENERAL_QUERIES = false;

/**
 * Wait (in milliseconds) handed to lodash `throttle` for flushing queued
 * changefeed messages to a socket.
 */
const SEND_THROTTLE = 30;
91
92
/**
 * Create the persist socket server.
 *
 * The server listens on the subject `${service}.*`.  For every incoming
 * socket connection:
 *
 *  - the first "data" message supplies `storage` and `changefeed`; the user
 *    is taken from the second dot-separated segment of the socket subject,
 *    registered with the usage monitor, and the persistent stream is opened
 *    with `getStream`.  On success "stream-initialized" is emitted on the
 *    socket; on failure the error (and its code) is written to the socket
 *    and remembered so that every later request fails with it.
 *  - each "request" is dispatched on `mesg.headers.cmd`: set, setMany,
 *    delete, config, inventory, get, keys, sqlite (disabled unless
 *    ENABLE_SQLITE_GENERAL_QUERIES), serverId, getAll, changefeed.  Unknown
 *    commands get an error response with code 404.  A request that arrives
 *    before the stream exists waits for "stream-initialized" for up to
 *    `request.timeout` (default 30000).
 *  - when the socket closes, the stream is closed and the usage entry is
 *    removed.
 *
 * @param client - conat client whose `socket.listen` creates the server
 * @param messagesThresh - batch cutoff passed to getAll / changefeeds
 * @param service - subject prefix to listen on
 * @returns the socket server returned by `client.socket.listen`
 * @throws Error if `client` is null or undefined
 */
export function server({
  client,
  messagesThresh = DEFAULT_MESSAGES_THRESH,
  service = SERVICE,
}: {
  client: Client;
  messagesThresh?: number;
  service?: string;
}): ConatSocketServer {
  logger.debug("server: creating persist server");
  if (client == null) {
    throw Error("client must be specified");
  }
  const subject = `${service}.*`;
  const server: ConatSocketServer = client.socket.listen(subject);
  logger.debug("server: listening on ", { subject });
  const usage = new UsageMonitor({
    maxPerUser: MAX_PER_USER,
    max: MAX_GLOBAL,
    resource: RESOURCE,
    log: (...args) => {
      logger.debug(RESOURCE, ...args);
    },
  });
  server.on("close", () => {
    usage.close();
  });

  server.on("connection", (socket: ServerSocket) => {
    logger.debug("server: got new connection", {
      id: socket.id,
      subject: socket.subject,
    });
    // Per-connection state.
    let error = "";
    let errorCode: any = undefined;
    let changefeed = false;
    let storage: undefined | StorageOptions = undefined;
    let stream: undefined | PersistentStream = undefined;
    let user = "";
    // true once this socket has been counted by the usage monitor
    let added = false;
    // true while a getStream call for this socket is in flight
    let initializing = false;
    // true once the "closed" handler below has run
    let closed = false;

    socket.on("data", async (data) => {
      // Only one initialization may run at a time, and none once the stream
      // exists or the socket is gone; otherwise a second message would open
      // (and leak) another stream.
      if (stream != null || initializing || closed) {
        return;
      }
      initializing = true;
      try {
        // Read the payload inside the try so that a malformed message is
        // reported to the client instead of becoming an unhandled rejection.
        storage = data.storage;
        changefeed = data.changefeed;
        user = socket.subject.split(".")[1];
        if (!added) {
          // Count each socket once: it is removed exactly once on close.
          usage.add(user);
          added = true;
        }
        const opened = await getStream({
          subject: socket.subject,
          storage,
          service,
        });
        if (closed) {
          // The socket closed while the stream was being opened.  The
          // "closed" handler already ran and had nothing to close, so
          // release the stream here.
          opened.close();
          return;
        }
        stream = opened;
        if (changefeed) {
          startChangefeed({ socket, stream, messagesThresh });
        }
        socket.emit("stream-initialized");
      } catch (err) {
        error = `${err}`;
        errorCode = err.code;
        if (!closed) {
          socket.write(null, { headers: { error, code: errorCode } });
        }
      } finally {
        initializing = false;
      }
    });

    socket.on("closed", () => {
      logger.debug("socket closed", socket.subject);
      closed = true;
      storage = undefined;
      stream?.close();
      stream = undefined;
      if (added) {
        usage.delete(user);
      }
    });

    socket.on("request", async (mesg) => {
      const request = mesg.headers;

      try {
        if (error) {
          // initialization failed earlier; every request reports that error
          throw new ConatError(error, { code: errorCode });
        }
        if (stream == null) {
          await once(socket, "stream-initialized", request.timeout ?? 30000);
        }
        if (stream == null) {
          throw Error("bug");
        }
        if (request.cmd == "set") {
          mesg.respondSync(
            stream.set({
              key: request.key,
              previousSeq: request.previousSeq,
              raw: mesg.raw,
              ttl: request.ttl,
              encoding: mesg.encoding,
              headers: request.headers,
              msgID: request.msgID,
            }),
          );
        } else if (request.cmd == "setMany") {
          // Just like set, except that the main data of the mesg is an
          // array of set operations.  Each one succeeds or fails on its own.
          const resp: (
            | { seq: number; time: number }
            | { error: string; code?: any }
          )[] = [];
          for (const {
            key,
            previousSeq,
            ttl,
            msgID,
            messageData,
          } of mesg.data as SetOptions[]) {
            try {
              resp.push(
                stream.set({
                  key,
                  previousSeq,
                  ttl,
                  headers: messageData.headers,
                  msgID,
                  raw: messageData.raw,
                  encoding: messageData.encoding,
                }),
              );
            } catch (err) {
              resp.push({ error: `${err}`, code: err.code });
            }
          }
          mesg.respondSync(resp);
        } else if (request.cmd == "delete") {
          mesg.respondSync(stream.delete(request));
        } else if (request.cmd == "config") {
          mesg.respondSync(stream.config(request.config));
        } else if (request.cmd == "inventory") {
          mesg.respondSync(stream.inventory());
        } else if (request.cmd == "get") {
          const resp = stream.get({ key: request.key, seq: request.seq });
          if (resp == null) {
            mesg.respondSync(null);
          } else {
            // The payload travels as raw bytes; seq, time and key are merged
            // into the response headers.
            const { raw, encoding, headers, seq, time, key } = resp;
            mesg.respondSync(null, {
              raw,
              encoding,
              headers: { ...headers, seq, time, key },
            });
          }
        } else if (request.cmd == "keys") {
          const resp = stream.keys();
          mesg.respondSync(resp);
        } else if (request.cmd == "sqlite") {
          if (!ENABLE_SQLITE_GENERAL_QUERIES) {
            throw Error("sqlite command not currently supported");
          }
          const resp = stream.sqlite(request.statement, request.params);
          mesg.respondSync(resp);
        } else if (request.cmd == "serverId") {
          mesg.respondSync(server.id);
        } else if (request.cmd == "getAll") {
          logger.debug("getAll", { subject: socket.subject, request });
          // getAll uses requestMany which responds with all matching
          // messages, so no call to mesg.respond here.  It is not awaited,
          // so a rejection has to be handled explicitly.
          getAll({ stream, mesg, request, messagesThresh }).catch((err) => {
            logger.debug("getAll failed", {
              subject: socket.subject,
              error: `${err}`,
            });
          });
        } else if (request.cmd == "changefeed") {
          logger.debug("changefeed", changefeed);
          if (!changefeed) {
            changefeed = true;
            startChangefeed({ socket, stream, messagesThresh });
          }
          mesg.respondSync("created");
        } else {
          mesg.respondSync(null, {
            headers: { error: `unknown command ${request.cmd}`, code: 404 },
          });
        }
      } catch (err) {
        mesg.respondSync(null, {
          headers: { error: `${err}`, code: err.code },
        });
      }
    });
  });

  return server;
}
285
286
/**
 * Answer a "getAll" request by sending the messages produced by
 * `stream.getAll({ start_seq, end_seq })` back through `mesg.respondSync`.
 *
 * Messages are grouped into batches; a batch is sent as soon as the summed
 * `raw.length` of its messages reaches `messagesThresh`.  Every response
 * carries a `seq` header counting responses from 0 (unrelated to the seq of
 * the stored messages).  After the last batch, one response with no
 * messages and no error marks successful completion.  If anything throws,
 * a single response with `error` (the stringified exception) and `code`
 * (the exception's `code` property, if any) is sent instead.
 *
 * @param stream - object whose `getAll` yields the stored messages
 * @param mesg - request message; responses are sent with `mesg.respondSync`
 * @param request - request headers; `start_seq` and `end_seq` are forwarded
 * @param messagesThresh - batch size cutoff
 */
async function getAll({ stream, mesg, request, messagesThresh }) {
  let seq = 0;
  const respond = (
    error?: string,
    messages?: StoredMessage[],
    code?: unknown,
  ) => {
    mesg.respondSync(messages, { headers: { error, seq, code } });
    seq += 1;
  };

  try {
    // A fresh array is used for every batch so that an array already handed
    // to respondSync is never mutated afterwards.
    let batch: StoredMessage[] = [];
    let size = 0;
    for (const message of stream.getAll({
      start_seq: request.start_seq,
      end_seq: request.end_seq,
    })) {
      batch.push(message);
      // a message without a raw field counts as size 0
      size += message.raw?.length ?? 0;
      if (size >= messagesThresh) {
        respond(undefined, batch);
        batch = [];
        size = 0;
      }
    }

    if (batch.length > 0) {
      respond(undefined, batch);
    }
    // successful finish
    respond();
  } catch (err) {
    // The error is sent as a string, so its code must be forwarded
    // separately or it is lost.
    const code = (err as { code?: unknown } | null | undefined)?.code;
    respond(`${err}`, undefined, code);
  }
}
318
319
/**
 * Forward "change" events of `stream` to `socket`.
 *
 * Each change is queued, and a throttled flush (SEND_THROTTLE, leading and
 * trailing) writes the queue to the socket in batches.  A batch is written
 * as soon as the summed `raw.length` of its messages reaches
 * `messagesThresh`, and whatever remains is written at the end of the
 * flush.  Every write carries a `seq` header counting writes from 0.
 * Nothing is written once `socket.state` is "closed".
 *
 * When the socket closes, the change listener is removed from the stream,
 * any pending throttled flush is cancelled and the queue is dropped.
 *
 * @param socket - socket that receives the batches via `socket.write`
 * @param stream - emitter of "change" events
 *   (assumed to support `removeListener` like a Node EventEmitter — TODO confirm)
 * @param messagesThresh - batch size cutoff
 */
function startChangefeed({ socket, stream, messagesThresh }) {
  logger.debug("startChangefeed", { subject: socket.subject });
  // this seq here has nothing to do with the seq of the StoredMessage!
  let seq = 0;
  const respond = (error?, messages?: StoredMessage[]) => {
    if (socket.state == "closed") {
      return;
    }
    socket.write(messages, { headers: { error, seq } });
    seq += 1;
  };

  const unsentMessages: StoredMessage[] = [];
  const sendAllUnsentMessages = throttle(
    () => {
      // A fresh array is used for every batch so that an array already
      // handed to socket.write is never mutated afterwards.
      let batch: StoredMessage[] = [];
      let size = 0;
      while (unsentMessages.length > 0 && socket.state != "closed") {
        const message = unsentMessages.shift()!;
        // e.g. op:'delete' messages have length 0 and no raw field
        size += message.raw?.length ?? 0;
        batch.push(message);
        if (size >= messagesThresh) {
          respond(undefined, batch);
          batch = [];
          size = 0;
        }
      }
      if (batch.length > 0) {
        respond(undefined, batch);
      }
    },
    SEND_THROTTLE,
    { leading: true, trailing: true },
  );

  function onChange(message) {
    if (socket.state == "closed") {
      // Covers a socket that was already closed before the "closed"
      // listener below was attached.
      cleanup();
      return;
    }
    unsentMessages.push(message);
    sendAllUnsentMessages();
  }

  // Idempotent: release everything this changefeed holds on to.
  const cleanup = () => {
    stream.removeListener("change", onChange);
    sendAllUnsentMessages.cancel();
    unsentMessages.length = 0;
  };

  stream.on("change", onChange);
  socket.on("closed", cleanup);
}
368
369