Path: blob/master/src/packages/conat/socket/server.ts
1710 views
import { ConatSocketBase } from "./base";1import {2PING_PONG_INTERVAL,3type Command,4SOCKET_HEADER_CMD,5clientSubject,6} from "./util";7import { ServerSocket } from "./server-socket";8import { delay } from "awaiting";9import { type Headers } from "@cocalc/conat/core/client";10import { getLogger } from "@cocalc/conat/client";1112const logger = getLogger("socket:server");1314// DO NOT directly instantiate here -- instead, call the15// socket.listen method on ConatClient.1617export class ConatSocketServer extends ConatSocketBase {18initTCP() {}1920channel(channel: string) {21return this.client.socket.listen(this.subject + "." + channel, {22desc: `${this.desc ?? ""}.channel('${channel}')`,23}) as ConatSocketServer;24}2526forEach = (f: (socket: ServerSocket, id: string) => void) => {27for (const id in this.sockets) {28f(this.sockets[id], id);29}30};3132protected async run() {33this.deleteDeadSockets();34const sub = await this.client.subscribe(`${this.subject}.server.*`, {35sticky: true,36});37if (this.state == "closed") {38sub.close();39return;40}41this.sub = sub;42this.setState("ready");43for await (const mesg of this.sub) {44// console.log("got mesg", mesg.data, mesg.headers);45if (this.state == ("closed" as any)) {46return;47}48(async () => {49try {50await this.handleMesg(mesg);51} catch (err) {52logger.debug(53`WARNING -- unexpected issue handling connection -- ${err}`,54);55}56})();57}58}5960private handleMesg = async (mesg) => {61if (this.state == ("closed" as any)) {62return;63}64const cmd = mesg.headers?.[SOCKET_HEADER_CMD];65const id = mesg.subject.split(".").slice(-1)[0];66let socket = this.sockets[id];6768if (socket === undefined) {69if (cmd == "close") {70// already closed71return;72}73// not connected yet -- anything except a connect message is ignored.74if (cmd != "connect") {75logger.debug(76"ignoring data from not-connected socket -- telling it to close",77{ id, cmd },78);79this.client.publishSync(clientSubject(mesg.subject), null, {80headers: { [SOCKET_HEADER_CMD]: "close" },81});82return;83}84// new connection85socket = new ServerSocket({86conatSocket: this,87id,88subject: mesg.subject,89});90this.sockets[id] = socket;91// in a cluster, it's critical that the other side is visible92// before we start sending messages, since otherwise first93// message is likely to be dropped if client is on another node.94try {95await this.client.waitForInterest(socket.clientSubject);96} catch {}97if (this.state == ("closed" as any)) {98return;99}100this.emit("connection", socket);101}102103if (cmd !== undefined) {104// note: test this first since it is also a request105// a special internal control command106this.handleCommandFromClient({ socket, cmd: cmd as Command, mesg });107} else if (mesg.isRequest()) {108// a request to support the socket.on('request', (mesg) => ...) protocol:109socket.emit("request", mesg);110} else {111socket.receiveDataFromClient(mesg);112}113};114115private async deleteDeadSockets() {116while (this.state != "closed") {117for (const id in this.sockets) {118const socket = this.sockets[id];119if (Date.now() - socket.lastPing > PING_PONG_INTERVAL * 2.5) {120socket.destroy();121}122}123await delay(PING_PONG_INTERVAL);124}125}126127request = async (data, options?) => {128await this.waitUntilReady(options?.timeout);129130// we call all connected sockets in parallel,131// then return array of responses.132// Unless race is set, then we return first result133const v: any[] = [];134for (const id in this.sockets) {135const f = async () => {136if (this.state == "closed") {137throw Error("closed");138}139try {140return await this.sockets[id].request(data, options);141} catch (err) {142return err;143}144};145v.push(f());146}147if (options?.race) {148return await Promise.race(v);149} else {150return await Promise.all(v);151}152};153154write = (data, { headers }: { headers?: Headers } = {}): void => {155// @ts-ignore156if (this.state == "closed") {157throw Error("closed");158}159// write to all the sockets that are connected.160for (const id in this.sockets) {161this.sockets[id].write(data, headers);162}163};164165handleCommandFromClient = ({166socket,167cmd,168mesg,169}: {170socket: ServerSocket;171cmd: Command;172mesg;173}) => {174socket.lastPing = Date.now();175if (cmd == "socket") {176socket.tcp?.send.handleRequest(mesg);177} else if (cmd == "ping") {178if (socket.state == "ready") {179// ONLY respond to ping for a server socket if that socket is180// actually ready! ping's are meant to check whether the server181// socket views itself as connected right now. If not, connected,182// ping should timeout183logger.silly("responding to ping from client", socket.id);184mesg.respondSync(null);185}186} else if (cmd == "close") {187const id = socket.id;188socket.close();189delete this.sockets[id];190mesg.respondSync("closed");191} else if (cmd == "connect") {192// very important that connected is successfully delivered, so do not use respondSync.193// Using respond waits for interest.194mesg.respond("connected", { noThrow: true });195} else {196mesg.respondSync({ error: `unknown command - '${cmd}'` });197}198};199200async end({ timeout = 3000 }: { timeout?: number } = {}) {201if (this.state == "closed") {202return;203}204this.reconnection = false;205this.ended = true;206// tell all clients to end207const end = async (id) => {208const socket = this.sockets[id];209delete this.sockets[id];210try {211await socket.end({ timeout });212} catch (err) {213console.log("WARNING: error ending socket -- ${err}");214}215};216await Promise.all(Object.keys(this.sockets).map(end));217this.close();218}219}220221222