Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/persist/load-balancer.ts
2210 views
1
/*
2
The persist load balancer listens for requests on the subject
3
4
{SERVICE}.{scope}.id
5
6
- SERVICE (e.g., 'persist')
7
- scope it typically hub or account-...or project-....
8
- id is literally that.
9
10
It then responds with a persist server id from the ones that got started.
11
The persist server id is just a function of the scope, i.e,. we
12
just shard accounts and projects across the persist server, and that's
13
it. The most important thing is that this assignment never changes
14
(unless you restart servers), because if two clients both fetch the
15
id for the same scope, they must get something on the same persist
16
server, since otherwise things could be out of sync.
17
18
If somehow two clients got different id's, that' wouldn't corrupt data on disk.
19
We always run different persist servers on the same machine with the same disk,
20
so since data is written with sqlite lite, nothing will literally be out sync,
21
since sqlite has locks and allows multiple processes to write to the same file.
22
However, the *change* events will not properly get sent out, and that will break
23
collaborative editing badly.
24
*/
25
26
import { type Client } from "@cocalc/conat/core/client";
27
import { getLogger } from "@cocalc/conat/client";
28
import { SERVICE } from "./util";
29
import { hash_string } from "@cocalc/util/misc";
30
import { delay } from "awaiting";
31
32
const logger = getLogger("persist:load-balancer");
33
34
export function initLoadBalancer({
35
client,
36
ids,
37
service = SERVICE,
38
}: {
39
client: Client;
40
ids: string[];
41
service?: string;
42
}) {
43
if (ids.length == 0) {
44
throw Error("there must be at least 1 id");
45
}
46
47
const subject = `${service}.*.id`;
48
49
// I don't think subscription ever randomly throw errors, but this
50
// is so important I'm making it extra paranoid:
51
(async () => {
52
while (true) {
53
let sub: any = undefined;
54
try {
55
logger.debug("creating persist load balancer: ", { subject, ids });
56
sub = await client.subscribe(subject);
57
for await (const mesg of sub) {
58
mesg.respondSync(getId(ids, mesg.subject));
59
}
60
} catch (err) {
61
sub?.close();
62
logger.debug("ERROR (restarting) -- ", err);
63
}
64
await delay(3000);
65
}
66
})();
67
}
68
69
// we use a hash so that this takes NO memory, but the assignment
70
// lasts forever, which means our sharding by server doesn't
71
// take into account load at all. This keeps things much, much
72
// simpler, and should be fine in practice.
73
export function getId(ids: string[], subject: string) {
74
const h = Math.abs(hash_string(subject.split(".")[1]));
75
const id = ids[h % ids.length];
76
//logger.debug("getId", { ids, subject, id });
77
return id;
78
}
79
80
export async function getPersistServerId({
81
client,
82
subject,
83
}: {
84
client: Client;
85
subject: string;
86
}) {
87
// take only first two segments of subject, since it could have a bunch more
88
// that we better ignore (e.g., from the client)
89
const s = subject.split(".").slice(0, 2).join(".") + ".id";
90
const resp = await client.request(s, null);
91
return resp.data;
92
}
93
94