Path: blob/master/src/packages/conat/socket/server.ts
5614 views
import { ConatSocketBase } from "./base";1import {2PING_PONG_INTERVAL,3type Command,4SOCKET_HEADER_CMD,5clientSubject,6serverStatusSubject,7} from "./util";8import { ServerSocket } from "./server-socket";9import { delay } from "awaiting";10import { type Headers } from "@cocalc/conat/core/client";11import { getLogger } from "@cocalc/conat/client";1213const logger = getLogger("socket:server");1415// DO NOT directly instantiate here -- instead, call the16// socket.listen method on ConatClient.1718export class ConatSocketServer extends ConatSocketBase {19serverSubjectPattern = (): string => {20return `${this.subject}.server.${this.id}.*`;21};2223initTCP() {}2425channel(channel: string) {26return this.client.socket.listen(this.subject + "." + channel, {27desc: `${this.desc ?? ""}.channel('${channel}')`,28}) as ConatSocketServer;29}3031forEach = (f: (socket: ServerSocket, id: string) => void) => {32for (const id in this.sockets) {33f(this.sockets[id], id);34}35};3637private createStatusServer = async () => {38const sub = await this.client.subscribe(serverStatusSubject(this.subject));39if (this.state == "closed") {40sub.close();41return;42}43this.once("closed", () => sub.close());4445(async () => {46for await (const mesg of sub) {47if (this.state == ("closed" as any)) {48sub.close();49return;50}51// TODO: may return load info at some point52mesg.respondSync({ id: this.id });53}54})();55};5657protected async run() {58await this.createStatusServer();59this.deleteDeadSockets();60const sub = await this.client.subscribe(this.serverSubjectPattern());61if (this.state == "closed") {62sub.close();63return;64}65this.sub = sub;66this.setState("ready");67for await (const mesg of this.sub) {68// console.log("got mesg", mesg.data, mesg.headers);69if (this.state == ("closed" as any)) {70return;71}72(async () => {73try {74await this.handleMesg(mesg);75} catch (err) {76logger.debug(77`WARNING -- unexpected issue handling connection -- ${err}`,78);79}80})();81}82}8384private handleMesg = async (mesg) => {85if (this.state == ("closed" as any)) {86return;87}88const cmd = mesg.headers?.[SOCKET_HEADER_CMD];89const id = mesg.subject.split(".").slice(-1)[0];90let socket = this.sockets[id];9192if (socket === undefined) {93if (cmd == "close") {94// already closed95return;96}97// not connected yet -- anything except a connect message is ignored.98if (cmd != "connect") {99logger.debug(100"ignoring data from not-connected socket -- telling it to close",101{ id, cmd },102);103this.client.publishSync(clientSubject(mesg.subject), null, {104headers: { [SOCKET_HEADER_CMD]: "close" },105});106return;107}108// new connection109socket = new ServerSocket({110conatSocket: this,111id,112subject: mesg.subject,113});114this.sockets[id] = socket;115// in a cluster, it's critical that the other side is visible116// before we start sending messages, since otherwise first117// message is likely to be dropped if client is on another node.118try {119await this.client.waitForInterest(socket.clientSubject);120} catch {}121if (this.state == ("closed" as any)) {122return;123}124this.emit("connection", socket);125}126127if (cmd !== undefined) {128// note: test this first since it is also a request129// a special internal control command130this.handleCommandFromClient({ socket, cmd: cmd as Command, mesg });131} else if (mesg.isRequest()) {132// a request to support the socket.on('request', (mesg) => ...) protocol:133socket.emit("request", mesg);134} else {135socket.receiveDataFromClient(mesg);136}137};138139private async deleteDeadSockets() {140while (this.state != "closed") {141for (const id in this.sockets) {142const socket = this.sockets[id];143if (Date.now() - socket.lastPing > PING_PONG_INTERVAL * 2.5) {144socket.destroy();145}146}147await delay(PING_PONG_INTERVAL);148}149}150151request = async (data, options?) => {152await this.waitUntilReady(options?.timeout);153154// we call all connected sockets in parallel,155// then return array of responses.156// Unless race is set, then we return first result157const v: any[] = [];158for (const id in this.sockets) {159const f = async () => {160if (this.state == "closed") {161throw Error("closed");162}163try {164return await this.sockets[id].request(data, options);165} catch (err) {166return err;167}168};169v.push(f());170}171if (options?.race) {172return await Promise.race(v);173} else {174return await Promise.all(v);175}176};177178write = (data, { headers }: { headers?: Headers } = {}): void => {179// @ts-ignore180if (this.state == "closed") {181throw Error("closed");182}183// write to all the sockets that are connected.184for (const id in this.sockets) {185this.sockets[id].write(data, headers);186}187};188189handleCommandFromClient = ({190socket,191cmd,192mesg,193}: {194socket: ServerSocket;195cmd: Command;196mesg;197}) => {198socket.lastPing = Date.now();199if (cmd == "socket") {200socket.tcp?.send.handleRequest(mesg);201} else if (cmd == "ping") {202if (socket.state == "ready") {203// ONLY respond to ping for a server socket if that socket is204// actually ready! ping's are meant to check whether the server205// socket views itself as connected right now. If not, connected,206// ping should timeout207logger.silly("responding to ping from client", socket.id);208mesg.respondSync(null);209}210} else if (cmd == "close") {211const id = socket.id;212socket.close();213delete this.sockets[id];214mesg.respondSync("closed");215} else if (cmd == "connect") {216// very important that connected is successfully delivered, so do not use respondSync.217// Using respond waits for interest.218mesg.respond("connected", { noThrow: true });219} else {220mesg.respondSync({ error: `unknown command - '${cmd}'` });221}222};223224async end({ timeout = 3000 }: { timeout?: number } = {}) {225if (this.state == "closed") {226return;227}228this.reconnection = false;229this.ended = true;230// tell all clients to end231const end = async (id) => {232const socket = this.sockets[id];233delete this.sockets[id];234try {235await socket.end({ timeout });236} catch (err) {237console.log(`WARNING: error ending socket -- ${err}`);238}239};240await Promise.all(Object.keys(this.sockets).map(end));241this.close();242}243}244245246