Path: blob/master/src/packages/conat/socket/client.ts
5716 views
import {
  messageData,
  type Subscription,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import { ConatSocketBase } from "./base";
import { type TCP, createTCP } from "./tcp";
import {
  SOCKET_HEADER_CMD,
  DEFAULT_COMMAND_TIMEOUT,
  type ConatSocketOptions,
  serverStatusSubject,
} from "./util";
import { EventIterator } from "@cocalc/util/event-iterator";
import { keepAlive, KeepAlive } from "./keepalive";
import { getLogger } from "@cocalc/conat/client";
import { until } from "@cocalc/util/async-utils";

const logger = getLogger("socket:client");

/**
 * Client side of a conat socket.
 *
 * DO NOT directly instantiate this class -- instead, call the
 * socket.connect method on ConatClient.
 *
 * Outgoing data passes through a TCP-like layer (see ./tcp) that is created
 * once in the constructor.  Messages written before the socket is "ready"
 * are held in `queuedWrites` and flushed when the "ready" event fires.
 */
export class ConatSocketClient extends ConatSocketBase {
  // Messages handed to sendToServer while the socket was not ready.
  // Bounded by maxQueueSize (oldest entries are dropped first).
  queuedWrites: { data: any; headers?: Headers }[] = [];
  private tcp?: TCP;
  private alive?: KeepAlive;
  // id of the server this socket talks to; set by getServerId().
  private serverId?: string;
  // optional function mapping the socket subject to a server id.
  private loadBalancer?: (subject: string) => Promise<string>;

  constructor(opts: ConatSocketOptions) {
    super(opts);
    this.loadBalancer = opts.loadBalancer;
    logger.silly("creating a client socket connecting to ", this.subject);
    this.initTCP();
    this.on("ready", () => {
      // Take ownership of the queue BEFORE flushing it.  Otherwise the same
      // messages stay in queuedWrites and get published again every time
      // the socket becomes ready (i.e., after each reconnect).
      const pending = this.queuedWrites;
      this.queuedWrites = [];
      for (const mesg of pending) {
        this.sendDataToServer(mesg);
      }
    });
    if (this.tcp == null) {
      throw Error("bug");
    }
  }

  /**
   * Subject to send messages/data to the socket server.
   *
   * @throws Error if no server id has been selected yet.
   */
  serverSubject = (): string => {
    if (!this.serverId) {
      throw Error("no server selected");
    }
    return `${this.subject}.server.${this.serverId}.${this.id}`;
  };

  /**
   * Connect a new client socket on the sub-subject `${subject}.${channel}`,
   * using the same maxQueueSize as this socket.
   */
  channel(channel: string) {
    return this.client.socket.connect(this.subject + "." + channel, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
      maxQueueSize: this.maxQueueSize,
    }) as ConatSocketClient;
  }

  /**
   * (Re)create the keepalive monitor, closing any previous one.  It pings
   * the server with a "ping" command request and calls this.disconnect
   * via the `disconnect` callback passed to keepAlive.
   */
  private initKeepAlive = () => {
    this.alive?.close();
    this.alive = keepAlive({
      role: "client",
      ping: async () =>
        await this.request(null, {
          headers: { [SOCKET_HEADER_CMD]: "ping" },
          timeout: this.keepAliveTimeout,
        }),
      disconnect: this.disconnect,
      keepAlive: this.keepAlive,
    });
  };

  /**
   * Create the TCP-like send/recv layer and wire its events to this socket:
   * received messages are re-emitted as "data" and the sender's "drain"
   * event is re-emitted as "drain".
   *
   * @throws Error if the TCP layer was already initialized.
   */
  initTCP() {
    if (this.tcp != null) {
      throw Error("this.tcp already initialized");
    }
    // request = send a socket request mesg to the server side of the socket,
    // either acking what has been received or asking for a resend of
    // missing data.
    const request = async (mesg, opts?) =>
      await this.client.request(this.serverSubject(), mesg, {
        ...opts,
        headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
      });

    this.tcp = createTCP({
      request,
      role: this.role,
      reset: this.disconnect,
      send: this.sendToServer,
      size: this.maxQueueSize,
    });

    // this listener is removed again in close()
    this.client.on("disconnected", this.tcp.send.resendLastUntilAcked);

    this.tcp.recv.on("message", (mesg) => {
      this.emit("data", mesg.data, mesg.headers);
    });
    this.tcp.send.on("drain", () => {
      this.emit("drain");
    });
  }

  /** Wait for the TCP sender to drain; resolves immediately if there is no TCP layer. */
  waitUntilDrain = async () => {
    await this.tcp?.send.waitUntilDrain();
  };

  /**
   * Send a control command to the server and return the data of its reply.
   *
   * @param cmd - the command to send
   * @param timeout - request timeout passed to client.request
   * @throws Error if no server is selected, the request fails, or the
   *   reply data has an `error` field.
   */
  private sendCommandToServer = async (
    cmd: "close" | "ping" | "connect",
    timeout = DEFAULT_COMMAND_TIMEOUT,
  ) => {
    const headers = {
      [SOCKET_HEADER_CMD]: cmd,
      id: this.id,
    };
    const subject = this.serverSubject();
    logger.silly("sendCommandToServer", { cmd, timeout, subject });
    const resp = await this.client.request(subject, null, {
      headers,
      timeout,
      // connect is exactly when other side might not be visible yet.
      waitForInterest: cmd == "connect",
    });

    const value = resp.data;
    logger.silly("sendCommandToServer: got resp", { cmd, value, subject });
    if (value?.error) {
      throw Error(value?.error);
    } else {
      return value;
    }
  };

  /**
   * Determine which server to talk to and store it in this.serverId:
   * ask the load balancer if one was provided, otherwise request the
   * socket server's status subject and use the `id` field of the reply.
   */
  private getServerId = async () => {
    let id;
    if (this.loadBalancer != null) {
      logger.debug("getting server id from load balancer");
      id = await this.loadBalancer(this.subject);
    } else {
      logger.debug("getting server id from socket server");
      const resp = await this.client.request(
        serverStatusSubject(this.subject),
        null,
      );
      ({ id } = resp.data);
    }
    this.serverId = id;
  };

  /**
   * Connect to the server and process incoming messages until the
   * subscription ends or a "close" command arrives.  Any error raised
   * along the way is logged and results in this.disconnect().
   */
  protected async run() {
    if (this.state == "closed") {
      return;
    }
    // console.log(
    //   "client socket -- subscribing to ",
    //   `${this.subject}.client.${this.id}`,
    // );
    try {
      await this.getServerId();

      logger.silly("run: getting subscription");
      const sub = await this.client.subscribe(
        `${this.subject}.client.${this.id}`,
      );
      // the socket may have been closed while awaiting the subscription
      // @ts-ignore
      if (this.state == "closed") {
        sub.close();
        return;
      }
      // the disconnect function does this.sub.close()
      this.sub = sub;
      let resp: any = undefined;
      // keep retrying the connect command (with backoff) until it succeeds
      // or the socket gets closed.
      await until(
        async () => {
          if (this.state == "closed") {
            logger.silly("closed -- giving up on connecting");
            return true;
          }
          try {
            logger.silly("sending connect command to server", this.subject);
            resp = await this.sendCommandToServer("connect");
            this.alive?.recv();
            return true;
          } catch (err) {
            logger.silly("failed to connect", this.subject, err);
          }
          return false;
        },
        { start: 500, decay: 1.3, max: 10000 },
      );

      if (resp != "connected") {
        throw Error("failed to connect");
      }
      this.setState("ready");
      this.initKeepAlive();
      for await (const mesg of this.sub) {
        // any traffic from the server counts as a sign of life
        this.alive?.recv();
        const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
        if (cmd) {
          logger.silly("client got cmd", cmd);
        }
        if (cmd == "socket") {
          this.tcp?.send.handleRequest(mesg);
        } else if (cmd == "close") {
          this.close();
          return;
        } else if (cmd == "ping") {
          // logger.silly("responding to ping from server", this.id);
          mesg.respondSync(null);
        } else if (mesg.isRequest()) {
          // logger.silly("client got request");
          this.emit("request", mesg);
        } else {
          // logger.silly("client got data"); //, { data: mesg.data });
          this.tcp?.recv.process(mesg);
        }
      }
    } catch (err) {
      logger.silly("socket connect failed", err);
      this.disconnect();
    }
  }

  /** Publish one already-encoded message (raw payload + headers) to the server. */
  private sendDataToServer = (mesg) => {
    this.client.publishSync(this.serverSubject(), null, {
      raw: mesg.raw,
      headers: mesg.headers,
    });
  };

  /**
   * `send` callback handed to the TCP layer.  Publishes immediately when
   * the socket is ready; otherwise queues the message (keeping at most
   * maxQueueSize entries) until the next "ready" event.
   *
   * @throws Error("closed") if the socket is closed.
   * @throws Error if this socket's role is "server".
   */
  private sendToServer = (mesg) => {
    // This check must come BEFORE the not-ready check below: "closed" is
    // also a not-ready state, so in the other order the closed case could
    // never be reached and writes to a closed socket were silently queued.
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this.state != "ready") {
      this.queuedWrites.push(mesg);
      while (this.queuedWrites.length > this.maxQueueSize) {
        this.queuedWrites.shift();
      }
      return;
    }
    if (this.role == "server") {
      throw Error("sendToServer is only for use by the client");
    } else {
      // we are the client, so write to server
      this.sendDataToServer(mesg);
    }
  };

  /**
   * Wait until the socket is ready, then send a request to the server.
   *
   * @throws Error("closed") if the socket is closed once the wait finishes.
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);
    if (this.state == "closed") {
      throw Error("closed");
    }
    // console.log("sending request from client ", { subject, data, options });
    return await this.client.request(this.serverSubject(), data, options);
  };

  /**
   * Wait until the socket is ready, then start a requestMany to the server.
   *
   * @throws Error("closed") if the socket is closed once the wait finishes
   *   (same guard as `request`).
   */
  requestMany = async (data, options?): Promise<Subscription> => {
    await this.waitUntilReady(options?.timeout);
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.requestMany(this.serverSubject(), data, options);
  };

  /**
   * Gracefully end the socket: disable reconnection, tell the server we
   * are done (waiting up to `timeout` ms; failures are ignored), then
   * close.
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state == "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell server we're done
    try {
      await this.sendCommandToServer("close", timeout);
    } catch {}
    this.close();
  }

  /**
   * Close the socket: close the subscription, drop queued writes, notify
   * the server (without waiting), and tear down the TCP layer and
   * keepalive.  Does nothing if already closed.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.sub?.close();
    if (this.tcp != null) {
      this.client.removeListener(
        "disconnected",
        this.tcp.send.resendLastUntilAcked,
      );
    }
    this.queuedWrites = [];
    // tell server we're gone (but don't wait).  Errors, including
    // "no server selected", are deliberately ignored.
    (async () => {
      try {
        await this.sendCommandToServer("close");
      } catch {}
    })();
    if (this.tcp != null) {
      this.tcp.send.close();
      this.tcp.recv.close();
      // @ts-ignore
      delete this.tcp;
    }
    this.alive?.close();
    delete this.alive;
    super.close();
  }

  // writes will raise an exception if: (1) the socket is closed code='EPIPE', or (2)
  // you hit maxQueueSize un-ACK'd messages, code='ENOBUFS'
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    // @ts-ignore
    if (this.state == "closed") {
      throw new ConatError("closed", { code: "EPIPE" });
    }
    const mesg = messageData(data, { headers });
    this.tcp?.send.process(mesg);
  };

  /** Iterator over the [data, headers] pairs of this socket's "data" events. */
  iter = () => {
    return new EventIterator<[any, Headers]>(this, "data");
  };
}