Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/server.ts
5614 views
1
import { ConatSocketBase } from "./base";
2
import {
3
PING_PONG_INTERVAL,
4
type Command,
5
SOCKET_HEADER_CMD,
6
clientSubject,
7
serverStatusSubject,
8
} from "./util";
9
import { ServerSocket } from "./server-socket";
10
import { delay } from "awaiting";
11
import { type Headers } from "@cocalc/conat/core/client";
12
import { getLogger } from "@cocalc/conat/client";
13
14
const logger = getLogger("socket:server");
15
16
// DO NOT directly instantiate here -- instead, call the
17
// socket.listen method on ConatClient.
18
19
export class ConatSocketServer extends ConatSocketBase {
20
serverSubjectPattern = (): string => {
21
return `${this.subject}.server.${this.id}.*`;
22
};
23
24
initTCP() {}
25
26
channel(channel: string) {
27
return this.client.socket.listen(this.subject + "." + channel, {
28
desc: `${this.desc ?? ""}.channel('${channel}')`,
29
}) as ConatSocketServer;
30
}
31
32
forEach = (f: (socket: ServerSocket, id: string) => void) => {
33
for (const id in this.sockets) {
34
f(this.sockets[id], id);
35
}
36
};
37
38
private createStatusServer = async () => {
39
const sub = await this.client.subscribe(serverStatusSubject(this.subject));
40
if (this.state == "closed") {
41
sub.close();
42
return;
43
}
44
this.once("closed", () => sub.close());
45
46
(async () => {
47
for await (const mesg of sub) {
48
if (this.state == ("closed" as any)) {
49
sub.close();
50
return;
51
}
52
// TODO: may return load info at some point
53
mesg.respondSync({ id: this.id });
54
}
55
})();
56
};
57
58
protected async run() {
59
await this.createStatusServer();
60
this.deleteDeadSockets();
61
const sub = await this.client.subscribe(this.serverSubjectPattern());
62
if (this.state == "closed") {
63
sub.close();
64
return;
65
}
66
this.sub = sub;
67
this.setState("ready");
68
for await (const mesg of this.sub) {
69
// console.log("got mesg", mesg.data, mesg.headers);
70
if (this.state == ("closed" as any)) {
71
return;
72
}
73
(async () => {
74
try {
75
await this.handleMesg(mesg);
76
} catch (err) {
77
logger.debug(
78
`WARNING -- unexpected issue handling connection -- ${err}`,
79
);
80
}
81
})();
82
}
83
}
84
85
private handleMesg = async (mesg) => {
86
if (this.state == ("closed" as any)) {
87
return;
88
}
89
const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
90
const id = mesg.subject.split(".").slice(-1)[0];
91
let socket = this.sockets[id];
92
93
if (socket === undefined) {
94
if (cmd == "close") {
95
// already closed
96
return;
97
}
98
// not connected yet -- anything except a connect message is ignored.
99
if (cmd != "connect") {
100
logger.debug(
101
"ignoring data from not-connected socket -- telling it to close",
102
{ id, cmd },
103
);
104
this.client.publishSync(clientSubject(mesg.subject), null, {
105
headers: { [SOCKET_HEADER_CMD]: "close" },
106
});
107
return;
108
}
109
// new connection
110
socket = new ServerSocket({
111
conatSocket: this,
112
id,
113
subject: mesg.subject,
114
});
115
this.sockets[id] = socket;
116
// in a cluster, it's critical that the other side is visible
117
// before we start sending messages, since otherwise first
118
// message is likely to be dropped if client is on another node.
119
try {
120
await this.client.waitForInterest(socket.clientSubject);
121
} catch {}
122
if (this.state == ("closed" as any)) {
123
return;
124
}
125
this.emit("connection", socket);
126
}
127
128
if (cmd !== undefined) {
129
// note: test this first since it is also a request
130
// a special internal control command
131
this.handleCommandFromClient({ socket, cmd: cmd as Command, mesg });
132
} else if (mesg.isRequest()) {
133
// a request to support the socket.on('request', (mesg) => ...) protocol:
134
socket.emit("request", mesg);
135
} else {
136
socket.receiveDataFromClient(mesg);
137
}
138
};
139
140
private async deleteDeadSockets() {
141
while (this.state != "closed") {
142
for (const id in this.sockets) {
143
const socket = this.sockets[id];
144
if (Date.now() - socket.lastPing > PING_PONG_INTERVAL * 2.5) {
145
socket.destroy();
146
}
147
}
148
await delay(PING_PONG_INTERVAL);
149
}
150
}
151
152
request = async (data, options?) => {
153
await this.waitUntilReady(options?.timeout);
154
155
// we call all connected sockets in parallel,
156
// then return array of responses.
157
// Unless race is set, then we return first result
158
const v: any[] = [];
159
for (const id in this.sockets) {
160
const f = async () => {
161
if (this.state == "closed") {
162
throw Error("closed");
163
}
164
try {
165
return await this.sockets[id].request(data, options);
166
} catch (err) {
167
return err;
168
}
169
};
170
v.push(f());
171
}
172
if (options?.race) {
173
return await Promise.race(v);
174
} else {
175
return await Promise.all(v);
176
}
177
};
178
179
write = (data, { headers }: { headers?: Headers } = {}): void => {
180
// @ts-ignore
181
if (this.state == "closed") {
182
throw Error("closed");
183
}
184
// write to all the sockets that are connected.
185
for (const id in this.sockets) {
186
this.sockets[id].write(data, headers);
187
}
188
};
189
190
handleCommandFromClient = ({
191
socket,
192
cmd,
193
mesg,
194
}: {
195
socket: ServerSocket;
196
cmd: Command;
197
mesg;
198
}) => {
199
socket.lastPing = Date.now();
200
if (cmd == "socket") {
201
socket.tcp?.send.handleRequest(mesg);
202
} else if (cmd == "ping") {
203
if (socket.state == "ready") {
204
// ONLY respond to ping for a server socket if that socket is
205
// actually ready! ping's are meant to check whether the server
206
// socket views itself as connected right now. If not, connected,
207
// ping should timeout
208
logger.silly("responding to ping from client", socket.id);
209
mesg.respondSync(null);
210
}
211
} else if (cmd == "close") {
212
const id = socket.id;
213
socket.close();
214
delete this.sockets[id];
215
mesg.respondSync("closed");
216
} else if (cmd == "connect") {
217
// very important that connected is successfully delivered, so do not use respondSync.
218
// Using respond waits for interest.
219
mesg.respond("connected", { noThrow: true });
220
} else {
221
mesg.respondSync({ error: `unknown command - '${cmd}'` });
222
}
223
};
224
225
async end({ timeout = 3000 }: { timeout?: number } = {}) {
226
if (this.state == "closed") {
227
return;
228
}
229
this.reconnection = false;
230
this.ended = true;
231
// tell all clients to end
232
const end = async (id) => {
233
const socket = this.sockets[id];
234
delete this.sockets[id];
235
try {
236
await socket.end({ timeout });
237
} catch (err) {
238
console.log(`WARNING: error ending socket -- ${err}`);
239
}
240
};
241
await Promise.all(Object.keys(this.sockets).map(end));
242
this.close();
243
}
244
}
245
246