Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/persist/client.ts
1710 views
1
import {
2
type Message as ConatMessage,
3
type Client,
4
type MessageData,
5
ConatError,
6
} from "@cocalc/conat/core/client";
7
import { type ConatSocketClient } from "@cocalc/conat/socket";
8
import { EventIterator } from "@cocalc/util/event-iterator";
9
import type {
10
StorageOptions,
11
Configuration,
12
SetOperation,
13
DeleteOperation,
14
StoredMessage,
15
PartialInventory,
16
} from "./storage";
17
export { StoredMessage, StorageOptions };
18
import { persistSubject, SERVICE, type User } from "./util";
19
import { assertHasWritePermission as assertHasWritePermission0 } from "./auth";
20
import { refCacheSync } from "@cocalc/util/refcache";
21
import { EventEmitter } from "events";
22
import { getLogger } from "@cocalc/conat/client";
23
import { until } from "@cocalc/util/async-utils";
24
25
let DEFAULT_RECONNECT_DELAY = 1500;
26
27
export function setDefaultReconnectDelay(delay) {
28
DEFAULT_RECONNECT_DELAY = delay;
29
}
30
31
interface GetAllOpts {
32
start_seq?: number;
33
end_seq?: number;
34
timeout?: number;
35
maxWait?: number;
36
}
37
38
const logger = getLogger("persist:client");
39
40
export type ChangefeedEvent = (SetOperation | DeleteOperation)[];
41
export type Changefeed = EventIterator<ChangefeedEvent>;
42
43
export { type PersistStreamClient };
44
class PersistStreamClient extends EventEmitter {
45
public socket: ConatSocketClient;
46
private changefeeds: any[] = [];
47
private state: "ready" | "closed" = "ready";
48
private lastSeq?: number;
49
private reconnecting = false;
50
private gettingMissed = false;
51
private changesWhenGettingMissed: ChangefeedEvent[] = [];
52
53
constructor(
54
private client: Client,
55
private storage: StorageOptions,
56
private user: User,
57
private service = SERVICE,
58
) {
59
super();
60
this.setMaxListeners(100);
61
// paths.add(this.storage.path);
62
logger.debug("constructor", this.storage);
63
this.init();
64
}
65
66
private init = () => {
67
if (this.client.state == "closed") {
68
this.close();
69
return;
70
}
71
if (this.isClosed()) {
72
return;
73
}
74
this.socket?.close();
75
const subject = persistSubject({ ...this.user, service: this.service });
76
this.socket = this.client.socket.connect(subject, {
77
desc: `persist: ${this.storage.path}`,
78
reconnection: false,
79
});
80
logger.debug("init", this.storage.path, "connecting to ", subject);
81
this.socket.write({
82
storage: this.storage,
83
changefeed: this.changefeeds.length > 0,
84
});
85
86
// get any messages from the stream that we missed while offline;
87
// this only matters if there are changefeeds.
88
if (this.reconnecting) {
89
this.getMissed();
90
}
91
92
this.socket.once("disconnected", () => {
93
this.reconnecting = true;
94
this.socket.removeAllListeners();
95
setTimeout(this.init, DEFAULT_RECONNECT_DELAY);
96
});
97
this.socket.once("closed", () => {
98
this.reconnecting = true;
99
this.socket.removeAllListeners();
100
setTimeout(this.init, DEFAULT_RECONNECT_DELAY);
101
});
102
103
this.socket.on("data", (updates, headers) => {
104
if (updates == null && headers != null) {
105
// has to be an error
106
this.emit(
107
"error",
108
new ConatError(headers?.error, { code: headers?.code }),
109
);
110
this.close();
111
return;
112
}
113
if (this.gettingMissed) {
114
this.changesWhenGettingMissed.push(updates);
115
} else {
116
this.changefeedEmit(updates);
117
}
118
});
119
};
120
121
private getMissed = async () => {
122
if (this.changefeeds.length == 0 || this.state != "ready") {
123
return;
124
}
125
try {
126
this.gettingMissed = true;
127
this.changesWhenGettingMissed.length = 0;
128
129
await until(
130
async () => {
131
if (this.changefeeds.length == 0 || this.state != "ready") {
132
return true;
133
}
134
try {
135
await this.socket.waitUntilReady(15000);
136
if (this.changefeeds.length == 0 || this.state != "ready") {
137
return true;
138
}
139
const resp = await this.socket.request(null, {
140
headers: {
141
cmd: "changefeed",
142
},
143
});
144
if (resp.headers?.error) {
145
throw new ConatError(`${resp.headers?.error}`, {
146
code: resp.headers?.code,
147
});
148
}
149
if (this.changefeeds.length == 0 || this.state != "ready") {
150
return true;
151
}
152
const updates = await this.getAll({
153
start_seq: this.lastSeq,
154
timeout: 15000,
155
});
156
this.changefeedEmit(updates);
157
return true;
158
} catch {
159
return false;
160
}
161
},
162
{ min: 2000, max: 15000 },
163
);
164
} finally {
165
if (this.state != "ready") {
166
return;
167
}
168
this.gettingMissed = false;
169
for (const updates of this.changesWhenGettingMissed) {
170
this.changefeedEmit(updates);
171
}
172
this.changesWhenGettingMissed.length = 0;
173
}
174
};
175
176
private changefeedEmit = (updates: ChangefeedEvent) => {
177
updates = updates.filter((update) => {
178
if (update.op == "delete") {
179
return true;
180
} else {
181
if (update.seq > (this.lastSeq ?? 0)) {
182
this.lastSeq = update.seq;
183
return true;
184
}
185
}
186
return false;
187
});
188
if (updates.length == 0) {
189
return;
190
}
191
this.emit("changefeed", updates);
192
};
193
194
private isClosed = () => this.state == "closed";
195
196
close = () => {
197
logger.debug("close", this.storage);
198
// paths.delete(this.storage.path);
199
this.state = "closed";
200
this.emit("closed");
201
for (const iter of this.changefeeds) {
202
iter.close();
203
this.changefeeds.length = 0;
204
}
205
this.socket.close();
206
};
207
208
// The changefeed is *guaranteed* to deliver every message
209
// in the stream **exactly once and in order**, even if there
210
// are disconnects, failovers, etc. Dealing with dropped messages,
211
// duplicates, etc., is NOT the responsibility of clients.
212
changefeed = async (): Promise<Changefeed> => {
213
// activate changefeed mode (so server publishes updates -- this is idempotent)
214
const resp = await this.socket.request(null, {
215
headers: {
216
cmd: "changefeed",
217
},
218
});
219
if (resp.headers?.error) {
220
throw new ConatError(`${resp.headers?.error}`, {
221
code: resp.headers?.code,
222
});
223
}
224
// an iterator over any updates that are published.
225
const iter = new EventIterator<ChangefeedEvent>(this, "changefeed", {
226
map: (args) => args[0],
227
});
228
this.changefeeds.push(iter);
229
return iter;
230
};
231
232
set = async ({
233
key,
234
ttl,
235
previousSeq,
236
msgID,
237
messageData,
238
timeout,
239
}: SetOptions & { timeout?: number }): Promise<{
240
seq: number;
241
time: number;
242
}> => {
243
return this.checkForError(
244
await this.socket.request(null, {
245
raw: messageData.raw,
246
encoding: messageData.encoding,
247
headers: {
248
headers: messageData.headers,
249
cmd: "set",
250
key,
251
ttl,
252
previousSeq,
253
msgID,
254
timeout,
255
},
256
timeout,
257
}),
258
);
259
};
260
261
setMany = async (
262
ops: SetOptions[],
263
{ timeout }: { timeout?: number } = {},
264
): Promise<
265
({ seq: number; time: number } | { error: string; code?: any })[]
266
> => {
267
return this.checkForError(
268
await this.socket.request(ops, {
269
headers: {
270
cmd: "setMany",
271
timeout,
272
},
273
timeout,
274
}),
275
);
276
};
277
278
delete = async ({
279
timeout,
280
seq,
281
seqs,
282
last_seq,
283
all,
284
}: {
285
timeout?: number;
286
seq?: number;
287
seqs?: number[];
288
last_seq?: number;
289
all?: boolean;
290
}): Promise<{ seqs: number[] }> => {
291
return this.checkForError(
292
await this.socket.request(null, {
293
headers: {
294
cmd: "delete",
295
seq,
296
seqs,
297
last_seq,
298
all,
299
timeout,
300
},
301
timeout,
302
}),
303
);
304
};
305
306
config = async ({
307
config,
308
timeout,
309
}: {
310
config?: Partial<Configuration>;
311
timeout?: number;
312
} = {}): Promise<Configuration> => {
313
return this.checkForError(
314
await this.socket.request(null, {
315
headers: {
316
cmd: "config",
317
config,
318
timeout,
319
} as any,
320
timeout,
321
}),
322
);
323
};
324
325
inventory = async (timeout?): Promise<PartialInventory> => {
326
return this.checkForError(
327
await this.socket.request(null, {
328
headers: {
329
cmd: "inventory",
330
} as any,
331
timeout,
332
}),
333
);
334
};
335
336
get = async ({
337
seq,
338
key,
339
timeout,
340
}: {
341
timeout?: number;
342
} & (
343
| { seq: number; key?: undefined }
344
| { key: string; seq?: undefined }
345
)): Promise<ConatMessage | undefined> => {
346
const resp = await this.socket.request(null, {
347
headers: { cmd: "get", seq, key, timeout } as any,
348
timeout,
349
});
350
this.checkForError(resp, true);
351
if (resp.headers == null) {
352
return undefined;
353
}
354
return resp;
355
};
356
357
// returns async iterator over arrays of stored messages.
358
// It's must safer to use getAll below, but less memory
359
// efficient.
360
async *getAllIter({
361
start_seq,
362
end_seq,
363
timeout,
364
maxWait,
365
}: GetAllOpts = {}): AsyncGenerator<StoredMessage[], void, unknown> {
366
if (this.isClosed()) {
367
// done
368
return;
369
}
370
const sub = await this.socket.requestMany(null, {
371
headers: {
372
cmd: "getAll",
373
start_seq,
374
end_seq,
375
timeout,
376
} as any,
377
timeout,
378
maxWait,
379
});
380
if (this.isClosed()) {
381
// done with this
382
return;
383
}
384
let seq = 0; // next expected seq number for the sub (not the data)
385
for await (const { data, headers } of sub) {
386
if (headers?.error) {
387
throw new ConatError(`${headers.error}`, { code: headers.code });
388
}
389
if (data == null || this.socket.state == "closed") {
390
// done
391
return;
392
}
393
if (typeof headers?.seq != "number" || headers?.seq != seq) {
394
throw new ConatError(
395
`data dropped, probably due to load -- please try again; expected seq=${seq}, but got ${headers?.seq}`,
396
{
397
code: 503,
398
},
399
);
400
} else {
401
seq = headers?.seq + 1;
402
}
403
yield data;
404
}
405
}
406
407
getAll = async (opts: GetAllOpts = {}): Promise<StoredMessage[]> => {
408
// NOTE: We check messages.headers.seq (which has nothing to do with the stream seq numbers!)
409
// and make sure it counts from 0 up until done, and that nothing was missed.
410
// ONLY once that is done and we have everything do we call processPersistentMessages.
411
// Otherwise, just wait and try again from scratch. There's no socket or
412
// any other guarantees that messages aren't dropped since this is requestMany,
413
// and under load DEFINITELY messages can be dropped.
414
// This throws with code=503 if something goes wrong due to sequence numbers.
415
let messages: StoredMessage[] = [];
416
const sub = await this.getAllIter(opts);
417
if (this.isClosed()) {
418
throw Error("closed");
419
}
420
for await (const value of sub) {
421
messages = messages.concat(value);
422
}
423
if (this.isClosed()) {
424
throw Error("closed");
425
}
426
return messages;
427
};
428
429
keys = async ({ timeout }: { timeout?: number } = {}): Promise<string[]> => {
430
return this.checkForError(
431
await this.socket.request(null, {
432
headers: { cmd: "keys", timeout } as any,
433
timeout,
434
}),
435
);
436
};
437
438
sqlite = async ({
439
timeout,
440
statement,
441
params,
442
}: {
443
timeout?: number;
444
statement: string;
445
params?: any[];
446
}): Promise<any[]> => {
447
return this.checkForError(
448
await this.socket.request(null, {
449
headers: {
450
cmd: "sqlite",
451
statement,
452
params,
453
} as any,
454
timeout,
455
}),
456
);
457
};
458
459
private checkForError = (mesg, noReturn = false) => {
460
if (mesg.headers != null) {
461
const { error, code } = mesg.headers;
462
if (error || code) {
463
throw new ConatError(error ?? "error", { code });
464
}
465
}
466
if (!noReturn) {
467
return mesg.data;
468
}
469
};
470
471
// id of the remote server we're connected to
472
serverId = async () => {
473
return this.checkForError(
474
await this.socket.request(null, {
475
headers: { cmd: "serverId" },
476
}),
477
);
478
};
479
}
480
481
export interface SetOptions {
482
messageData: MessageData;
483
key?: string;
484
ttl?: number;
485
previousSeq?: number;
486
msgID?: string;
487
timeout?: number;
488
}
489
490
interface Options {
491
client: Client;
492
// who is accessing persistent storage
493
user: User;
494
// what storage they are accessing
495
storage: StorageOptions;
496
noCache?: boolean;
497
service?: string;
498
}
499
500
export const stream = refCacheSync<Options, PersistStreamClient>({
501
name: "persistent-stream-client",
502
createKey: ({ user, storage, client, service = SERVICE }: Options) => {
503
return JSON.stringify([user, storage, client.id, service]);
504
},
505
createObject: ({ client, user, storage, service = SERVICE }: Options) => {
506
// avoid wasting server resources, etc., by always checking permissions client side first
507
assertHasWritePermission({ user, storage, service });
508
return new PersistStreamClient(client, storage, user, service);
509
},
510
});
511
512
let permissionChecks = true;
513
export function disablePermissionCheck() {
514
if (!process.env.COCALC_TEST_MODE) {
515
throw Error("disabling permission check only allowed in test mode");
516
}
517
permissionChecks = false;
518
}
519
520
const assertHasWritePermission = ({ user, storage, service }) => {
521
if (!permissionChecks) {
522
// should only be used for unit testing, since otherwise would
523
// make clients slower and possibly increase server load.
524
return;
525
}
526
const subject = persistSubject({ ...user, service });
527
assertHasWritePermission0({ subject, path: storage.path, service });
528
};
529
530