Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/client.ts
1710 views
1
import {
2
messageData,
3
type Subscription,
4
type Headers,
5
ConatError,
6
} from "@cocalc/conat/core/client";
7
import { ConatSocketBase } from "./base";
8
import { type TCP, createTCP } from "./tcp";
9
import {
10
SOCKET_HEADER_CMD,
11
DEFAULT_COMMAND_TIMEOUT,
12
type ConatSocketOptions,
13
} from "./util";
14
import { EventIterator } from "@cocalc/util/event-iterator";
15
import { keepAlive, KeepAlive } from "./keepalive";
16
import { getLogger } from "@cocalc/conat/client";
17
import { until } from "@cocalc/util/async-utils";
18
19
// Logger used throughout this module; obtained from getLogger under the name "socket:client".
const logger = getLogger("socket:client");
20
21
// DO NOT directly instantiate here -- instead, call the
// socket.connect method on ConatClient.

/**
 * Client end of a conat socket.
 *
 * Incoming traffic arrives on a subscription to `${subject}.client.${id}`;
 * outgoing traffic is published (or requested) on `${subject}.server.${id}`.
 * Data messages go through a TCP-like layer (`createTCP`) and the link is
 * monitored with a keepalive that pings the server.
 *
 * Events emitted by this class: "data" (data, headers), "drain", and
 * "request" (mesg).
 */
export class ConatSocketClient extends ConatSocketBase {
  /**
   * Messages handed to `sendToServer` while the socket was not "ready".
   * They are published, and the queue emptied, each time "ready" fires.
   */
  queuedWrites: { data: any; headers?: Headers }[] = [];
  private tcp?: TCP;
  private alive?: KeepAlive;

  /**
   * Sets up the TCP layer and registers the "ready" handler that flushes
   * queued writes.
   *
   * @throws Error("bug") if the TCP layer is unexpectedly missing after init.
   */
  constructor(opts: ConatSocketOptions) {
    super(opts);
    logger.silly("creating a client socket connecting to ", this.subject);
    this.initTCP();
    this.on("ready", () => {
      // Take ownership of the queue *before* publishing, so that a later
      // "ready" event (e.g. after a reconnect) does not publish the same
      // messages again and so the flushed messages are not retained.
      const pending = this.queuedWrites;
      this.queuedWrites = [];
      for (const mesg of pending) {
        this.sendDataToServer(mesg);
      }
    });
    if (this.tcp == null) {
      throw Error("bug");
    }
  }

  /**
   * Opens another client socket on the sub-subject `${subject}.${channel}`,
   * reusing this socket's maxQueueSize.
   */
  channel(channel: string) {
    return this.client.socket.connect(this.subject + "." + channel, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
      maxQueueSize: this.maxQueueSize,
    }) as ConatSocketClient;
  }

  /**
   * (Re)creates the keepalive monitor, closing any previous one first.
   * The ping is a request to the server carrying the "ping" command header.
   */
  private initKeepAlive = () => {
    this.alive?.close();
    this.alive = keepAlive({
      role: "client",
      ping: async () =>
        await this.request(null, {
          headers: { [SOCKET_HEADER_CMD]: "ping" },
          timeout: this.keepAliveTimeout,
        }),
      disconnect: this.disconnect,
      keepAlive: this.keepAlive,
    });
  };

  /**
   * Creates the TCP layer and wires its events to this socket.
   *
   * @throws Error if the TCP layer was already initialized.
   */
  initTCP() {
    if (this.tcp != null) {
      throw Error("this.tcp already initialized");
    }
    // request = send a socket request mesg to the server side of the socket
    // either ack what's received or asking for a resend of missing data.
    const request = async (mesg, opts?) =>
      await this.client.request(`${this.subject}.server.${this.id}`, mesg, {
        ...opts,
        headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
      });

    const tcp = createTCP({
      request,
      role: this.role,
      reset: this.disconnect,
      send: this.sendToServer,
      size: this.maxQueueSize,
    });
    this.tcp = tcp;

    // This listener is removed again in close().
    this.client.on("disconnected", tcp.send.resendLastUntilAcked);

    // Re-emit the TCP layer's events as this socket's own events.
    tcp.recv.on("message", (mesg) => {
      this.emit("data", mesg.data, mesg.headers);
    });
    tcp.send.on("drain", () => {
      this.emit("drain");
    });
  }

  /**
   * Delegates to the TCP send side's waitUntilDrain; resolves immediately
   * when there is no TCP layer (e.g. after close()).
   */
  waitUntilDrain = async () => {
    await this.tcp?.send.waitUntilDrain();
  };

  /**
   * Sends a control command to the server side of this socket as a request
   * and returns the response data.
   *
   * @param cmd - the command placed in the SOCKET_HEADER_CMD header
   * @param timeout - passed through as the request timeout
   * @returns the `data` of the response
   * @throws Error with the server-provided message when the response data
   *   has a truthy `error` field; request failures propagate as-is.
   */
  private sendCommandToServer = async (
    cmd: "close" | "ping" | "connect",
    timeout = DEFAULT_COMMAND_TIMEOUT,
  ) => {
    const headers = {
      [SOCKET_HEADER_CMD]: cmd,
      id: this.id,
    };
    const subject = `${this.subject}.server.${this.id}`;
    logger.silly("sendCommandToServer", { cmd, timeout, subject });
    const resp = await this.client.request(subject, null, {
      headers,
      timeout,
      waitForInterest: cmd == "connect", // connect is exactly when other side might not be visible yet.
    });
    const value = resp.data;
    logger.silly("sendCommandToServer: got resp", { cmd, value, subject });
    if (value?.error) {
      throw Error(value?.error);
    }
    return value;
  };

  /**
   * Connects to the server and then processes incoming messages until the
   * subscription ends or a "close" command arrives.
   *
   * Steps: subscribe to `${subject}.client.${id}`, repeatedly send the
   * "connect" command until it succeeds (or the socket is closed), switch to
   * "ready", start the keepalive, and dispatch each incoming message by its
   * command header. Any error calls `this.disconnect()`.
   */
  protected async run() {
    if (this.state == "closed") {
      return;
    }
    try {
      logger.silly("run: getting subscription");
      const sub = await this.client.subscribe(
        `${this.subject}.client.${this.id}`,
      );
      // @ts-ignore -- state may have changed while awaiting; TS still narrows it from the check above
      if (this.state == "closed") {
        sub.close();
        return;
      }
      this.sub = sub;
      let resp: any = undefined;
      // Retry the connect command (via `until`, with the options below)
      // until it succeeds or the socket gets closed.
      await until(
        async () => {
          if (this.state == "closed") {
            logger.silly("closed -- giving up on connecting");
            return true;
          }
          try {
            logger.silly("sending connect command to server", this.subject);
            resp = await this.sendCommandToServer("connect");
            this.alive?.recv();
            return true;
          } catch (err) {
            logger.silly("failed to connect", this.subject, err);
          }
          return false;
        },
        { start: 500, decay: 1.3, max: 10000 },
      );

      // The socket may have been closed while we were connecting.  In that
      // case stop quietly: do not report a connection failure, and above all
      // do not mark a closed socket "ready" or start a keepalive for it.
      // @ts-ignore -- state may have changed while awaiting; TS still narrows it from the check above
      if (this.state == "closed") {
        return;
      }
      if (resp != "connected") {
        throw Error("failed to connect");
      }
      this.setState("ready");
      this.initKeepAlive();
      for await (const mesg of this.sub) {
        // Any traffic from the server counts as a sign of life.
        this.alive?.recv();
        const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
        if (cmd) {
          logger.silly("client got cmd", cmd);
        }
        if (cmd == "socket") {
          // TCP-layer control request (handled by the send side).
          this.tcp?.send.handleRequest(mesg);
        } else if (cmd == "close") {
          this.close();
          return;
        } else if (cmd == "ping") {
          // respond to a ping from the server
          mesg.respondSync(null);
        } else if (mesg.isRequest()) {
          this.emit("request", mesg);
        } else {
          // ordinary data: hand to the TCP receive side, which emits "message"
          this.tcp?.recv.process(mesg);
        }
      }
    } catch (err) {
      logger.silly("socket connect failed", err);
      this.disconnect();
    }
  }

  /** Publishes one message (raw payload + headers) to the server subject. */
  private sendDataToServer = (mesg) => {
    const subject = `${this.subject}.server.${this.id}`;
    this.client.publishSync(subject, null, {
      raw: mesg.raw,
      headers: mesg.headers,
    });
  };

  /**
   * `send` callback given to the TCP layer.
   *
   * - closed: throws Error("closed").  This check must come before the
   *   "not ready" check, otherwise it can never be reached and messages for
   *   a closed socket would be queued and never sent.
   * - not ready: the message is queued (keeping at most maxQueueSize
   *   entries, dropping the oldest) and published on the next "ready".
   * - ready: the message is published immediately.
   *
   * @throws Error if the socket is closed or this socket's role is "server".
   */
  private sendToServer = (mesg) => {
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this.state != "ready") {
      this.queuedWrites.push(mesg);
      while (this.queuedWrites.length > this.maxQueueSize) {
        this.queuedWrites.shift();
      }
      return;
    }
    if (this.role == "server") {
      throw Error("sendToServer is only for use by the client");
    }
    // we are the client, so write to server
    this.sendDataToServer(mesg);
  };

  /**
   * Waits until the socket is ready (bounded by `options.timeout`), then
   * sends a request to the server side of this socket.
   *
   * @throws Error("closed") if the socket is closed once the wait is over.
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.request(subject, data, options);
  };

  /**
   * Like `request`, but uses the client's `requestMany` and resolves to a
   * Subscription.
   *
   * @throws Error("closed") if the socket is closed once the wait is over
   *   (same behavior as `request`).
   */
  requestMany = async (data, options?): Promise<Subscription> => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.requestMany(subject, data, options);
  };

  /**
   * Graceful shutdown: turns off reconnection, marks the socket as ended,
   * tells the server we are done (waiting up to `timeout` for the reply;
   * failures are only logged) and then closes the socket.
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state == "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell server we're done
    try {
      await this.sendCommandToServer("close", timeout);
    } catch (err) {
      // best effort only -- we close regardless
      logger.silly("end: failed to tell server we are done", err);
    }
    this.close();
  }

  /**
   * Closes the socket immediately: closes the subscription, detaches the
   * client listener, drops queued writes, notifies the server without
   * waiting, tears down the TCP layer and the keepalive, and finally calls
   * the base class close().  Does nothing if already closed.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.sub?.close();
    if (this.tcp != null) {
      this.client.removeListener(
        "disconnected",
        this.tcp.send.resendLastUntilAcked,
      );
    }
    this.queuedWrites = [];
    // tell server we're gone (but don't wait)
    void this.sendCommandToServer("close").catch((err) => {
      // best effort only
      logger.silly("close: failed to tell server we are gone", err);
    });
    if (this.tcp != null) {
      this.tcp.send.close();
      this.tcp.recv.close();
      // @ts-ignore
      delete this.tcp;
    }
    this.alive?.close();
    delete this.alive;
    super.close();
  }

  // writes will raise an exception if: (1) the socket is closed code='EPIPE', or (2)
  // you hit maxQueueSize un-ACK'd messages, code='ENOBUFS'
  /**
   * Wraps `data` (and optional headers) with messageData and hands it to the
   * TCP send side.
   */
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    if (this.state == "closed") {
      throw new ConatError("closed", { code: "EPIPE" });
    }
    const mesg = messageData(data, { headers });
    this.tcp?.send.process(mesg);
  };

  /** Returns an EventIterator over this socket's "data" events. */
  iter = () => {
    return new EventIterator<[any, Headers]>(this, "data");
  };
}
295
296