Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/client/connections/ConnectionToCore.ts
2605 views
1
import ICoreRequestPayload from '@secret-agent/interfaces/ICoreRequestPayload';
2
import ICoreEventPayload from '@secret-agent/interfaces/ICoreEventPayload';
3
import ICoreResponsePayload from '@secret-agent/interfaces/ICoreResponsePayload';
4
import { bindFunctions, createPromise } from '@secret-agent/commons/utils';
5
import IResolvablePromise from '@secret-agent/interfaces/IResolvablePromise';
6
import Log from '@secret-agent/commons/Logger';
7
import ISessionCreateOptions from '@secret-agent/interfaces/ISessionCreateOptions';
8
import ISessionMeta from '@secret-agent/interfaces/ISessionMeta';
9
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
10
import ICoreConfigureOptions from '@secret-agent/interfaces/ICoreConfigureOptions';
11
import { TypedEventEmitter } from '@secret-agent/commons/eventUtils';
12
import SessionClosedOrMissingError from '@secret-agent/commons/SessionClosedOrMissingError';
13
import Resolvable from '@secret-agent/commons/Resolvable';
14
import ICoreConnectionEventPayload from '@secret-agent/interfaces/ICoreConnectionEventPayload';
15
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
16
import CoreCommandQueue from '../lib/CoreCommandQueue';
17
import CoreSession from '../lib/CoreSession';
18
import { IAgentCreateOptions } from '../index';
19
import Agent from '../lib/Agent';
20
import CoreSessions from '../lib/CoreSessions';
21
import DisconnectedFromCoreError from './DisconnectedFromCoreError';
22
23
// Logger for this file, obtained by passing the current `module` to the shared Logger factory.
const { log } = Log(module);
24
25
/**
 * Abstract client-side connection to a Core instance.
 *
 * Subclasses supply the transport by implementing `createConnection`,
 * `destroyConnection` and `internalSendRequest`, and are expected to feed
 * incoming payloads into `onMessage`. This base class handles the
 * connect/disconnect handshake (`Core.connect` / `Core.disconnect`), matches
 * responses to pending requests by message id, and tracks sessions.
 *
 * Emits `connected` after a successful `Core.connect` round trip and
 * `disconnected` once `internalDisconnect` has torn the connection down.
 */
export default abstract class ConnectionToCore extends TypedEventEmitter<{
  disconnected: void;
  connected: void;
}> {
  /** Command queue created for this connection in the constructor. */
  public readonly commandQueue: CoreCommandQueue;
  /**
   * Settles with the host (prefixed with `ws://` when it has no `://`), or with
   * an Error value when no host was configured or resolving it failed. The
   * promise itself never rejects: failures are converted into resolved Errors.
   */
  public readonly hostOrError: Promise<string | Error>;
  public options: IConnectionToCoreOptions;
  /** Set to true as soon as `internalDisconnect` starts; never reset here. */
  public isDisconnecting = false;
  /** Set to true the first time `onConnectionTerminated` runs. */
  public isConnectionTerminated = false;

  /** Host string, assigned once `hostOrError` has resolved successfully. */
  protected resolvedHost: string;

  /** Shared result of the single connect attempt; created lazily by `connect`. */
  private connectPromise: IResolvablePromise<Error | null>;

  /** Subset of `options` that is sent to Core as the `Core.connect` argument. */
  private get connectOptions(): ICoreConfigureOptions & { isPersistent: boolean } {
    return {
      coreServerPort: this.options.coreServerPort,
      localProxyPortStart: this.options.localProxyPortStart,
      sessionsDir: this.options.sessionsDir,
      isPersistent: this.options.isPersistent,
    };
  }

  /** Message id of the in-flight `Core.connect` request, if any. */
  private connectRequestId: string;
  /** Message id of the in-flight `Core.disconnect` request, if any. */
  private disconnectRequestId: string;
  /** Value of the `isAutoConnect` flag passed to the first `connect` call. */
  private didAutoConnect = false;
  private coreSessions: CoreSessions;
  /** Requests that were sent (or are being sent) and still await a response, keyed by message id. */
  private readonly pendingRequestsById = new Map<string, IResolvablePromiseWithId>();
  /** Counter used to generate message ids. */
  private lastId = 0;

  /**
   * @param options connection options; defaults to `{ isPersistent: true }` when omitted.
   */
  constructor(options?: IConnectionToCoreOptions) {
    super();
    this.options = options ?? { isPersistent: true };
    this.commandQueue = new CoreCommandQueue(null, this, null);
    this.coreSessions = new CoreSessions(
      this.options.maxConcurrency,
      this.options.agentTimeoutMillis,
    );

    if (this.options.host) {
      // Promise.resolve lets `host` be either a plain value or a promise.
      this.hostOrError = Promise.resolve(this.options.host)
        .then(x => {
          // default to the websocket scheme when none was given
          if (!x.includes('://')) {
            return `ws://${x}`;
          }
          return x;
        })
        .then(x => {
          this.resolvedHost = x;
          return this.resolvedHost;
        })
        // surface failures as a resolved Error so awaiting callers never throw here
        .catch(err => err);
    } else {
      this.hostOrError = Promise.resolve(new Error('No host provided'));
    }
    bindFunctions(this);
  }

  /** Transport hook: deliver a single request payload to Core. */
  protected abstract internalSendRequest(payload: ICoreRequestPayload): Promise<void>;
  /** Transport hook: open the connection; resolves with an Error (not a rejection) on failure. */
  protected abstract createConnection(): Promise<Error | null>;
  /** Transport hook: close the connection. */
  protected abstract destroyConnection(): Promise<any>;

  /**
   * Opens the connection and performs the `Core.connect` handshake. Only the
   * first call does any work; every call shares the same result.
   *
   * @param isAutoConnect recorded as `didAutoConnect` on the first call.
   * @returns a falsy value on success, or the Error that prevented connecting.
   *   Errors are returned, not thrown.
   */
  public async connect(isAutoConnect = false): Promise<Error | null> {
    if (!this.connectPromise) {
      this.didAutoConnect = isAutoConnect;
      this.connectPromise = new Resolvable();
      try {
        const startTime = new Date();
        const connectError = await this.createConnection();
        if (connectError) throw connectError;
        if (this.isDisconnecting) {
          // a disconnect started while the transport was still being created
          if (this.coreSessions.size > 0 && !this.didAutoConnect) {
            throw new DisconnectedFromCoreError(this.resolvedHost);
          }
          this.connectPromise.resolve();
        }
        // can be resolved if canceled by a disconnect
        // Hand back the shared result so this caller sees exactly what later callers will see.
        if (this.connectPromise.isResolved) return this.connectPromise.promise;

        const connectResult = await this.internalSendRequestAndWait({
          startDate: startTime,
          command: 'Core.connect',
          args: [this.connectOptions],
        });
        if (connectResult?.data) {
          const { maxConcurrency } = connectResult.data;
          // Adopt Core's limit when we have none of our own, or when Core's is lower.
          if (
            maxConcurrency &&
            (!this.options.maxConcurrency || maxConcurrency < this.options.maxConcurrency)
          ) {
            log.info('Overriding max concurrency with Core value', {
              maxConcurrency,
              sessionId: null,
            });
            this.coreSessions.concurrency = maxConcurrency;
            this.options.maxConcurrency = maxConcurrency;
          }
        }
        this.emit('connected');
      } catch (err) {
        this.connectPromise.resolve(err);
      } finally {
        // success path: nothing resolved the promise yet, so mark it as completed
        if (!this.connectPromise.isResolved) this.connectPromise.resolve();
      }
    }

    return this.connectPromise.promise;
  }

  /**
   * User-triggered disconnect. In addition to the local teardown done by
   * `internalDisconnect`, a `Core.disconnect` request is sent first and waited
   * on for at most 2 seconds; failures of that request are ignored.
   *
   * @param fatalError optional error forwarded to Core as the command argument.
   */
  public async disconnect(fatalError?: Error): Promise<void> {
    // user triggered disconnect sends a disconnect to Core
    const startTime = new Date();
    await this.internalDisconnect(fatalError, async () => {
      try {
        await this.internalSendRequestAndWait(
          {
            command: 'Core.disconnect',
            startDate: startTime,
            args: [fatalError],
          },
          2e3,
        );
      } catch (error) {
        // don't do anything
      }
    });
  }

  /** Forwards an upcoming-stop notice to the session tracker and the command queue. */
  public willDisconnect(): void {
    this.coreSessions.willStop();
    this.commandQueue.willStop();
  }

  /////// PIPE FUNCTIONS /////////////////////////////////////////////////////////////////////////////////////////////

  /**
   * Ensures the connection is established, then sends a request and waits for
   * its response.
   *
   * @throws the Error returned by `connect`, if connecting failed.
   */
  public async sendRequest(
    payload: Omit<ICoreRequestPayload, 'messageId' | 'sendDate'>,
  ): Promise<ICoreResponsePayload> {
    const result = await this.connect();
    if (result) throw result;

    return this.internalSendRequestAndWait(payload);
  }

  /**
   * Entry point for payloads received from Core. Dispatches on the payload
   * shape: a truthy `disconnecting` flag triggers `willDisconnect`, a
   * `responseId` is matched to a pending request, and a `listenerId` is
   * delivered as an event.
   *
   * @throws Error when the payload has none of those markers.
   */
  public onMessage(
    payload: ICoreResponsePayload | ICoreEventPayload | ICoreConnectionEventPayload,
  ): void {
    if ((payload as ICoreConnectionEventPayload).disconnecting) {
      this.willDisconnect();
    } else if ((payload as ICoreResponsePayload).responseId) {
      payload = payload as ICoreResponsePayload;
      this.onResponse(payload.responseId, payload);
    } else if ((payload as ICoreEventPayload).listenerId) {
      this.onEvent(payload as ICoreEventPayload);
    } else {
      throw new Error(`message could not be processed: ${JSON.stringify(payload)}`);
    }
  }

  /////// SESSION FUNCTIONS //////////////////////////////////////////////////////////////////////////////////////////

  /**
   * Starts connecting in the background, then hands `coreSessions.waitForAvailable`
   * a function that builds an Agent bound to this connection and passes it to
   * `callbackFn`.
   */
  public useAgent(
    options: IAgentCreateOptions,
    callbackFn: (agent: Agent) => Promise<any>,
  ): Promise<void> {
    // just kick off
    this.connect().catch(() => null);
    return this.coreSessions.waitForAvailable(() => {
      const agent = new Agent({
        ...options,
        connectionToCore: this,
      });

      return callbackFn(agent);
    });
  }

  /** True when no disconnect is in progress and the session tracker reports availability. */
  public canCreateSessionNow(): boolean {
    return this.isDisconnecting === false && this.coreSessions.hasAvailability();
  }

  /**
   * Runs `Session.create` through the command queue and tracks the resulting
   * session.
   *
   * @returns the new session, or `null` when the command failed with a
   *   DisconnectedFromCoreError while this connection is disconnecting.
   * @throws any other error raised by the command.
   */
  public async createSession(options: ISessionCreateOptions): Promise<CoreSession> {
    try {
      const sessionMeta = await this.commandQueue.run<ISessionMeta>('Session.create', options);
      const session = new CoreSession({ ...sessionMeta, sessionName: options.sessionName }, this);
      this.coreSessions.track(session);
      return session;
    } catch (error) {
      if (error instanceof DisconnectedFromCoreError && this.isDisconnecting) return null;
      throw error;
    }
  }

  /** Looks up a tracked session by id. */
  public getSession(sessionId: string): CoreSession {
    return this.coreSessions.get(sessionId);
  }

  /** Stops tracking the given session. */
  public closeSession(coreSession: CoreSession): void {
    this.coreSessions.untrack(coreSession.sessionId);
  }

  /** Sends the error to Core via the `Core.logUnhandledError` command. */
  public async logUnhandledError(error: Error): Promise<void> {
    await this.commandQueue.run('Core.logUnhandledError', error);
  }

  /**
   * Shared teardown used by both user-triggered and transport-triggered
   * disconnects. Runs at most once per instance.
   *
   * @param fatalError accepted for signature parity with `disconnect`; not read here.
   * @param beforeClose optional hook awaited before the transport is destroyed.
   *   It only runs when a connect attempt exists and that attempt is not being
   *   cancelled with a DisconnectedFromCoreError.
   */
  protected async internalDisconnect(
    fatalError?: Error,
    beforeClose?: () => Promise<any>,
  ): Promise<void> {
    if (this.isDisconnecting) return;
    this.isDisconnecting = true;
    // NOTE(review): `hostOrError` is a Promise, so this logs the promise object rather
    // than the host string — confirm whether `resolvedHost` was intended.
    const logid = log.stats('ConnectionToCore.Disconnecting', {
      host: this.hostOrError,
      sessionId: null,
    });

    // Capture before cancelPendingRequests stops the session tracker.
    const hasSessions = this.coreSessions?.size > 0;

    this.cancelPendingRequests();

    if (this.connectPromise) {
      if (!this.connectPromise.isResolved && hasSessions && !this.didAutoConnect) {
        // connect is still in flight with sessions waiting: fail it instead of talking to Core
        this.connectPromise.resolve(new DisconnectedFromCoreError(this.resolvedHost));
      } else if (beforeClose) {
        await beforeClose();
      }
    }
    await this.destroyConnection();
    log.stats('ConnectionToCore.Disconnected', {
      parentLogId: logid,
      host: this.hostOrError,
      sessionId: null,
    });

    this.emit('disconnected');
  }

  /**
   * Registers a pending result, sends the request, and waits for the matching
   * response to arrive through `onResponse`.
   *
   * @param payload request without `messageId`/`sendDate`; both are filled in here.
   * @param timeoutMs when truthy, the wait resolves with `null` after this many milliseconds.
   * @returns the response payload; `null` on timeout; `undefined` when the send
   *   was cancelled with a CanceledPromiseError.
   * @throws any other error raised while sending, or the rejection delivered for this request.
   */
  protected async internalSendRequestAndWait(
    payload: Omit<ICoreRequestPayload, 'messageId' | 'sendDate'>,
    timeoutMs?: number,
  ): Promise<ICoreResponsePayload> {
    const { promise, id, resolve } = this.createPendingResult();
    const { command } = payload;

    // Remember handshake ids so they can be treated specially on cancel/termination.
    if (command === 'Core.connect') this.connectRequestId = id;
    if (command === 'Core.disconnect') this.disconnectRequestId = id;

    let timeout: NodeJS.Timeout;
    if (timeoutMs) {
      timeout = setTimeout(() => {
        // Nobody is waiting on this entry after the timeout; don't leave it in the map.
        this.pendingRequestsById.delete(id);
        resolve(null);
      }, timeoutMs).unref();
    }
    try {
      await this.internalSendRequest({
        messageId: id,
        sendDate: new Date(),
        ...payload,
      });
    } catch (error) {
      clearTimeout(timeout);
      // The pending promise is never awaited on this path. Drop it for every failure so a
      // later cancel/termination cannot reject it with no handler attached.
      this.pendingRequestsById.delete(id);
      if (error instanceof CanceledPromiseError) {
        return;
      }
      throw error;
    }

    // now run to completion with timeout
    try {
      return await promise;
    } finally {
      clearTimeout(timeout);
    }
  }

  /** Delivers an event payload to the session it belongs to; ignored if the session is not tracked. */
  protected onEvent(payload: ICoreEventPayload): void {
    const { meta, listenerId, eventArgs } = payload;
    const session = this.getSession(meta.sessionId);
    session?.onEvent(meta, listenerId, eventArgs);
  }

  /**
   * Called by the transport when the underlying connection has gone away.
   * Runs the local teardown once, then settles any outstanding handshake
   * requests so their callers are not left waiting.
   */
  protected async onConnectionTerminated(): Promise<void> {
    if (this.isConnectionTerminated) return;

    this.isConnectionTerminated = true;
    await this.internalDisconnect();
    if (this.connectRequestId) {
      // an auto-connect fails with an error; an explicit connect is settled with null data
      this.onResponse(this.connectRequestId, {
        data: this.didAutoConnect ? new DisconnectedFromCoreError(this.resolvedHost) : null,
      });
    }
    if (this.disconnectRequestId) {
      this.onResponse(this.disconnectRequestId, {
        data: null,
      });
    }
  }

  /**
   * Settles the pending request with the given id, if one exists. Error data
   * rejects the request; anything else resolves it with `{ data }`.
   *
   * For non-handshake requests, an error received while disconnecting, a
   * SessionClosedOrMissingError (matched by name), or an error flagged
   * `isDisconnecting` is replaced with a DisconnectedFromCoreError.
   */
  protected onResponse(id: string, message: ICoreResponsePayload): void {
    const pending = this.pendingRequestsById.get(id);
    if (!pending) return;
    this.pendingRequestsById.delete(id);
    const isInternalRequest = this.connectRequestId === id || this.disconnectRequestId === id;
    if (this.disconnectRequestId === id) this.disconnectRequestId = null;
    if (this.connectRequestId === id) this.connectRequestId = null;

    if (message.data instanceof Error) {
      let responseError = message.data;
      const isDisconnected =
        this.isDisconnecting ||
        responseError.name === SessionClosedOrMissingError.name ||
        (responseError as any).isDisconnecting === true;
      // strip the transport-level marker before the error reaches callers
      delete (responseError as any).isDisconnecting;

      if (!isInternalRequest && isDisconnected) {
        responseError = new DisconnectedFromCoreError(this.resolvedHost);
      }
      this.rejectPendingRequest(pending, responseError);
    } else {
      pending.resolve({ data: message.data });
    }
  }

  /**
   * Rejects every pending request except the connect/disconnect handshakes with
   * a DisconnectedFromCoreError, then stops the command queue and the session
   * tracker with the same kind of error.
   */
  protected cancelPendingRequests(): void {
    const host = String(this.resolvedHost);
    for (const entry of this.pendingRequestsById.values()) {
      const id = entry.id;
      // handshake requests are settled separately (see onConnectionTerminated)
      if (this.connectRequestId === id || this.disconnectRequestId === id) {
        continue;
      }
      this.pendingRequestsById.delete(id);
      this.rejectPendingRequest(entry, new DisconnectedFromCoreError(host));
    }
    this.commandQueue.stop(new DisconnectedFromCoreError(host));
    this.coreSessions.stop(new DisconnectedFromCoreError(host));
  }

  /** Allocates the next message id and registers a resolvable promise under it. */
  private createPendingResult(): IResolvablePromiseWithId {
    const resolvablePromise = createPromise<ICoreResponsePayload>() as IResolvablePromiseWithId;
    this.lastId += 1;
    const id = this.lastId.toString();
    resolvablePromise.id = id;
    this.pendingRequestsById.set(id, resolvablePromise);
    return resolvablePromise;
  }

  /** Rejects a pending request, appending the pending entry's `stack` to the error's stack. */
  private rejectPendingRequest(pending: IResolvablePromiseWithId, error: Error): void {
    error.stack += `\n${'------CONNECTION'.padEnd(50, '-')}\n${pending.stack}`;
    pending.reject(error);
  }
}
370
371
/**
 * A resolvable promise for a Core response that also carries the message id
 * it was registered under.
 */
interface IResolvablePromiseWithId extends IResolvablePromise<ICoreResponsePayload> {
  /** Message id of the request this promise is waiting on. */
  id: string;
}
374
375