Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/core/cluster.ts
5808 views
1
import { type Client, connect } from "./client";
2
import { Patterns } from "./patterns";
3
import {
4
updateInterest,
5
type InterestUpdate,
6
} from "@cocalc/conat/core/server";
7
import type { DStream } from "@cocalc/conat/sync/dstream";
8
import { once } from "@cocalc/util/async-utils";
9
import { server as createPersistServer } from "@cocalc/conat/persist/server";
10
import { getLogger } from "@cocalc/conat/client";
11
import { hash_string } from "@cocalc/util/misc";
12
const CREATE_LINK_TIMEOUT = 45_000;
13
14
const logger = getLogger("conat:core:cluster");
15
16
export async function clusterLink(
17
address: string,
18
systemAccountPassword: string,
19
timeout = CREATE_LINK_TIMEOUT,
20
) {
21
const client = connect({ address, systemAccountPassword });
22
if (client.info == null) {
23
try {
24
await client.waitUntilSignedIn({
25
timeout: timeout ?? CREATE_LINK_TIMEOUT,
26
});
27
} catch (err) {
28
client.close();
29
throw err;
30
}
31
if (client.info == null) {
32
// this is impossible
33
throw Error("BUG -- failed to sign in");
34
}
35
}
36
const { id, clusterName } = client.info;
37
if (!id) {
38
throw Error("id must be specified");
39
}
40
if (!clusterName) {
41
throw Error("clusterName must be specified");
42
}
43
const link = new ClusterLink(client, id, clusterName, address);
44
await link.init();
45
return link;
46
}
47
48
export type Interest = Patterns<{ [queue: string]: Set<string> }>;
49
50
export { type ClusterLink };
51
52
class ClusterLink {
53
public interest: Interest = new Patterns();
54
private streams: ClusterStreams;
55
private state: "init" | "ready" | "closed" = "init";
56
private clientStateChanged = Date.now(); // when client status last changed
57
58
constructor(
59
public readonly client: Client,
60
public readonly id: string,
61
public readonly clusterName: string,
62
public readonly address: string,
63
) {
64
if (!client) {
65
throw Error("client must be specified");
66
}
67
if (!clusterName) {
68
throw Error("clusterName must be specified");
69
}
70
if (!id) {
71
throw Error("id must be specified");
72
}
73
}
74
75
init = async () => {
76
this.client.on("connected", this.handleClientStateChanged);
77
this.client.on("disconnected", this.handleClientStateChanged);
78
this.streams = await clusterStreams({
79
client: this.client,
80
id: this.id,
81
clusterName: this.clusterName,
82
});
83
for (const update of this.streams.interest.getAll()) {
84
updateInterest(update, this.interest);
85
}
86
// I have a slight concern about this because updates might not
87
// arrive in order during automatic failover. That said, maybe
88
// automatic failover doesn't matter with these streams, since
89
// it shouldn't really happen -- each stream is served from the server
90
// it is about, and when that server goes down none of this state
91
// matters anymore.
92
this.streams.interest.on("change", this.handleInterestUpdate);
93
this.state = "ready";
94
};
95
96
isConnected = () => {
97
return this.client.state == "connected";
98
};
99
100
handleInterestUpdate = (update: InterestUpdate) => {
101
updateInterest(update, this.interest);
102
};
103
104
private handleClientStateChanged = () => {
105
this.clientStateChanged = Date.now();
106
};
107
108
howLongDisconnected = () => {
109
if (this.isConnected()) {
110
return 0;
111
}
112
return Date.now() - this.clientStateChanged;
113
};
114
115
close = () => {
116
if (this.state == "closed") {
117
return;
118
}
119
this.state = "closed";
120
this.client.removeListener("connected", this.handleClientStateChanged);
121
this.client.removeListener("disconnected", this.handleClientStateChanged);
122
if (this.streams != null) {
123
this.streams.interest.removeListener("change", this.handleInterestUpdate);
124
this.streams.interest.close();
125
// @ts-ignore
126
delete this.streams;
127
}
128
this.client.close();
129
// @ts-ignore
130
delete this.client;
131
};
132
133
hasInterest = (subject) => {
134
return this.interest.hasMatch(subject);
135
};
136
137
waitForInterest = async (
138
subject: string,
139
timeout: number,
140
signal?: AbortSignal,
141
) => {
142
const hasMatch = this.interest.hasMatch(subject);
143
144
if (hasMatch || !timeout) {
145
// NOTE: we never return the actual matches, since this is a
146
// potential security vulnerability.
147
// it could make it very easy to figure out private inboxes, etc.
148
return hasMatch;
149
}
150
const start = Date.now();
151
while (this.state != "closed" && !signal?.aborted) {
152
if (Date.now() - start >= timeout) {
153
throw Error("timeout");
154
}
155
await once(this.interest, "change");
156
if ((this.state as any) == "closed" || signal?.aborted) {
157
return false;
158
}
159
const hasMatch = this.interest.hasMatch(subject);
160
if (hasMatch) {
161
return true;
162
}
163
}
164
165
return false;
166
};
167
168
hash = (): { interest: number; } => {
169
return {
170
interest: hashInterest(this.interest),
171
};
172
};
173
}
174
175
function clusterStreamNames({
176
clusterName,
177
id,
178
}: {
179
clusterName: string;
180
id: string;
181
}) {
182
return {
183
interest: `cluster/${clusterName}/${id}/interest`,
184
};
185
}
186
187
export function clusterService({
188
id,
189
clusterName,
190
}: {
191
id: string;
192
clusterName: string;
193
}) {
194
return `persist:${clusterName}:${id}`;
195
}
196
197
export async function createClusterPersistServer({
198
client,
199
id,
200
clusterName,
201
}: {
202
client: Client;
203
id: string;
204
clusterName: string;
205
}) {
206
const service = clusterService({ clusterName, id });
207
logger.debug("createClusterPersistServer: ", { service });
208
return await createPersistServer({ client, service });
209
}
210
211
export interface ClusterStreams {
212
interest: DStream<InterestUpdate>;
213
}
214
215
export async function clusterStreams({
216
client,
217
clusterName,
218
id,
219
}: {
220
client: Client;
221
clusterName: string;
222
id: string;
223
}): Promise<ClusterStreams> {
224
logger.debug("clusterStream: ", { clusterName, id });
225
if (!clusterName) {
226
throw Error("clusterName must be set");
227
}
228
const names = clusterStreamNames({ clusterName, id });
229
const opts = {
230
service: clusterService({ clusterName, id }),
231
noCache: true,
232
ephemeral: true,
233
};
234
const interest = await client.sync.dstream<InterestUpdate>({
235
noInventory: true,
236
name: names.interest,
237
...opts,
238
});
239
logger.debug("clusterStreams: got them", { clusterName });
240
return { interest };
241
}
242
243
// Periodically delete not-necessary updates from the interest stream
244
export async function trimClusterStreams(
245
streams: ClusterStreams,
246
data: {
247
interest: Patterns<{ [queue: string]: Set<string> }>;
248
links: { interest: Patterns<{ [queue: string]: Set<string> }> }[];
249
},
250
// don't delete anything that isn't at lest minAge ms old.
251
minAge: number,
252
): Promise<{ seqsInterest: number[]; }> {
253
const { interest } = streams;
254
// First deal with interst
255
// we iterate over the interest stream checking for subjects
256
// with no current interest at all; in such cases it is safe
257
// to purge them entirely from the stream.
258
const seqs: number[] = [];
259
const now = Date.now();
260
for (let n = 0; n < interest.length; n++) {
261
const time = interest.time(n);
262
if (time == null) continue;
263
if (now - time.valueOf() <= minAge) {
264
break;
265
}
266
const update = interest.get(n) as InterestUpdate;
267
if (!data.interest.hasPattern(update.subject)) {
268
const seq = interest.seq(n);
269
if (seq != null) {
270
seqs.push(seq);
271
}
272
}
273
}
274
if (seqs.length > 0) {
275
// [ ] todo -- add to interest.delete a version where it takes an array of sequence numbers
276
logger.debug("trimClusterStream: trimming interest", { seqs });
277
await interest.delete({ seqs });
278
logger.debug("trimClusterStream: successfully trimmed interest", { seqs });
279
}
280
return { seqsInterest: seqs};
281
}
282
283
function hashSet(X: Set<string>): number {
284
let h = 0;
285
for (const a of X) {
286
h += hash_string(a); // integers, and not too many, so should commute
287
}
288
return h;
289
}
290
291
function hashInterestValue(X: { [queue: string]: Set<string> }): number {
292
let h = 0;
293
for (const queue in X) {
294
h += hashSet(X[queue]); // integers, and not too many, so should commute
295
}
296
return h;
297
}
298
299
export function hashInterest(
300
interest: Patterns<{ [queue: string]: Set<string> }>,
301
): number {
302
return interest.hash(hashInterestValue);
303
}
304
305
306