Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/socket/server-socket.ts
1710 views
1
import { EventEmitter } from "events";
2
import {
3
type Headers,
4
DEFAULT_REQUEST_TIMEOUT,
5
type Message,
6
messageData,
7
ConatError,
8
} from "@cocalc/conat/core/client";
9
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
10
import { once } from "@cocalc/util/async-utils";
11
import { SOCKET_HEADER_CMD, type State, clientSubject } from "./util";
12
import { type TCP, createTCP } from "./tcp";
13
import { type ConatSocketServer } from "./server";
14
import { keepAlive, KeepAlive } from "./keepalive";
15
import { getLogger } from "@cocalc/conat/client";
16
17
const logger = getLogger("socket:server-socket");
18
19
// One specific socket from the point of view of a server.
20
export class ServerSocket extends EventEmitter {
21
private conatSocket: ConatSocketServer;
22
public readonly id: string;
23
public lastPing = Date.now();
24
25
private queuedWrites: { data: any; headers?: Headers }[] = [];
26
public readonly clientSubject: string;
27
28
public state: State = "ready";
29
// the non-pattern subject the client connected to
30
public readonly subject: string;
31
32
// this is just for compat with conatSocket api:
33
public readonly address = { ip: "" };
34
// conn is just for compatibility with primus/socketio (?).
35
public readonly conn: { id: string };
36
37
public tcp?: TCP;
38
private alive?: KeepAlive;
39
40
constructor({ conatSocket, id, subject }) {
41
super();
42
this.subject = subject;
43
this.conatSocket = conatSocket;
44
this.clientSubject = clientSubject(subject);
45
this.id = id;
46
this.conn = { id };
47
this.initTCP();
48
if (this.tcp == null) {
49
throw Error("bug");
50
}
51
this.initKeepAlive();
52
}
53
54
private initKeepAlive = () => {
55
this.alive?.close();
56
this.alive = keepAlive({
57
role: "server",
58
ping: async () => {
59
await this.request(null, {
60
headers: { [SOCKET_HEADER_CMD]: "ping" },
61
timeout: this.conatSocket.keepAliveTimeout,
62
// waitForInterest is very important in a cluster -- also, obviously
63
// if somebody just opened a socket, they probably exist.
64
waitForInterest: true,
65
});
66
},
67
disconnect: this.close,
68
keepAlive: this.conatSocket.keepAlive,
69
});
70
};
71
72
initTCP() {
73
if (this.tcp != null) {
74
throw Error("this.tcp already initialized");
75
}
76
const request = async (mesg, opts?) =>
77
await this.conatSocket.client.request(this.clientSubject, mesg, {
78
...opts,
79
headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
80
});
81
this.tcp = createTCP({
82
request,
83
role: this.conatSocket.role,
84
reset: this.close,
85
send: this.send,
86
size: this.conatSocket.maxQueueSize,
87
});
88
this.conatSocket.client.on(
89
"disconnected",
90
this.tcp.send.resendLastUntilAcked,
91
);
92
93
this.tcp.recv.on("message", (mesg) => {
94
// console.log("tcp recv emitted message", mesg.data);
95
this.emit("data", mesg.data, mesg.headers);
96
});
97
this.tcp.send.on("drain", () => {
98
this.emit("drain");
99
});
100
}
101
102
disconnect = () => {
103
this.setState("disconnected");
104
if (this.conatSocket.state == "ready") {
105
this.setState("ready");
106
} else {
107
this.conatSocket.once("ready", this.onServerSocketReady);
108
}
109
};
110
111
private onServerSocketReady = () => {
112
if (this.state != "closed") {
113
this.setState("ready");
114
}
115
};
116
117
private setState = (state: State) => {
118
this.state = state;
119
if (state == "ready") {
120
for (const mesg of this.queuedWrites) {
121
this.sendDataToClient(mesg);
122
}
123
this.queuedWrites = [];
124
}
125
this.emit(state);
126
};
127
128
end = async ({ timeout = 3000 }: { timeout?: number } = {}) => {
129
if (this.state == "closed") {
130
return;
131
}
132
try {
133
await this.conatSocket.client.publish(this.clientSubject, null, {
134
headers: { [SOCKET_HEADER_CMD]: "close" },
135
timeout,
136
});
137
} catch (err) {
138
console.log(`WARNING: error closing socket - ${err}`);
139
}
140
this.close();
141
};
142
143
destroy = () => this.close();
144
145
close = () => {
146
if (this.state == "closed") {
147
return;
148
}
149
this.conatSocket.removeListener("ready", this.onServerSocketReady);
150
this.conatSocket.client.publishSync(this.clientSubject, null, {
151
headers: { [SOCKET_HEADER_CMD]: "close" },
152
});
153
154
if (this.tcp != null) {
155
this.conatSocket.client.removeListener(
156
"disconnected",
157
this.tcp.send.resendLastUntilAcked,
158
);
159
this.tcp.send.close();
160
this.tcp.recv.close();
161
// @ts-ignore
162
delete this.tcp;
163
}
164
165
this.alive?.close();
166
delete this.alive;
167
168
this.queuedWrites = [];
169
this.setState("closed");
170
this.removeAllListeners();
171
delete this.conatSocket.sockets[this.id];
172
// @ts-ignore
173
delete this.conatSocket;
174
};
175
176
receiveDataFromClient = (mesg) => {
177
this.alive?.recv();
178
this.tcp?.recv.process(mesg);
179
};
180
181
private sendDataToClient = (mesg) => {
182
this.conatSocket.client.publishSync(this.clientSubject, null, {
183
raw: mesg.raw,
184
headers: mesg.headers,
185
});
186
};
187
188
private send = (mesg: Message) => {
189
if (this.state != "ready") {
190
this.queuedWrites.push(mesg);
191
while (this.queuedWrites.length > this.conatSocket.maxQueueSize) {
192
this.queuedWrites.shift();
193
}
194
return;
195
}
196
// @ts-ignore
197
if (this.state == "closed") {
198
return;
199
}
200
this.sendDataToClient(mesg);
201
return true;
202
};
203
204
// writes will raise an exception if: (1) the socket is closed, or (2)
205
// you hit maxQueueSize un-ACK'd messages.
206
write = (data, { headers }: { headers?: Headers } = {}) => {
207
if (this.state == "closed") {
208
throw new ConatError("closed", { code: "EPIPE" });
209
}
210
const mesg = messageData(data, { headers });
211
this.tcp?.send.process(mesg);
212
};
213
214
// use request reply where the client responds
215
request = async (data, options?) => {
216
await this.waitUntilReady(options?.timeout);
217
logger.silly("server sending request to ", this.clientSubject);
218
return await this.conatSocket.client.request(
219
this.clientSubject,
220
data,
221
options,
222
);
223
};
224
225
private waitUntilReady = reuseInFlight(async (timeout?: number) => {
226
if (this.state == "ready") {
227
return;
228
}
229
await once(this, "ready", timeout ?? DEFAULT_REQUEST_TIMEOUT);
230
if (this.state == "closed") {
231
throw Error("closed");
232
}
233
});
234
235
waitUntilDrain = async () => {
236
await this.tcp?.send.waitUntilDrain();
237
};
238
}
239
240