Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/client/connections/RemoteConnectionToCore.ts
2605 views
1
import ICoreRequestPayload from '@secret-agent/interfaces/ICoreRequestPayload';
2
import * as WebSocket from 'ws';
3
import TypeSerializer from '@secret-agent/commons/TypeSerializer';
4
import { createPromise } from '@secret-agent/commons/utils';
5
import IResolvablePromise from '@secret-agent/interfaces/IResolvablePromise';
6
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
7
import ConnectionToCore from './ConnectionToCore';
8
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
9
import DisconnectedFromCoreError from './DisconnectedFromCoreError';
10
11
/**
 * A ConnectionToCore whose transport is a WebSocket opened against `options.host`.
 *
 * The socket is created by `createConnection` and cached in `webSocketOrError`, which
 * settles to either the open socket or the Error produced while connecting.
 */
export default class RemoteConnectionToCore extends ConnectionToCore {
  // Holds the pending/settled connection attempt; stays unset until createConnection runs.
  private webSocketOrError: IResolvablePromise<WebSocket | Error>;

  /**
   * @param options connection options; `host` is mandatory for a remote connection.
   * @throws Error when `options.host` is missing.
   */
  constructor(options: IConnectionToCoreOptions) {
    if (!options.host) throw new Error('A remote connection to core needs a host parameter!');
    super(options);
  }

  /**
   * Serializes `payload` and writes it to the websocket.
   *
   * @throws CanceledPromiseError when no connection attempt exists or the socket is not open.
   * @throws DisconnectedFromCoreError when the write fails with EPIPE while this connection
   *   is disconnecting.
   * @throws the stored connection Error (via getWebsocket) or any other send error.
   */
  protected async internalSendRequest(payload: ICoreRequestPayload): Promise<void> {
    if (!this.webSocketOrError) throw new CanceledPromiseError('No websocket connection');
    const message = TypeSerializer.stringify(payload);

    const webSocket = await this.getWebsocket();

    if (webSocket?.readyState !== WebSocket.OPEN) {
      throw new CanceledPromiseError('Websocket was not open');
    }

    return new Promise((resolve, reject) =>
      webSocket.send(message, err => {
        if (!err) {
          resolve();
          return;
        }
        const { code } = err as any;
        // Read the flag through `this`: `super.<name>` only consults the base prototype, so it
        // would yield undefined if isDisconnecting is an instance field on the base class.
        if (code === 'EPIPE' && this.isDisconnecting) {
          reject(new DisconnectedFromCoreError(this.resolvedHost));
          return;
        }
        reject(err);
      }),
    );
  }

  /**
   * Detaches this class's close/error listeners and terminates the socket if it is open.
   * Errors raised while terminating are deliberately ignored.
   */
  protected async destroyConnection(): Promise<any> {
    // `false`: a stored connection Error yields null here instead of being thrown.
    const webSocket = await this.getWebsocket(false);
    if (webSocket?.readyState === WebSocket.OPEN) {
      try {
        webSocket.off('close', this.onConnectionTerminated);
        webSocket.off('error', this.internalDisconnect);
        webSocket.terminate();
      } catch (_) {
        // ignore errors terminating
      }
    }
  }

  /**
   * Resolves the host, then opens the websocket (once) and wires up its listeners.
   *
   * @returns the Error that prevented connecting, or null when the connection is in place.
   */
  protected async createConnection(): Promise<Error | null> {
    // do this first to see if we can resolve the host
    const hostOrError = await this.hostOrError;
    if (hostOrError instanceof Error) return hostOrError;

    if (!this.webSocketOrError) {
      this.webSocketOrError = connectToWebsocketHost(hostOrError);
      try {
        const webSocket = await this.getWebsocket();
        webSocket.once('close', this.onConnectionTerminated);
        webSocket.once('error', this.internalDisconnect);
        webSocket.on('message', message => {
          const payload = TypeSerializer.parse(message.toString(), 'REMOTE CORE');
          this.onMessage(payload);
        });
      } catch (error) {
        return error;
      }
    }

    // Explicit success value so the result matches the declared `Error | null` contract.
    return null;
  }

  /**
   * Awaits the stored connection attempt and returns the socket.
   *
   * @param throwIfError when true (default) a stored connection Error is thrown;
   *   when false it is swallowed and null is returned.
   * @returns the socket, or null when no attempt exists or the attempt failed and
   *   `throwIfError` is false.
   */
  private async getWebsocket(throwIfError = true): Promise<WebSocket> {
    if (!this.webSocketOrError) return null;
    const webSocketOrError = await this.webSocketOrError.promise;
    if (webSocketOrError instanceof Error) {
      if (throwIfError) throw webSocketOrError;
      return null;
    }
    return webSocketOrError;
  }
}
86
87
/**
 * Starts a WebSocket connection to `host` and reports the outcome through a resolvable promise.
 *
 * The returned resolvable is resolved (never rejected by this function) with:
 *  - the socket itself once its 'open' event fires, or
 *  - an Error if 'error' or 'close' fires first; a non-Error argument is wrapped in a
 *    descriptive Error.
 *
 * NOTE(review): the `30e3` passed to createPromise is presumably a 30-second timeout — confirm
 * against the helper's definition.
 */
function connectToWebsocketHost(host: string): IResolvablePromise<WebSocket | Error> {
  const pendingSocket = createPromise<WebSocket | Error>(30e3);
  const socket = new WebSocket(host);

  // Shared by 'close' and 'error' while the connection is still being established.
  const handleFailure = (error: Error): void => {
    const failure =
      error instanceof Error
        ? error
        : new Error(`Error connecting to Websocket host -> ${error}`);
    pendingSocket.resolve(failure);
  };

  socket.once('close', handleFailure);
  socket.once('error', handleFailure);
  socket.once('open', () => {
    // Connected: the connect-phase failure handlers no longer apply.
    socket.off('error', handleFailure);
    socket.off('close', handleFailure);
    pendingSocket.resolve(socket);
  });

  return pendingSocket;
}
103
104