Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/server.ts
1710 views
1
import { ConatSocketBase } from "./base";
2
import {
3
PING_PONG_INTERVAL,
4
type Command,
5
SOCKET_HEADER_CMD,
6
clientSubject,
7
} from "./util";
8
import { ServerSocket } from "./server-socket";
9
import { delay } from "awaiting";
10
import { type Headers } from "@cocalc/conat/core/client";
11
import { getLogger } from "@cocalc/conat/client";
12
13
// Module-level logger, obtained from getLogger under the name "socket:server".
const logger = getLogger("socket:server");
14
15
/**
 * Server side of a conat socket.  It subscribes to `${subject}.server.*`,
 * keeps one ServerSocket per connected client id in `this.sockets`, and
 * routes each incoming message to the matching ServerSocket.
 *
 * DO NOT directly instantiate this class -- instead, call the
 * socket.listen method on ConatClient.
 */
export class ConatSocketServer extends ConatSocketBase {
  /**
   * Intentionally empty.
   * NOTE(review): presumably overrides a hook of ConatSocketBase; TCP state
   * is reached per connection via `socket.tcp` (see handleCommandFromClient).
   */
  initTCP() {}

  /**
   * Start listening on the sub-subject `${this.subject}.${channel}` and
   * return the resulting server.
   */
  channel(channel: string) {
    return this.client.socket.listen(`${this.subject}.${channel}`, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
    }) as ConatSocketServer;
  }

  /** Call `f(socket, id)` for every socket currently in `this.sockets`. */
  forEach = (f: (socket: ServerSocket, id: string) => void) => {
    for (const id in this.sockets) {
      f(this.sockets[id], id);
    }
  };

  /**
   * Main loop: start the dead-socket reaper, subscribe to
   * `${subject}.server.*`, switch to the "ready" state and dispatch every
   * incoming message to handleMesg until the server is closed.
   */
  protected async run() {
    // Fire-and-forget: the reaper loops on its own until state is "closed".
    void this.deleteDeadSockets();
    const sub = await this.client.subscribe(`${this.subject}.server.*`, {
      sticky: true,
    });
    if (this.state === "closed") {
      // closed while we were waiting for the subscription
      sub.close();
      return;
    }
    this.sub = sub;
    this.setState("ready");
    for await (const mesg of this.sub) {
      if (this.state === ("closed" as any)) {
        return;
      }
      // Deliberately not awaited, so a slow handler (e.g. one waiting for
      // interest on a new connection) does not block later messages.
      void this.handleMesg(mesg).catch((err) => {
        logger.debug(
          `WARNING -- unexpected issue handling connection -- ${err}`,
        );
      });
    }
  }

  /**
   * Handle one message received on `${subject}.server.<id>`.
   *
   * The socket id is the last dot-separated segment of `mesg.subject`.
   * For an unknown id:
   *   - "close" is ignored (already closed);
   *   - anything other than "connect" is ignored and the sender is told to close;
   *   - "connect" creates and registers a new ServerSocket and emits "connection".
   * After that the message is routed as a control command, a request, or
   * plain data.
   */
  private handleMesg = async (mesg) => {
    if (this.state === ("closed" as any)) {
      return;
    }
    const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
    const id = mesg.subject.split(".").slice(-1)[0];
    let socket = this.sockets[id];

    if (socket === undefined) {
      if (cmd === "close") {
        // already closed
        return;
      }
      // not connected yet -- anything except a connect message is ignored.
      if (cmd !== "connect") {
        logger.debug(
          "ignoring data from not-connected socket -- telling it to close",
          { id, cmd },
        );
        this.client.publishSync(clientSubject(mesg.subject), null, {
          headers: { [SOCKET_HEADER_CMD]: "close" },
        });
        return;
      }
      // new connection
      socket = new ServerSocket({
        conatSocket: this,
        id,
        subject: mesg.subject,
      });
      this.sockets[id] = socket;
      // in a cluster, it's critical that the other side is visible
      // before we start sending messages, since otherwise first
      // message is likely to be dropped if client is on another node.
      try {
        await this.client.waitForInterest(socket.clientSubject);
      } catch {
        // best effort: proceed with the connection even if the wait fails
      }
      if (this.state === ("closed" as any)) {
        return;
      }
      this.emit("connection", socket);
    }

    if (cmd !== undefined) {
      // note: test this first since it is also a request
      // a special internal control command
      this.handleCommandFromClient({ socket, cmd: cmd as Command, mesg });
    } else if (mesg.isRequest()) {
      // a request to support the socket.on('request', (mesg) => ...) protocol:
      socket.emit("request", mesg);
    } else {
      socket.receiveDataFromClient(mesg);
    }
  };

  /**
   * Background reaper.  Once per PING_PONG_INTERVAL, until the server is
   * closed, destroy every socket whose `lastPing` is more than
   * 2.5 * PING_PONG_INTERVAL in the past.
   */
  private async deleteDeadSockets() {
    const maxSilence = PING_PONG_INTERVAL * 2.5;
    while (this.state != "closed") {
      for (const id in this.sockets) {
        const socket = this.sockets[id];
        if (Date.now() - socket.lastPing > maxSilence) {
          // NOTE(review): this relies on destroy() removing the socket from
          // this.sockets; otherwise it is destroyed again on every sweep.
          try {
            socket.destroy();
          } catch (err) {
            // One failing socket must not stop the reaper for all the
            // others (and must not become an unhandled rejection).
            logger.debug(
              `WARNING -- error destroying dead socket ${id} -- ${err}`,
            );
          }
        }
      }
      await delay(PING_PONG_INTERVAL);
    }
  }

  /**
   * Send a request to every connected socket in parallel.
   *
   * @param data - payload passed to each socket's `request`
   * @param options - forwarded to each socket's `request`; `options.timeout`
   *   is also used while waiting for the server to become ready, and
   *   `options.race` selects the return mode
   * @returns without `race`: an array with one entry per socket, where a
   *   failed request contributes the caught error as a value; with `race`:
   *   the first result (or caught error) to arrive
   * @throws Error("closed") if the server is closed when the requests start
   * @throws Error if `race` is set and no socket is connected
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);

    // we call all connected sockets in parallel,
    // then return array of responses.
    // Unless race is set, then we return first result
    const pending: Promise<any>[] = [];
    for (const id in this.sockets) {
      const socket = this.sockets[id];
      const requestOne = async () => {
        if (this.state === "closed") {
          throw Error("closed");
        }
        try {
          return await socket.request(data, options);
        } catch (err) {
          // per-socket failures are reported as values, not rejections
          return err;
        }
      };
      pending.push(requestOne());
    }
    if (options?.race) {
      if (pending.length === 0) {
        // Promise.race([]) never settles, which would hang the caller forever.
        throw Error("no sockets connected");
      }
      return await Promise.race(pending);
    }
    return await Promise.all(pending);
  };

  /**
   * Write `data` (with optional `headers`) to every connected socket.
   *
   * @throws Error("closed") if the server is closed
   */
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    // @ts-ignore
    if (this.state === "closed") {
      throw Error("closed");
    }
    // write to all the sockets that are connected.
    for (const id in this.sockets) {
      this.sockets[id].write(data, headers);
    }
  };

  /**
   * Handle an internal control command sent by a client.  Any command
   * refreshes the socket's `lastPing` timestamp.
   *
   *   - "socket":  forwarded to `socket.tcp?.send.handleRequest`
   *   - "ping":    answered only while the socket's state is "ready"
   *   - "close":   closes the socket, unregisters it, responds "closed"
   *   - "connect": responds "connected"
   *   - other:     responds with an `{ error }` object
   */
  handleCommandFromClient = ({
    socket,
    cmd,
    mesg,
  }: {
    socket: ServerSocket;
    cmd: Command;
    mesg;
  }) => {
    socket.lastPing = Date.now();
    if (cmd === "socket") {
      socket.tcp?.send.handleRequest(mesg);
    } else if (cmd === "ping") {
      if (socket.state === "ready") {
        // ONLY respond to ping for a server socket if that socket is
        // actually ready!  Pings are meant to check whether the server
        // socket views itself as connected right now.  If not connected,
        // the ping should time out.
        logger.silly("responding to ping from client", socket.id);
        mesg.respondSync(null);
      }
    } else if (cmd === "close") {
      const id = socket.id;
      socket.close();
      delete this.sockets[id];
      mesg.respondSync("closed");
    } else if (cmd === "connect") {
      // very important that connected is successfully delivered, so do not use respondSync.
      // Using respond waits for interest.
      mesg.respond("connected", { noThrow: true });
    } else {
      mesg.respondSync({ error: `unknown command - '${cmd}'` });
    }
  };

  /**
   * Gracefully shut down: disable reconnection, mark the server as ended,
   * end every connected socket in parallel, then close the server.
   * Does nothing if the server is already closed.
   *
   * @param timeout - passed to each socket's `end` (default 3000;
   *   presumably milliseconds -- TODO confirm against ServerSocket.end)
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state === "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell all clients to end
    const endSocket = async (id: string) => {
      const socket = this.sockets[id];
      delete this.sockets[id];
      try {
        await socket.end({ timeout });
      } catch (err) {
        // Template literal so the actual error is included in the message.
        logger.debug(`WARNING: error ending socket -- ${err}`);
      }
    };
    await Promise.all(Object.keys(this.sockets).map(endSocket));
    this.close();
  }
}
221
222