Path: blob/master/src/packages/conat/socket/server-socket.ts
1710 views
import { EventEmitter } from "events";1import {2type Headers,3DEFAULT_REQUEST_TIMEOUT,4type Message,5messageData,6ConatError,7} from "@cocalc/conat/core/client";8import { reuseInFlight } from "@cocalc/util/reuse-in-flight";9import { once } from "@cocalc/util/async-utils";10import { SOCKET_HEADER_CMD, type State, clientSubject } from "./util";11import { type TCP, createTCP } from "./tcp";12import { type ConatSocketServer } from "./server";13import { keepAlive, KeepAlive } from "./keepalive";14import { getLogger } from "@cocalc/conat/client";1516const logger = getLogger("socket:server-socket");1718// One specific socket from the point of view of a server.19export class ServerSocket extends EventEmitter {20private conatSocket: ConatSocketServer;21public readonly id: string;22public lastPing = Date.now();2324private queuedWrites: { data: any; headers?: Headers }[] = [];25public readonly clientSubject: string;2627public state: State = "ready";28// the non-pattern subject the client connected to29public readonly subject: string;3031// this is just for compat with conatSocket api:32public readonly address = { ip: "" };33// conn is just for compatibility with primus/socketio (?).34public readonly conn: { id: string };3536public tcp?: TCP;37private alive?: KeepAlive;3839constructor({ conatSocket, id, subject }) {40super();41this.subject = subject;42this.conatSocket = conatSocket;43this.clientSubject = clientSubject(subject);44this.id = id;45this.conn = { id };46this.initTCP();47if (this.tcp == null) {48throw Error("bug");49}50this.initKeepAlive();51}5253private initKeepAlive = () => {54this.alive?.close();55this.alive = keepAlive({56role: "server",57ping: async () => {58await this.request(null, {59headers: { [SOCKET_HEADER_CMD]: "ping" },60timeout: this.conatSocket.keepAliveTimeout,61// waitForInterest is very important in a cluster -- also, obviously62// if somebody just opened a socket, they probably exist.63waitForInterest: true,64});65},66disconnect: this.close,67keepAlive: this.conatSocket.keepAlive,68});69};7071initTCP() {72if (this.tcp != null) {73throw Error("this.tcp already initialized");74}75const request = async (mesg, opts?) =>76await this.conatSocket.client.request(this.clientSubject, mesg, {77...opts,78headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },79});80this.tcp = createTCP({81request,82role: this.conatSocket.role,83reset: this.close,84send: this.send,85size: this.conatSocket.maxQueueSize,86});87this.conatSocket.client.on(88"disconnected",89this.tcp.send.resendLastUntilAcked,90);9192this.tcp.recv.on("message", (mesg) => {93// console.log("tcp recv emitted message", mesg.data);94this.emit("data", mesg.data, mesg.headers);95});96this.tcp.send.on("drain", () => {97this.emit("drain");98});99}100101disconnect = () => {102this.setState("disconnected");103if (this.conatSocket.state == "ready") {104this.setState("ready");105} else {106this.conatSocket.once("ready", this.onServerSocketReady);107}108};109110private onServerSocketReady = () => {111if (this.state != "closed") {112this.setState("ready");113}114};115116private setState = (state: State) => {117this.state = state;118if (state == "ready") {119for (const mesg of this.queuedWrites) {120this.sendDataToClient(mesg);121}122this.queuedWrites = [];123}124this.emit(state);125};126127end = async ({ timeout = 3000 }: { timeout?: number } = {}) => {128if (this.state == "closed") {129return;130}131try {132await this.conatSocket.client.publish(this.clientSubject, null, {133headers: { [SOCKET_HEADER_CMD]: "close" },134timeout,135});136} catch (err) {137console.log(`WARNING: error closing socket - ${err}`);138}139this.close();140};141142destroy = () => this.close();143144close = () => {145if (this.state == "closed") {146return;147}148this.conatSocket.removeListener("ready", this.onServerSocketReady);149this.conatSocket.client.publishSync(this.clientSubject, null, {150headers: { [SOCKET_HEADER_CMD]: "close" },151});152153if (this.tcp != null) {154this.conatSocket.client.removeListener(155"disconnected",156this.tcp.send.resendLastUntilAcked,157);158this.tcp.send.close();159this.tcp.recv.close();160// @ts-ignore161delete this.tcp;162}163164this.alive?.close();165delete this.alive;166167this.queuedWrites = [];168this.setState("closed");169this.removeAllListeners();170delete this.conatSocket.sockets[this.id];171// @ts-ignore172delete this.conatSocket;173};174175receiveDataFromClient = (mesg) => {176this.alive?.recv();177this.tcp?.recv.process(mesg);178};179180private sendDataToClient = (mesg) => {181this.conatSocket.client.publishSync(this.clientSubject, null, {182raw: mesg.raw,183headers: mesg.headers,184});185};186187private send = (mesg: Message) => {188if (this.state != "ready") {189this.queuedWrites.push(mesg);190while (this.queuedWrites.length > this.conatSocket.maxQueueSize) {191this.queuedWrites.shift();192}193return;194}195// @ts-ignore196if (this.state == "closed") {197return;198}199this.sendDataToClient(mesg);200return true;201};202203// writes will raise an exception if: (1) the socket is closed, or (2)204// you hit maxQueueSize un-ACK'd messages.205write = (data, { headers }: { headers?: Headers } = {}) => {206if (this.state == "closed") {207throw new ConatError("closed", { code: "EPIPE" });208}209const mesg = messageData(data, { headers });210this.tcp?.send.process(mesg);211};212213// use request reply where the client responds214request = async (data, options?) => {215await this.waitUntilReady(options?.timeout);216logger.silly("server sending request to ", this.clientSubject);217return await this.conatSocket.client.request(218this.clientSubject,219data,220options,221);222};223224private waitUntilReady = reuseInFlight(async (timeout?: number) => {225if (this.state == "ready") {226return;227}228await once(this, "ready", timeout ?? DEFAULT_REQUEST_TIMEOUT);229if (this.state == "closed") {230throw Error("closed");231}232});233234waitUntilDrain = async () => {235await this.tcp?.send.waitUntilDrain();236};237}238239240