Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/persist/client.ts
5805 views
1
import {
2
type Message as ConatMessage,
3
type Client,
4
type MessageData,
5
ConatError,
6
} from "@cocalc/conat/core/client";
7
import { type ConatSocketClient } from "@cocalc/conat/socket";
8
import { EventIterator } from "@cocalc/util/event-iterator";
9
import type {
10
StorageOptions,
11
Configuration,
12
SetOperation,
13
DeleteOperation,
14
StoredMessage,
15
PartialInventory,
16
} from "./storage";
17
export { StoredMessage, StorageOptions };
18
import { persistSubject, SERVICE, type User } from "./util";
19
import { assertHasWritePermission as assertHasWritePermission0 } from "./auth";
20
import { refCacheSync } from "@cocalc/util/refcache";
21
import { EventEmitter } from "events";
22
import { getLogger } from "@cocalc/conat/client";
23
import { until } from "@cocalc/util/async-utils";
24
import { getPersistServerId } from "./load-balancer";
25
26
let DEFAULT_RECONNECT_DELAY = 1500;
27
28
export function setDefaultReconnectDelay(delay) {
29
DEFAULT_RECONNECT_DELAY = delay;
30
}
31
32
interface GetAllOpts {
33
start_seq?: number;
34
end_seq?: number;
35
timeout?: number;
36
maxWait?: number;
37
}
38
39
const logger = getLogger("persist:client");
40
41
export type ChangefeedEvent = (SetOperation | DeleteOperation)[];
42
export type Changefeed = EventIterator<ChangefeedEvent>;
43
44
export { type PersistStreamClient };
45
class PersistStreamClient extends EventEmitter {
46
public socket: ConatSocketClient;
47
private changefeeds: any[] = [];
48
private state: "ready" | "closed" = "ready";
49
private lastSeq?: number;
50
private reconnecting = false;
51
private gettingMissed = false;
52
private changesWhenGettingMissed: ChangefeedEvent[] = [];
53
private reconnectTimer?: ReturnType<typeof setTimeout>;
54
55
constructor(
56
private client: Client,
57
private storage: StorageOptions,
58
private user: User,
59
private service = SERVICE,
60
) {
61
super();
62
this.setMaxListeners(100);
63
// paths.add(this.storage.path);
64
logger.debug("constructor", this.storage);
65
this.init();
66
}
67
68
private init = () => {
69
if (this.client.state == "closed") {
70
this.close();
71
return;
72
}
73
if (this.isClosed()) {
74
return;
75
}
76
this.socket?.close();
77
const subject = persistSubject({ ...this.user, service: this.service });
78
this.socket = this.client.socket.connect(subject, {
79
desc: `persist: ${this.storage.path}`,
80
reconnection: false,
81
loadBalancer: async (subject: string) =>
82
await getPersistServerId({ client: this.client, subject }),
83
});
84
logger.debug("init", this.storage.path, "connecting to ", subject);
85
this.socket.write({
86
storage: this.storage,
87
changefeed: this.changefeeds.length > 0,
88
});
89
90
// get any messages from the stream that we missed while offline;
91
// this only matters if there are changefeeds.
92
if (this.reconnecting) {
93
this.getMissed();
94
}
95
96
this.socket.once("disconnected", () => {
97
this.reconnecting = true;
98
this.socket.removeAllListeners();
99
this.scheduleReconnect();
100
});
101
this.socket.once("closed", () => {
102
this.reconnecting = true;
103
this.socket.removeAllListeners();
104
this.scheduleReconnect();
105
});
106
107
this.socket.on("data", (updates, headers) => {
108
if (updates == null && headers != null) {
109
// has to be an error
110
this.emit(
111
"error",
112
new ConatError(headers?.error, { code: headers?.code }),
113
);
114
this.close();
115
return;
116
}
117
if (this.gettingMissed) {
118
this.changesWhenGettingMissed.push(updates);
119
} else {
120
this.changefeedEmit(updates);
121
}
122
});
123
};
124
125
private getMissed = async () => {
126
if (this.changefeeds.length == 0 || this.state != "ready") {
127
return;
128
}
129
try {
130
this.gettingMissed = true;
131
this.changesWhenGettingMissed.length = 0;
132
133
await until(
134
async () => {
135
if (this.changefeeds.length == 0 || this.state != "ready") {
136
return true;
137
}
138
try {
139
await this.socket.waitUntilReady(15000);
140
if (this.changefeeds.length == 0 || this.state != "ready") {
141
return true;
142
}
143
const resp = await this.socket.request(null, {
144
headers: {
145
cmd: "changefeed",
146
},
147
});
148
if (resp.headers?.error) {
149
throw new ConatError(`${resp.headers?.error}`, {
150
code: resp.headers?.code,
151
});
152
}
153
if (this.changefeeds.length == 0 || this.state != "ready") {
154
return true;
155
}
156
const updates = await this.getAll({
157
start_seq: this.lastSeq,
158
timeout: 15000,
159
});
160
this.changefeedEmit(updates);
161
return true;
162
} catch {
163
return false;
164
}
165
},
166
{ min: 2000, max: 15000 },
167
);
168
} finally {
169
if (this.state != "ready") {
170
return;
171
}
172
this.gettingMissed = false;
173
for (const updates of this.changesWhenGettingMissed) {
174
this.changefeedEmit(updates);
175
}
176
this.changesWhenGettingMissed.length = 0;
177
}
178
};
179
180
private changefeedEmit = (updates: ChangefeedEvent) => {
181
updates = updates.filter((update) => {
182
if (update.op == "delete") {
183
return true;
184
} else {
185
if (update.seq > (this.lastSeq ?? 0)) {
186
this.lastSeq = update.seq;
187
return true;
188
}
189
}
190
return false;
191
});
192
if (updates.length == 0) {
193
return;
194
}
195
this.emit("changefeed", updates);
196
};
197
198
private isClosed = () => this.state == "closed";
199
200
private scheduleReconnect = () => {
201
if (this.state == "closed") {
202
return;
203
}
204
if (this.reconnectTimer != null) {
205
clearTimeout(this.reconnectTimer);
206
}
207
this.reconnectTimer = setTimeout(this.init, DEFAULT_RECONNECT_DELAY);
208
this.reconnectTimer.unref?.();
209
};
210
211
close = () => {
212
logger.debug("close", this.storage);
213
// paths.delete(this.storage.path);
214
this.state = "closed";
215
this.emit("closed");
216
if (this.reconnectTimer != null) {
217
clearTimeout(this.reconnectTimer);
218
this.reconnectTimer = undefined;
219
}
220
for (const iter of this.changefeeds) {
221
iter.close();
222
this.changefeeds.length = 0;
223
}
224
this.socket.close();
225
};
226
227
// The changefeed is *guaranteed* to deliver every message
228
// in the stream **exactly once and in order**, even if there
229
// are disconnects, failovers, etc. Dealing with dropped messages,
230
// duplicates, etc., is NOT the responsibility of clients.
231
changefeed = async (): Promise<Changefeed> => {
232
// activate changefeed mode (so server publishes updates -- this is idempotent)
233
const resp = await this.socket.request(null, {
234
headers: {
235
cmd: "changefeed",
236
},
237
});
238
if (resp.headers?.error) {
239
throw new ConatError(`${resp.headers?.error}`, {
240
code: resp.headers?.code,
241
});
242
}
243
// an iterator over any updates that are published.
244
const iter = new EventIterator<ChangefeedEvent>(this, "changefeed", {
245
map: (args) => args[0],
246
});
247
this.changefeeds.push(iter);
248
return iter;
249
};
250
251
set = async ({
252
key,
253
ttl,
254
previousSeq,
255
msgID,
256
messageData,
257
timeout,
258
}: SetOptions & { timeout?: number }): Promise<{
259
seq: number;
260
time: number;
261
}> => {
262
return this.checkForError(
263
await this.socket.request(null, {
264
raw: messageData.raw,
265
encoding: messageData.encoding,
266
headers: {
267
headers: messageData.headers,
268
cmd: "set",
269
key,
270
ttl,
271
previousSeq,
272
msgID,
273
timeout,
274
},
275
timeout,
276
}),
277
);
278
};
279
280
setMany = async (
281
ops: SetOptions[],
282
{ timeout }: { timeout?: number } = {},
283
): Promise<
284
({ seq: number; time: number } | { error: string; code?: any })[]
285
> => {
286
return this.checkForError(
287
await this.socket.request(ops, {
288
headers: {
289
cmd: "setMany",
290
timeout,
291
},
292
timeout,
293
}),
294
);
295
};
296
297
delete = async ({
298
timeout,
299
seq,
300
seqs,
301
last_seq,
302
all,
303
}: {
304
timeout?: number;
305
seq?: number;
306
seqs?: number[];
307
last_seq?: number;
308
all?: boolean;
309
}): Promise<{ seqs: number[] }> => {
310
return this.checkForError(
311
await this.socket.request(null, {
312
headers: {
313
cmd: "delete",
314
seq,
315
seqs,
316
last_seq,
317
all,
318
timeout,
319
},
320
timeout,
321
}),
322
);
323
};
324
325
config = async ({
326
config,
327
timeout,
328
}: {
329
config?: Partial<Configuration>;
330
timeout?: number;
331
} = {}): Promise<Configuration> => {
332
return this.checkForError(
333
await this.socket.request(null, {
334
headers: {
335
cmd: "config",
336
config,
337
timeout,
338
} as any,
339
timeout,
340
}),
341
);
342
};
343
344
inventory = async (timeout?): Promise<PartialInventory> => {
345
return this.checkForError(
346
await this.socket.request(null, {
347
headers: {
348
cmd: "inventory",
349
} as any,
350
timeout,
351
}),
352
);
353
};
354
355
get = async ({
356
seq,
357
key,
358
timeout,
359
}: {
360
timeout?: number;
361
} & (
362
| { seq: number; key?: undefined }
363
| { key: string; seq?: undefined }
364
)): Promise<ConatMessage | undefined> => {
365
const resp = await this.socket.request(null, {
366
headers: { cmd: "get", seq, key, timeout } as any,
367
timeout,
368
});
369
this.checkForError(resp, true);
370
if (resp.headers == null) {
371
return undefined;
372
}
373
return resp;
374
};
375
376
// returns async iterator over arrays of stored messages.
377
// It's must safer to use getAll below, but less memory
378
// efficient.
379
async *getAllIter({
380
start_seq,
381
end_seq,
382
timeout,
383
maxWait,
384
}: GetAllOpts = {}): AsyncGenerator<StoredMessage[], void, unknown> {
385
if (this.isClosed()) {
386
// done
387
return;
388
}
389
const sub = await this.socket.requestMany(null, {
390
headers: {
391
cmd: "getAll",
392
start_seq,
393
end_seq,
394
timeout,
395
} as any,
396
timeout,
397
maxWait,
398
});
399
if (this.isClosed()) {
400
// done with this
401
return;
402
}
403
let seq = 0; // next expected seq number for the sub (not the data)
404
for await (const { data, headers } of sub) {
405
if (headers?.error) {
406
throw new ConatError(`${headers.error}`, { code: headers.code });
407
}
408
if (data == null || this.socket.state == "closed") {
409
// done
410
return;
411
}
412
if (typeof headers?.seq != "number" || headers?.seq != seq) {
413
throw new ConatError(
414
`data dropped, probably due to load -- please try again; expected seq=${seq}, but got ${headers?.seq}`,
415
{
416
code: 503,
417
},
418
);
419
} else {
420
seq = headers?.seq + 1;
421
}
422
yield data;
423
}
424
}
425
426
getAll = async (opts: GetAllOpts = {}): Promise<StoredMessage[]> => {
427
// NOTE: We check messages.headers.seq (which has nothing to do with the stream seq numbers!)
428
// and make sure it counts from 0 up until done, and that nothing was missed.
429
// ONLY once that is done and we have everything do we call processPersistentMessages.
430
// Otherwise, just wait and try again from scratch. There's no socket or
431
// any other guarantees that messages aren't dropped since this is requestMany,
432
// and under load DEFINITELY messages can be dropped.
433
// This throws with code=503 if something goes wrong due to sequence numbers.
434
let messages: StoredMessage[] = [];
435
const sub = await this.getAllIter(opts);
436
if (this.isClosed()) {
437
throw Error("closed");
438
}
439
for await (const value of sub) {
440
messages = messages.concat(value);
441
}
442
if (this.isClosed()) {
443
throw Error("closed");
444
}
445
return messages;
446
};
447
448
keys = async ({ timeout }: { timeout?: number } = {}): Promise<string[]> => {
449
return this.checkForError(
450
await this.socket.request(null, {
451
headers: { cmd: "keys", timeout } as any,
452
timeout,
453
}),
454
);
455
};
456
457
sqlite = async ({
458
timeout,
459
statement,
460
params,
461
}: {
462
timeout?: number;
463
statement: string;
464
params?: any[];
465
}): Promise<any[]> => {
466
return this.checkForError(
467
await this.socket.request(null, {
468
headers: {
469
cmd: "sqlite",
470
statement,
471
params,
472
} as any,
473
timeout,
474
}),
475
);
476
};
477
478
private checkForError = (mesg, noReturn = false) => {
479
if (mesg.headers != null) {
480
const { error, code } = mesg.headers;
481
if (error || code) {
482
throw new ConatError(error ?? "error", { code });
483
}
484
}
485
if (!noReturn) {
486
return mesg.data;
487
}
488
};
489
490
// id of the remote server we're connected to
491
serverId = async () => {
492
return this.checkForError(
493
await this.socket.request(null, {
494
headers: { cmd: "serverId" },
495
}),
496
);
497
};
498
}
499
500
export interface SetOptions {
501
messageData: MessageData;
502
key?: string;
503
ttl?: number;
504
previousSeq?: number;
505
msgID?: string;
506
timeout?: number;
507
}
508
509
interface Options {
510
client: Client;
511
// who is accessing persistent storage
512
user: User;
513
// what storage they are accessing
514
storage: StorageOptions;
515
noCache?: boolean;
516
service?: string;
517
}
518
519
export const stream = refCacheSync<Options, PersistStreamClient>({
520
name: "persistent-stream-client",
521
createKey: ({ user, storage, client, service = SERVICE }: Options) => {
522
return JSON.stringify([user, storage, client.id, service]);
523
},
524
createObject: ({ client, user, storage, service = SERVICE }: Options) => {
525
// avoid wasting server resources, etc., by always checking permissions client side first
526
assertHasWritePermission({ user, storage, service });
527
return new PersistStreamClient(client, storage, user, service);
528
},
529
});
530
531
let permissionChecks = true;
532
export function disablePermissionCheck() {
533
if (!process.env.COCALC_TEST_MODE) {
534
throw Error("disabling permission check only allowed in test mode");
535
}
536
permissionChecks = false;
537
}
538
539
const assertHasWritePermission = ({ user, storage, service }) => {
540
if (!permissionChecks) {
541
// should only be used for unit testing, since otherwise would
542
// make clients slower and possibly increase server load.
543
return;
544
}
545
const subject = persistSubject({ ...user, service });
546
assertHasWritePermission0({ subject, path: storage.path, service });
547
};
548
549