Path: blob/main/client/connections/RemoteConnectionToCore.ts
2605 views
import ICoreRequestPayload from '@secret-agent/interfaces/ICoreRequestPayload';
import * as WebSocket from 'ws';
import TypeSerializer from '@secret-agent/commons/TypeSerializer';
import { createPromise } from '@secret-agent/commons/utils';
import IResolvablePromise from '@secret-agent/interfaces/IResolvablePromise';
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
import ConnectionToCore from './ConnectionToCore';
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
import DisconnectedFromCoreError from './DisconnectedFromCoreError';

/**
 * A `ConnectionToCore` that talks to a core over a WebSocket (`ws`) client.
 *
 * Outgoing payloads are serialized with `TypeSerializer.stringify`; incoming
 * messages are parsed with `TypeSerializer.parse` and handed to `onMessage`.
 */
export default class RemoteConnectionToCore extends ConnectionToCore {
  /**
   * Resolvable holding the outcome of the single connection attempt: either
   * the opened socket or the Error that prevented it from opening. Unset until
   * `createConnection` has started connecting.
   */
  private webSocketOrError: IResolvablePromise<WebSocket | Error>;

  /**
   * @param options connection options; `options.host` is required.
   * @throws Error when `options.host` is missing.
   */
  constructor(options: IConnectionToCoreOptions) {
    if (!options.host) throw new Error('A remote connection to core needs a host parameter!');
    super(options);
  }

  /**
   * Serializes `payload` and sends it over the websocket.
   *
   * @throws CanceledPromiseError when no connection attempt has been started
   *   or the socket is not in the OPEN state.
   * @throws DisconnectedFromCoreError when the send fails with `EPIPE` while
   *   this connection is disconnecting.
   * @throws the connection Error (via `getWebsocket`) or the raw send error
   *   otherwise.
   */
  protected async internalSendRequest(payload: ICoreRequestPayload): Promise<void> {
    if (!this.webSocketOrError) throw new CanceledPromiseError('No websocket connection');
    const message = TypeSerializer.stringify(payload);

    const webSocket = await this.getWebsocket();

    if (webSocket?.readyState !== WebSocket.OPEN) {
      throw new CanceledPromiseError('Websocket was not open');
    }

    return new Promise<void>((resolve, reject) => {
      webSocket.send(message, err => {
        if (!err) {
          resolve();
          return;
        }
        const { code } = err as Error & { code?: string };
        // Read the flag through `this`, not `super`: `super.x` only resolves
        // prototype members, so it yields `undefined` for an instance field.
        // NOTE(review): the base class is not visible here; `this.` is correct
        // whether `isDisconnecting` is a field or a prototype accessor.
        if (code === 'EPIPE' && this.isDisconnecting) {
          reject(new DisconnectedFromCoreError(this.resolvedHost));
          return;
        }
        reject(err);
      });
    });
  }

  /**
   * Detaches this connection's `close`/`error` listeners and terminates the
   * socket, but only when it is currently OPEN. Errors thrown while doing so
   * are deliberately ignored (best-effort teardown).
   */
  protected async destroyConnection(): Promise<any> {
    // `false`: a failed connection attempt yields null instead of throwing.
    const webSocket = await this.getWebsocket(false);
    if (webSocket?.readyState === WebSocket.OPEN) {
      try {
        webSocket.off('close', this.onConnectionTerminated);
        webSocket.off('error', this.internalDisconnect);
        webSocket.terminate();
      } catch (_) {
        // ignore errors terminating
      }
    }
  }

  /**
   * Resolves the host and, on the first call, opens the websocket and wires
   * its `close`, `error` and `message` handlers.
   *
   * @returns the Error that prevented connecting (host resolution or socket
   *   failure), or `null` when there is nothing to report.
   */
  protected async createConnection(): Promise<Error | null> {
    // do this first to see if we can resolve the host
    const hostOrError = await this.hostOrError;
    if (hostOrError instanceof Error) return hostOrError;

    if (!this.webSocketOrError) {
      this.webSocketOrError = connectToWebsocketHost(hostOrError);
      try {
        const webSocket = await this.getWebsocket();
        webSocket.once('close', this.onConnectionTerminated);
        webSocket.once('error', this.internalDisconnect);
        webSocket.on('message', message => {
          const payload = TypeSerializer.parse(message.toString(), 'REMOTE CORE');
          this.onMessage(payload);
        });
      } catch (error) {
        return error;
      }
    }
    // Explicit success value so the result matches the declared
    // `Error | null` instead of an implicit `undefined`.
    return null;
  }

  /**
   * Awaits the pending connection attempt and returns the socket.
   *
   * @param throwIfError when true (default) a failed attempt is rethrown;
   *   when false it is reported as `null`.
   * @returns the socket, or `null` when no attempt was started or (with
   *   `throwIfError === false`) the attempt failed.
   */
  private async getWebsocket(throwIfError = true): Promise<WebSocket> {
    if (!this.webSocketOrError) return null;
    const webSocketOrError = await this.webSocketOrError.promise;
    if (webSocketOrError instanceof Error) {
      if (throwIfError) throw webSocketOrError;
      return null;
    }
    return webSocketOrError;
  }
}

/**
 * Opens a `ws` client to `host` and returns a resolvable that settles with the
 * socket once it emits `open`, or with an Error if `error`/`close` is emitted
 * first.
 *
 * NOTE(review): `30e3` is passed straight to `createPromise` - presumably a
 * timeout in milliseconds; confirm against that helper.
 */
function connectToWebsocketHost(host: string): IResolvablePromise<WebSocket | Error> {
  const resolvable = createPromise<WebSocket | Error>(30e3);
  const webSocket = new WebSocket(host);

  // Shared by 'error' (receives an Error) and 'close' (receives a close code,
  // not an Error), hence the `unknown` parameter and the wrapping branch.
  function onError(error: unknown): void {
    if (error instanceof Error) resolvable.resolve(error);
    else resolvable.resolve(new Error(`Error connecting to Websocket host -> ${error}`));
  }

  webSocket.once('close', onError);
  webSocket.once('error', onError);
  webSocket.once('open', () => {
    // Connected: later close/error events belong to the connection owner.
    webSocket.off('error', onError);
    webSocket.off('close', onError);
    resolvable.resolve(webSocket);
  });
  return resolvable;
}