Path: blob/master/src/packages/conat/socket/client.ts
1710 views
import {
  messageData,
  type Subscription,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import { ConatSocketBase } from "./base";
import { type TCP, createTCP } from "./tcp";
import {
  SOCKET_HEADER_CMD,
  DEFAULT_COMMAND_TIMEOUT,
  type ConatSocketOptions,
} from "./util";
import { EventIterator } from "@cocalc/util/event-iterator";
import { keepAlive, KeepAlive } from "./keepalive";
import { getLogger } from "@cocalc/conat/client";
import { until } from "@cocalc/util/async-utils";

const logger = getLogger("socket:client");

// DO NOT directly instantiate here -- instead, call the
// socket.connect method on ConatClient.

/**
 * Client side of a conat socket.
 *
 * Messages for the server are published to `${subject}.server.${id}` and
 * messages from the server are received on `${subject}.client.${id}`.
 * Outgoing data goes through the `tcp` helper created in `initTCP`; writes
 * attempted while the socket is not "ready" are buffered in `queuedWrites`
 * and flushed on the next "ready" event.
 *
 * Events emitted by this class: "data" (data, headers), "drain", "request" (mesg).
 */
export class ConatSocketClient extends ConatSocketBase {
  /** Messages handed to `sendToServer` while the socket was not "ready". */
  queuedWrites: { data: any; headers?: Headers }[] = [];
  private tcp?: TCP;
  private alive?: KeepAlive;

  constructor(opts: ConatSocketOptions) {
    super(opts);
    logger.silly("creating a client socket connecting to ", this.subject);
    this.initTCP();
    this.on("ready", () => {
      // Detach the queue before flushing.  Previously the array was never
      // emptied, so every later "ready" event (e.g. after a reconnect)
      // re-sent messages that had already been flushed, and the flushed
      // messages stayed referenced until close().
      const pending = this.queuedWrites;
      this.queuedWrites = [];
      for (const mesg of pending) {
        this.sendDataToServer(mesg);
      }
    });
    if (this.tcp == null) {
      throw Error("bug");
    }
  }

  /**
   * Connect a further client socket on the sub-subject `${subject}.${channel}`,
   * passing along this socket's `maxQueueSize`.
   */
  channel(channel: string) {
    return this.client.socket.connect(this.subject + "." + channel, {
      desc: `${this.desc ?? ""}.channel('${channel}')`,
      maxQueueSize: this.maxQueueSize,
    }) as ConatSocketClient;
  }

  /**
   * (Re)create the keepalive helper, closing any previous one first.
   * The ping it is given is a request to the server carrying the "ping"
   * command header, with `keepAliveTimeout` as the timeout.
   */
  private initKeepAlive = () => {
    this.alive?.close();
    this.alive = keepAlive({
      role: "client",
      ping: async () =>
        await this.request(null, {
          headers: { [SOCKET_HEADER_CMD]: "ping" },
          timeout: this.keepAliveTimeout,
        }),
      disconnect: this.disconnect,
      keepAlive: this.keepAlive,
    });
  };

  /**
   * Create the `tcp` helper and wire its events to this socket:
   * received messages are re-emitted as "data" and the sender's "drain"
   * as "drain".
   *
   * @throws Error if `this.tcp` has already been initialized.
   */
  initTCP() {
    if (this.tcp != null) {
      throw Error("this.tcp already initialized");
    }
    // request = send a socket request mesg to the server side of the socket
    // either ack what's received or asking for a resend of missing data.
    const request = async (mesg, opts?) =>
      await this.client.request(`${this.subject}.server.${this.id}`, mesg, {
        ...opts,
        headers: { ...opts?.headers, [SOCKET_HEADER_CMD]: "socket" },
      });

    this.tcp = createTCP({
      request,
      role: this.role,
      reset: this.disconnect,
      send: this.sendToServer,
      size: this.maxQueueSize,
    });

    // This listener is removed again in close().
    this.client.on("disconnected", this.tcp.send.resendLastUntilAcked);

    this.tcp.recv.on("message", (mesg) => {
      this.emit("data", mesg.data, mesg.headers);
    });
    this.tcp.send.on("drain", () => {
      this.emit("drain");
    });
  }

  /** Wait for the tcp sender to drain; resolves immediately if there is no tcp. */
  waitUntilDrain = async () => {
    await this.tcp?.send.waitUntilDrain();
  };

  /**
   * Send a control command to the server side of this socket and return the
   * `data` of the response.
   *
   * @param cmd - the command to place in the SOCKET_HEADER_CMD header
   * @param timeout - request timeout; defaults to DEFAULT_COMMAND_TIMEOUT
   * @throws Error if the response data has a truthy `error` field
   */
  private sendCommandToServer = async (
    cmd: "close" | "ping" | "connect",
    timeout = DEFAULT_COMMAND_TIMEOUT,
  ) => {
    const headers = {
      [SOCKET_HEADER_CMD]: cmd,
      id: this.id,
    };
    const subject = `${this.subject}.server.${this.id}`;
    logger.silly("sendCommandToServer", { cmd, timeout, subject });
    const resp = await this.client.request(subject, null, {
      headers,
      timeout,
      waitForInterest: cmd == "connect", // connect is exactly when other side might not be visible yet.
    });
    const value = resp.data;
    logger.silly("sendCommandToServer: got resp", { cmd, value, subject });
    if (value?.error) {
      throw Error(value?.error);
    } else {
      return value;
    }
  };

  /**
   * Connect to the server and process incoming messages until the
   * subscription ends or a "close" command arrives.
   *
   * Steps: subscribe to `${subject}.client.${id}`; retry the "connect"
   * command via `until` until it succeeds or the socket is closed; switch to
   * the "ready" state; start the keepalive; then dispatch each incoming
   * message by its command header.  Any error thrown along the way is logged
   * and results in `this.disconnect()`.
   */
  protected async run() {
    if (this.state == "closed") {
      return;
    }
    // console.log(
    //   "client socket -- subscribing to ",
    //   `${this.subject}.client.${this.id}`,
    // );
    try {
      logger.silly("run: getting subscription");
      const sub = await this.client.subscribe(
        `${this.subject}.client.${this.id}`,
      );
      // The state may have changed while awaiting the subscription.
      // @ts-ignore
      if (this.state == "closed") {
        sub.close();
        return;
      }
      this.sub = sub;
      let resp: any = undefined;
      // NOTE(review): the options are presumably retry-delay parameters for
      // `until` (initial delay, growth factor, cap) -- confirm against
      // @cocalc/util/async-utils.
      await until(
        async () => {
          if (this.state == "closed") {
            logger.silly("closed -- giving up on connecting");
            return true;
          }
          try {
            logger.silly("sending connect command to server", this.subject);
            resp = await this.sendCommandToServer("connect");
            this.alive?.recv();
            return true;
          } catch (err) {
            logger.silly("failed to connect", this.subject, err);
          }
          return false;
        },
        { start: 500, decay: 1.3, max: 10000 },
      );

      // Also reached with resp undefined when the loop above stopped because
      // the socket was closed; the throw is handled by the catch below.
      if (resp != "connected") {
        throw Error("failed to connect");
      }
      this.setState("ready");
      this.initKeepAlive();
      for await (const mesg of this.sub) {
        // every incoming message is reported to the keepalive helper
        this.alive?.recv();
        const cmd = mesg.headers?.[SOCKET_HEADER_CMD];
        if (cmd) {
          logger.silly("client got cmd", cmd);
        }
        if (cmd == "socket") {
          this.tcp?.send.handleRequest(mesg);
        } else if (cmd == "close") {
          this.close();
          return;
        } else if (cmd == "ping") {
          // logger.silly("responding to ping from server", this.id);
          mesg.respondSync(null);
        } else if (mesg.isRequest()) {
          // logger.silly("client got request");
          this.emit("request", mesg);
        } else {
          // logger.silly("client got data"); //, { data: mesg.data });
          this.tcp?.recv.process(mesg);
        }
      }
    } catch (err) {
      logger.silly("socket connect failed", err);
      this.disconnect();
    }
  }

  /** Publish one message (its `raw` payload and `headers`) to the server subject. */
  private sendDataToServer = (mesg) => {
    const subject = `${this.subject}.server.${this.id}`;
    this.client.publishSync(subject, null, {
      raw: mesg.raw,
      headers: mesg.headers,
    });
  };

  /**
   * `send` callback given to the tcp helper.  While the socket is not "ready"
   * the message is queued instead, dropping the oldest queued messages once
   * the queue exceeds `maxQueueSize`.
   */
  private sendToServer = (mesg) => {
    if (this.state != "ready") {
      this.queuedWrites.push(mesg);
      while (this.queuedWrites.length > this.maxQueueSize) {
        this.queuedWrites.shift();
      }
      return;
    }
    // NOTE(review): the "closed" check below cannot be reached, because every
    // non-"ready" state (including "closed") already returned above.  It is
    // kept as-is so that behavior is unchanged.
    // @ts-ignore
    if (this.state == "closed") {
      throw Error("closed");
    }
    if (this.role == "server") {
      throw Error("sendToServer is only for use by the client");
    } else {
      // we are the client, so write to server
      this.sendDataToServer(mesg);
    }
  };

  /**
   * Send a request to the server side of this socket once the socket is ready.
   *
   * @param data - request payload
   * @param options - passed through to `client.request`; `options.timeout`
   *   is also used for the wait-until-ready step
   * @throws Error("closed") if the socket is closed after waiting
   */
  request = async (data, options?) => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    if (this.state == "closed") {
      throw Error("closed");
    }
    // console.log("sending request from client ", { subject, data, options });
    return await this.client.request(subject, data, options);
  };

  /**
   * Like `request`, but uses `client.requestMany` and resolves to a Subscription.
   *
   * @throws Error("closed") if the socket is closed after waiting
   */
  requestMany = async (data, options?): Promise<Subscription> => {
    await this.waitUntilReady(options?.timeout);
    const subject = `${this.subject}.server.${this.id}`;
    // Same guard as in `request`, so both entry points treat a closed socket
    // the same way.
    if (this.state == "closed") {
      throw Error("closed");
    }
    return await this.client.requestMany(subject, data, options);
  };

  /**
   * Close the socket after first telling the server, waiting up to `timeout`
   * for the server's reply.  Failure of the "close" command is ignored on
   * purpose (best effort); the socket is closed locally either way.
   */
  async end({ timeout = 3000 }: { timeout?: number } = {}) {
    if (this.state == "closed") {
      return;
    }
    this.reconnection = false;
    this.ended = true;
    // tell server we're done
    try {
      await this.sendCommandToServer("close", timeout);
    } catch {}
    this.close();
  }

  /**
   * Close the socket: close the subscription, remove the "disconnected"
   * listener, discard queued writes, notify the server without waiting,
   * and shut down the tcp and keepalive helpers.  Does nothing if already closed.
   */
  close() {
    if (this.state == "closed") {
      return;
    }
    this.sub?.close();
    if (this.tcp != null) {
      this.client.removeListener(
        "disconnected",
        this.tcp.send.resendLastUntilAcked,
      );
    }
    this.queuedWrites = [];
    // tell server we're gone (but don't wait)
    (async () => {
      try {
        await this.sendCommandToServer("close");
      } catch {}
    })();
    if (this.tcp != null) {
      this.tcp.send.close();
      this.tcp.recv.close();
      // @ts-ignore
      delete this.tcp;
    }
    this.alive?.close();
    delete this.alive;
    super.close();
  }

  // writes will raise an exception if: (1) the socket is closed code='EPIPE', or (2)
  // you hit maxQueueSize un-ACK'd messages, code='ENOBUFS'
  write = (data, { headers }: { headers?: Headers } = {}): void => {
    // @ts-ignore
    if (this.state == "closed") {
      throw new ConatError("closed", { code: "EPIPE" });
    }
    const mesg = messageData(data, { headers });
    this.tcp?.send.process(mesg);
  };

  /** Iterate over this socket's "data" events as `[data, headers]` pairs. */
  iter = () => {
    return new EventIterator<[any, Headers]>(this, "data");
  };
}