Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/client.ts
5716 views
1
import {
2
messageData,
3
type Subscription,
4
type Headers,
5
ConatError,
6
} from "@cocalc/conat/core/client";
7
import { ConatSocketBase } from "./base";
8
import { type TCP, createTCP } from "./tcp";
9
import {
10
SOCKET_HEADER_CMD,
11
DEFAULT_COMMAND_TIMEOUT,
12
type ConatSocketOptions,
13
serverStatusSubject,
14
} from "./util";
15
import { EventIterator } from "@cocalc/util/event-iterator";
16
import { keepAlive, KeepAlive } from "./keepalive";
17
import { getLogger } from "@cocalc/conat/client";
18
import { until } from "@cocalc/util/async-utils";
19
20
// Module-level logger, registered under the name "socket:client"; used below
// for the silly/debug level tracing of the client socket lifecycle.
const logger = getLogger("socket:client");
21
22
// DO NOT directly instantiate here -- instead, call the
23
// socket.connect method on ConatClient.
24
25
/**
 * Client end of a conat socket.
 *
 * Do not instantiate directly -- call the socket.connect method on
 * ConatClient instead.
 *
 * The client picks a server id (via the optional load balancer or by asking
 * the socket server's status subject), subscribes to
 * `${subject}.client.${id}`, sends a "connect" command to the server and then
 * dispatches every message arriving on that subscription.  Outgoing data goes
 * through the TCP helper created in initTCP() and is published to
 * serverSubject().
 */
export class ConatSocketClient extends ConatSocketBase {
  /**
   * Messages handed to sendToServer() while the socket was not "ready".
   * Capped at maxQueueSize entries (oldest dropped first); flushed and
   * emptied each time the socket becomes "ready"; emptied on close().
   */
  queuedWrites: { data: any; headers?: Headers }[] = [];
  private tcp?: TCP;
  private alive?: KeepAlive;
  // id of the server this client talks to; set by getServerId()
  private serverId?: string;
  // optional function mapping the socket subject to a server id
  private loadBalancer?: (subject: string) => Promise<string>;

  /**
   * @param opts socket options; opts.loadBalancer, if given, is used by
   *   getServerId() instead of querying the socket server.
   * @throws Error("bug") if initTCP() did not set up this.tcp.
   */
  constructor(opts: ConatSocketOptions) {
    super(opts);
    this.loadBalancer = opts.loadBalancer;
    logger.silly("creating a client socket connecting to ", this.subject);
    this.initTCP();
    this.on("ready", () => {
      // Take ownership of the backlog *before* sending, so that each queued
      // message is flushed exactly once.  Without clearing the queue, every
      // later "ready" event (i.e., every reconnect) would send the same stale
      // messages to the server again.
      const pending = this.queuedWrites;
      this.queuedWrites = [];
      for (const mesg of pending) {
        this.sendDataToServer(mesg);
      }
    });
    if (this.tcp == null) {
      throw Error("bug");
    }
  }

  /**
   * Subject to send messages/data to the socket server.
   *
   * @returns `${subject}.server.${serverId}.${id}`
   * @throws Error("no server selected") if no server id has been set yet.
   */
  serverSubject = (): string => {
    if (!this.serverId) {
      throw Error("no server selected");
    }
    return `${this.subject}.server.${this.serverId}.${this.id}`;
  };

  /**
   * Open another client socket on the sub-subject `${subject}.${channel}`,
   * using the same maxQueueSize and a desc derived from this socket's desc.
   */
  channel(channel: string) {
    return this.client.socket.connect(this.subject + "." + channel, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
      maxQueueSize: this.maxQueueSize,
    }) as ConatSocketClient;
  }

  /**
   * (Re)create the keepalive helper: any existing one is closed first.  The
   * ping is a request to the server carrying the "ping" command header and
   * bounded by keepAliveTimeout; this.disconnect is passed as the disconnect
   * callback.
   */
  private initKeepAlive = () => {
    this.alive?.close();
    this.alive = keepAlive({
      role: "client",
      ping: async () =>
        await this.request(null, {
          headers: { [SOCKET_HEADER_CMD]: "ping" },
          timeout: this.keepAliveTimeout,
        }),
      disconnect: this.disconnect,
      keepAlive: this.keepAlive,
    });
  };

  /**
   * Create the TCP helper (this.tcp) and wire its events to this socket:
   * received messages are re-emitted as "data" (data, headers) and the
   * sender's "drain" is re-emitted as "drain".  Also registers
   * tcp.send.resendLastUntilAcked on the client's "disconnected" event; that
   * listener is removed again in close().
   *
   * @throws Error if this.tcp was already initialized.
   */
  initTCP() {
    if (this.tcp != null) {
      throw Error("this.tcp already initialized");
    }
    // request = send a socket request mesg to the server side of the socket
    // either ack what's received or asking for a resend of missing data.
    // The "socket" command header always overrides any caller-supplied value.
    const request = async (mesg, opts?) =>
      await this.client.request(this.serverSubject(), mesg, {
        ...opts,
        headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
      });

    this.tcp = createTCP({
      request,
      role: this.role,
      reset: this.disconnect,
      send: this.sendToServer,
      size: this.maxQueueSize,
    });

    this.client.on("disconnected", this.tcp.send.resendLastUntilAcked);

    this.tcp.recv.on("message", (mesg) => {
      this.emit("data", mesg.data, mesg.headers);
    });
    this.tcp.send.on("drain", () => {
      this.emit("drain");
    });
  }

  /** Resolve once the TCP sender reports it has drained (no-op if no tcp). */
  waitUntilDrain = async () => {
    await this.tcp?.send.waitUntilDrain();
  };

  /**
   * Send a control command to the server as a request and return the
   * response data.
   *
   * @param cmd the command placed in the SOCKET_HEADER_CMD header
   * @param timeout request timeout (defaults to DEFAULT_COMMAND_TIMEOUT)
   * @returns the data of the server's response
   * @throws Error(value.error) if the response data has a truthy `error`;
   *   also throws if no server has been selected (see serverSubject).
   */
  private sendCommandToServer = async (
    cmd: "close" | "ping" | "connect",
    timeout = DEFAULT_COMMAND_TIMEOUT,
  ) => {
    const headers = {
      [SOCKET_HEADER_CMD]: cmd,
      id: this.id,
    };
    const subject = this.serverSubject();
    logger.silly("sendCommandToServer", { cmd, timeout, subject });
    const resp = await this.client.request(subject, null, {
      headers,
      timeout,
      waitForInterest: cmd == "connect", // connect is exactly when other side might not be visible yet.
    });

    const value = resp.data;
    logger.silly("sendCommandToServer: got resp", { cmd, value, subject });
    if (value?.error) {
      throw Error(value?.error);
    }
    return value;
  };

  /**
   * Determine which server to talk to and store it in this.serverId: ask the
   * load balancer when one was supplied, otherwise request the server status
   * subject and use the `id` field of the response data.
   */
  private getServerId = async () => {
    let id;
    if (this.loadBalancer != null) {
      logger.debug("getting server id from load balancer");
      id = await this.loadBalancer(this.subject);
    } else {
      logger.debug("getting server id from socket server");
      const resp = await this.client.request(
        serverStatusSubject(this.subject),
        null,
      );
      ({ id } = resp.data);
    }
    this.serverId = id;
  };

  /**
   * Connect to the server and process incoming messages until the
   * subscription ends or the server tells us to close.
   *
   * Steps: select a server, subscribe to `${subject}.client.${id}`, retry the
   * "connect" command (via `until`, with the backoff options below) until it
   * succeeds or the socket is closed, switch to "ready", start the keepalive
   * and then dispatch each message by its command header.  Any error along
   * the way is logged and answered with this.disconnect().
   */
  protected async run() {
    if (this.state == "closed") {
      return;
    }
    // console.log(
    //   "client socket -- subscribing to ",
    //   `${this.subject}.client.${this.id}`,
    // );
    try {
      await this.getServerId();

      logger.silly("run: getting subscription");
      const sub = await this.client.subscribe(
        `${this.subject}.client.${this.id}`,
      );
      // @ts-ignore: the state may have become "closed" during the awaits above
      if (this.state == "closed") {
        sub.close();
        return;
      }
      // the disconnect function does this.sub.close()
      this.sub = sub;
      let resp: any = undefined;
      await until(
        async () => {
          // returning true ends the retry loop; false asks for another try
          if (this.state == "closed") {
            logger.silly("closed -- giving up on connecting");
            return true;
          }
          try {
            logger.silly("sending connect command to server", this.subject);
            resp = await this.sendCommandToServer("connect");
            this.alive?.recv();
            return true;
          } catch (err) {
            logger.silly("failed to connect", this.subject, err);
          }
          return false;
        },
        { start: 500, decay: 1.3, max: 10000 },
      );

      // also reached with resp undefined when we gave up because of a close
      if (resp != "connected") {
        throw Error("failed to connect");
      }
      this.setState("ready");
      this.initKeepAlive();
      for await (const mesg of this.sub) {
        // any traffic from the server counts as a sign of life
        this.alive?.recv();
        const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
        if (cmd) {
          logger.silly("client got cmd", cmd);
        }
        if (cmd == "socket") {
          this.tcp?.send.handleRequest(mesg);
        } else if (cmd == "close") {
          this.close();
          return;
        } else if (cmd == "ping") {
          // logger.silly("responding to ping from server", this.id);
          mesg.respondSync(null);
        } else if (mesg.isRequest()) {
          // logger.silly("client got request");
          this.emit("request", mesg);
        } else {
          // logger.silly("client got data"); //, { data: mesg.data });
          this.tcp?.recv.process(mesg);
        }
      }
    } catch (err) {
      logger.silly("socket connect failed", err);
      this.disconnect();
    }
  }

  /** Publish one already-encoded message (raw + headers) to the server. */
  private sendDataToServer = (mesg) => {
    this.client.publishSync(this.serverSubject(), null, {
      raw: mesg.raw,
      headers: mesg.headers,
    });
  };

  /**
   * Send callback given to the TCP helper.
   *
   * - closed: throws, nothing is queued (this check must come first;
   *   otherwise the "not ready" branch below swallows the closed state and
   *   the message would be queued on a socket that will never flush it);
   * - not ready: the message is queued, keeping at most maxQueueSize entries
   *   by dropping the oldest;
   * - ready: the message is published to the server immediately.
   *
   * @throws Error("closed") if the socket is closed
   * @throws Error if this.role is "server"
   */
  private sendToServer = (mesg) => {
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this.state != "ready") {
      this.queuedWrites.push(mesg);
      while (this.queuedWrites.length > this.maxQueueSize) {
        this.queuedWrites.shift();
      }
      return;
    }
    if (this.role == "server") {
      throw Error("sendToServer is only for use by the client");
    }
    // we are the client, so write to server
    this.sendDataToServer(mesg);
  };

  /**
   * Wait until the socket is ready (bounded by options.timeout) and then send
   * a request to the server.
   *
   * @throws Error("closed") if the socket is closed once the wait is over.
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);
    if (this.state == "closed") {
      throw Error("closed");
    }
    // console.log("sending request from client ", { subject, data, options });
    return await this.client.request(this.serverSubject(), data, options);
  };

  /**
   * Like request(), but uses the client's requestMany and returns its
   * Subscription.
   *
   * @throws Error("closed") if the socket is closed once the wait is over
   *   (same guard as request()).
   */
  requestMany = async (data, options?): Promise<Subscription> => {
    await this.waitUntilReady(options?.timeout);
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.requestMany(this.serverSubject(), data, options);
  };

  /**
   * Graceful shutdown: turn off reconnection, mark the socket as ended, tell
   * the server we are done (waiting up to `timeout` ms; failures are
   * ignored on purpose) and then close().  No-op if already closed.
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state == "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell server we're done
    try {
      await this.sendCommandToServer("close", timeout);
    } catch {
      // best effort only -- we close locally regardless
    }
    this.close();
  }

  /**
   * Close the socket immediately: close the subscription, unhook the
   * "disconnected" listener, drop queued writes, notify the server without
   * waiting, tear down the TCP helper and the keepalive, then run the base
   * class close().  No-op if already closed.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.sub?.close();
    const tcp = this.tcp;
    if (tcp != null) {
      this.client.removeListener("disconnected", tcp.send.resendLastUntilAcked);
    }
    this.queuedWrites = [];
    // tell server we're gone (but don't wait); sendCommandToServer is async,
    // so even a missing server id surfaces as a rejection that we ignore.
    void this.sendCommandToServer("close").catch(() => {});
    if (tcp != null) {
      tcp.send.close();
      tcp.recv.close();
    }
    delete this.tcp;
    this.alive?.close();
    delete this.alive;
    super.close();
  }

  // writes will raise an exception if: (1) the socket is closed code='EPIPE', or (2)
  // you hit maxQueueSize un-ACK'd messages, code='ENOBUFS'
  /**
   * Encode `data` (with optional headers) as a message and hand it to the TCP
   * sender.
   *
   * @throws ConatError with code "EPIPE" if the socket is closed.
   */
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    if (this.state == "closed") {
      throw new ConatError("closed", { code: "EPIPE" });
    }
    const mesg = messageData(data, { headers });
    this.tcp?.send.process(mesg);
  };

  /** Iterator over this socket's "data" events, yielding [data, headers]. */
  iter = () => {
    return new EventIterator<[any, Headers]>(this, "data");
  };
}
324
325