Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/client/lib/Handler.ts
1028 views
1
import { createPromise, pickRandom } from '@secret-agent/commons/utils';
2
import ShutdownHandler from '@secret-agent/commons/ShutdownHandler';
3
import Log, { hasBeenLoggedSymbol } from '@secret-agent/commons/Logger';
4
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
5
import StateMachine from 'awaited-dom/base/StateMachine';
6
import IAgentCreateOptions from '../interfaces/IAgentCreateOptions';
7
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
8
import Agent, { IState } from './Agent';
9
import ConnectionToCore from '../connections/ConnectionToCore';
10
import ConnectionFactory from '../connections/ConnectionFactory';
11
import DisconnectedFromCoreError from '../connections/DisconnectedFromCoreError';
12
13
/**
 * Final record of one dispatched agent run, as assembled by `resolveDispatch`.
 */
type SettledDispatch = {
  /** Session id taken from the pending dispatch. */
  sessionId: string;
  /** Agent name, read from the dispatch options. */
  name: string;
  /** Set when the dispatch resolved with an `Error` value. */
  error?: Error;
  /** Set to the resolved value when the dispatch did not resolve with an `Error`. */
  output: any;
  /** NOTE(review): never assigned in the visible code — confirm where this is meant to come from. */
  input: any;
  /** Copy of the dispatch options with `connectionToCore` set to undefined. */
  options: IAgentCreateOptions;
  /** Retry count taken from the pending dispatch. */
  retries: number;
};
22
23
/**
 * Bookkeeping entry for an agent run queued through `dispatchAgent`.
 */
type PendingDispatch = {
  /**
   * Promise for the outcome of the run. Failures are delivered as a resolved
   * `Error` value (checked with `instanceof Error` by the consumer).
   */
  resolution: Promise<Error | any>;
  /** Filled in by the `onConnected` callback once a session exists. */
  sessionId?: string;
  /** Options the agent is created with; `name` is overwritten with the session name on connect. */
  options: IAgentCreateOptions;
  /** Number of times this dispatch has been re-queued after a failure. */
  retries: number;
};
29
30
// Logger instance obtained from the commons Logger factory for this module.
const { log } = Log(module);
31
// Accessor for the internal `IState` of an `Agent`; used below to reach the agent's connection.
const { getState } = StateMachine<Agent, IState>();
32
33
/**
 * Coordinates a pool of connections to Core: creates agents on them, queues
 * dispatched agent runs, and collects the results of those runs.
 */
export default class Handler {
34
/** Upper bound on re-queues of a dispatch that failed before a session id was assigned. */
public disconnectedDispatchRetries = 3;
35
/** Base options; per-call options passed to `dispatchAgent` / `createAgent` are spread on top of these. */
public defaultAgentOptions: IAgentCreateOptions = {};
36
/**
 * Resolves to the hosts of all pooled connections.
 * Connections whose `hostOrError` resolved to an `Error` are left out.
 */
public get coreHosts(): Promise<string[]> {
  const pendingHosts = this.connections.map(connection => connection.hostOrError);
  return Promise.all(pendingHosts).then(resolved =>
    resolved.filter((entry): entry is string => !(entry instanceof Error)),
  );
}
46
47
/** Pool of connections to Core that agents are placed on. */
private readonly connections: ConnectionToCore[] = [];
48
/** Dispatches pushed by `internalDispatchAgent`; drained (spliced) by the `waitForAllDispatches*` methods. */
private readonly dispatches: PendingDispatch[] = [];
49
50
/** Set when `close()` starts; makes later `close()` calls no-ops and stops dispatch retries. */
private isClosing = false;
51
52
/**
 * Builds the connection pool and hooks the handler into process shutdown.
 *
 * @param connectionOptions one entry per Core connection (options or an
 *   existing connection); when none are given, a single connection is created
 *   from empty options.
 */
constructor(...connectionOptions: (IConnectionToCoreOptions | ConnectionToCore)[]) {
  const requested: (IConnectionToCoreOptions | ConnectionToCore)[] = connectionOptions.length
    ? connectionOptions
    : [{}];

  requested.forEach(entry => {
    const connection = ConnectionFactory.createConnection(entry);
    this.connections.push(connection);
    // drop the connection from the pool as soon as it reports a disconnect
    connection.on('disconnected', () => this.onDisconnected(connection));
  });

  ShutdownHandler.register(() => this.close());
  this.registerUnhandledExceptionHandlers();
}
66
67
/**
 * Connects to an additional Core and adds the connection to the pool.
 *
 * @param options connection options, or an existing `ConnectionToCore`
 * @throws the error returned by `connection.connect()` when connecting fails
 */
public async addConnectionToCore(
  options: IConnectionToCoreOptions | ConnectionToCore,
): Promise<void> {
  const connection = ConnectionFactory.createConnection(options);
  const error = await connection.connect();
  if (error) throw error;
  this.connections.push(connection);
  // Same wiring the constructor applies: without this listener a connection
  // added at runtime stays in the pool after it disconnects.
  connection.on('disconnected', this.onDisconnected.bind(this, connection));
}
75
76
/**
 * Disconnects every pooled connection whose resolved host equals `host`.
 *
 * @param host Core host; `ws://` is prepended unless the value already
 *   carries a `ws://` or `wss://` scheme.
 */
public async removeConnectionToCore(host: string): Promise<void> {
  // Test for an actual scheme: a bare hostname such as "ws-core:1818" also
  // starts with "ws" but still needs the prefix.
  const wsHost = /^wss?:\/\//.test(host) ? host : `ws://${host}`;

  // Walk a snapshot: the 'disconnected' handler splices `this.connections`,
  // and mutating the array mid-iteration would skip entries.
  for (const connection of [...this.connections]) {
    const coreHost = await connection.hostOrError;
    if (coreHost === wsHost) {
      await connection.disconnect();
    }
  }
}
85
86
/**
 * Queues `runFn` to run against a new agent. Nothing is returned; results are
 * collected through the `waitForAllDispatches*` methods.
 *
 * @param runFn work to perform with the agent
 * @param createAgentOptions per-run options, layered over `defaultAgentOptions`
 */
public dispatchAgent(
  runFn: (agent: Agent) => Promise<void>,
  createAgentOptions?: IAgentCreateOptions,
): void {
  const mergedOptions = { ...this.defaultAgentOptions, ...createAgentOptions };

  this.internalDispatchAgent(runFn, mergedOptions, {
    retries: 0,
    resolution: null,
    options: mergedOptions,
  });
}
101
102
/**
 * Creates an agent on one of the available connections and resolves with it
 * once the agent itself has resolved.
 *
 * @param createAgentOptions per-agent options, layered over `defaultAgentOptions`
 * @returns the ready agent
 * @throws when no connection is available, or when creating the agent fails
 */
public async createAgent(createAgentOptions: IAgentCreateOptions = {}): Promise<Agent> {
  const agentOptions = { ...this.defaultAgentOptions, ...createAgentOptions };
  const ready = createPromise<Agent>();

  const core = this.getConnection();

  core
    .useAgent(agentOptions, agent =>
      agent
        .then(() => {
          // the callback's promise stays pending until the agent emits 'close'
          const closed = new Promise<void>(done => agent.once('close', done));
          ready.resolve(agent);
          return closed;
        })
        .catch(ready.reject),
    )
    .catch(ready.reject);

  // NOTE: keep await to ensure createAgent stays in stack trace
  return await ready.promise;
}
127
128
/**
 * Waits for every queued dispatch, including dispatches queued while waiting,
 * and rejects with the first dispatch error encountered.
 *
 * @returns the settled dispatches, in the order they were first seen
 * @throws the `Error` a dispatch resolved with
 */
public async waitForAllDispatches(): Promise<SettledDispatch[]> {
  const settledDispatches = new Map<PendingDispatch, Promise<SettledDispatch>>();
  // Keep only the caller's frames. With an empty message the stack header is
  // the single line "Error", so drop the first line instead of a fixed number
  // of characters (which cut into the first frame's indentation).
  const startStack = (new Error('').stack || '').split('\n').slice(1).join('\n');

  do {
    // clear out dispatches every time you check it
    const dispatches = this.dispatches.splice(0);

    // put in request order
    for (const dispatch of dispatches) {
      settledDispatches.set(
        dispatch,
        this.resolveDispatch(startStack, dispatch).then(x => {
          if (x.error) throw x.error;
          return x;
        }),
      );
    }
    await Promise.all(settledDispatches.values());

    // yield once so dispatches queued by just-finished work are picked up
    await new Promise(setImmediate);
  } while (this.dispatches.length);

  return await Promise.all(settledDispatches.values());
}
153
154
/**
 * Waits for every queued dispatch, including dispatches queued while waiting.
 * Dispatch errors are reported on each result's `error` field instead of
 * being thrown.
 *
 * @returns the settled dispatches, in the order they were first seen
 */
public async waitForAllDispatchesSettled(): Promise<SettledDispatch[]> {
  const settledDispatches = new Map<PendingDispatch, Promise<SettledDispatch>>();
  // Keep only the caller's frames. With an empty message the stack header is
  // the single line "Error", so drop the first line instead of a fixed number
  // of characters (which cut into the first frame's indentation).
  const startStack = (new Error('').stack || '').split('\n').slice(1).join('\n');

  do {
    // clear out dispatches every time you check it
    const dispatches = this.dispatches.splice(0);
    for (const dispatch of dispatches) {
      settledDispatches.set(dispatch, this.resolveDispatch(startStack, dispatch));
    }
    await Promise.all(settledDispatches.values());

    // yield once so dispatches queued by just-finished work are picked up
    await new Promise(setImmediate);
  } while (this.dispatches.length);

  return await Promise.all(settledDispatches.values());
}
171
172
/**
 * Disconnects every pooled connection. Only the first call does any work.
 *
 * @param error optional error forwarded to each connection's `disconnect`
 */
public async close(error?: Error): Promise<void> {
  if (this.isClosing) return;
  this.isClosing = true;

  // eslint-disable-next-line promise/no-promise-in-callback
  const disconnects = this.connections.map(connection =>
    // a failing disconnect is ignored so the remaining ones still complete
    connection.disconnect(error).catch(() => null),
  );
  await Promise.all(disconnects);
}
178
179
/**
 * Awaits one pending dispatch and converts its outcome into a
 * `SettledDispatch`.
 *
 * @param startStack stack of the waiting caller, appended to a failure's stack
 * @param dispatch the pending dispatch to settle
 * @returns the settled record; `error` is set when the resolution was an
 *   `Error`, otherwise `output` holds the resolved value
 */
private async resolveDispatch(
  startStack: string,
  dispatch: PendingDispatch,
): Promise<SettledDispatch> {
  // `dispatch.resolution` is read here, at call time, so this call settles the
  // promise that was current when it was invoked.
  const result = await dispatch.resolution;

  // Read the remaining fields only after the run has settled: `sessionId` and
  // `options.name` are assigned on connect, and `retries` grows on re-queue, so
  // a snapshot taken before awaiting would report stale values.
  const { sessionId, options, retries } = dispatch;
  const dispatchResolution = <SettledDispatch>{
    options: { ...options, connectionToCore: undefined },
    name: options.name,
    sessionId,
    error: undefined,
    output: undefined,
    retries,
  };

  if (result instanceof Error) {
    const marker = `------WAIT FOR ALL DISPATCHES`.padEnd(50, '-');
    result.stack += `\n${marker}\n${startStack}`;
    dispatchResolution.error = result;
  } else {
    dispatchResolution.output = result;
  }
  return dispatchResolution;
}
202
203
/**
 * Starts (or restarts) a dispatched run on an available connection and
 * records it in `this.dispatches`.
 *
 * The resolution is built to resolve rather than reject: failures end up as a
 * resolved `Error` value.
 *
 * @param runFn work to perform with the agent
 * @param options options the agent is created with
 * @param dispatched bookkeeping entry; its `resolution`, `sessionId`,
 *   `options.name` and `retries` are updated in place
 */
private internalDispatchAgent(
  runFn: (agent: Agent) => Promise<void>,
  options: IAgentCreateOptions,
  dispatched: PendingDispatch,
): void {
  // `getConnection()` throws when nothing is available, so availability is
  // checked first. The failure is then recorded on the dispatch instead of
  // being thrown at the caller, or turning the resolution into a rejection
  // when this runs as a retry.
  if (!this.getAvailableConnections().length) {
    dispatched.resolution = Promise.resolve(
      new Error("There aren't any connections available to dispatch this agent"),
    );
    this.dispatches.push(dispatched);
    return;
  }
  const connection = this.getConnection();

  dispatched.resolution = connection
    .useAgent(options, async agent => {
      try {
        // set session id once connected
        getState(agent).connection.onConnected = session => {
          dispatched.sessionId = session.sessionId;
          dispatched.options.name = session.sessionName;
        };
        await runFn(agent);
      } finally {
        // always release the agent, whether runFn succeeded or threw
        await agent.close();
      }
      return agent.output.toJSON();
    })
    .catch(err => {
      // only retry runs that failed before a session was assigned
      const canRetry =
        !dispatched.sessionId && dispatched.retries < this.disconnectedDispatchRetries;
      if (canRetry && !this.isClosing && this.connections.length) {
        dispatched.retries += 1;
        // re-queues the same entry with a fresh `resolution`; this (superseded)
        // promise then resolves with undefined
        return this.internalDispatchAgent(runFn, options, dispatched);
      }

      return err;
    });

  this.dispatches.push(dispatched);
}
245
246
/**
 * Lists the connections a new agent may be placed on: those able to create a
 * session right now, or — when there are none — every connection that is not
 * disconnecting.
 */
private getAvailableConnections(): ConnectionToCore[] {
  const readyNow = this.connections.filter(connection => connection.canCreateSessionNow());
  if (readyNow.length) return readyNow;

  return this.connections.filter(connection => !connection.isDisconnecting);
}
252
253
/**
 * Picks one of the available connections at random.
 *
 * @throws when no connection is available
 */
private getConnection(): ConnectionToCore {
  const candidates = this.getAvailableConnections();
  if (candidates.length === 0) {
    throw new Error('There are no active Core connections available.');
  }
  return pickRandom(candidates);
}
258
259
/**
 * Attaches process-level listeners: an uncaught exception closes the handler,
 * and an unhandled rejection is passed to `logUnhandledError`.
 * Does nothing when `NODE_ENV` is `test`.
 */
private registerUnhandledExceptionHandlers(): void {
  const isTestRun = process.env.NODE_ENV === 'test';
  if (isTestRun) return;

  const closeOnException = this.close.bind(this);
  const logRejection = this.logUnhandledError.bind(this);

  process.on('uncaughtExceptionMonitor', closeOnException);
  process.on('unhandledRejection', logRejection);
}
265
266
/**
 * Reports an unhandled rejection: logs it locally when any connection has a
 * configured host, and forwards it to every connection.
 *
 * Skipped for `DisconnectedFromCoreError`, for empty values, and for errors
 * already marked with `hasBeenLoggedSymbol`.
 *
 * @param error the unhandled rejection reason
 */
private async logUnhandledError(error: Error): Promise<void> {
  if (error instanceof DisconnectedFromCoreError) return;
  if (!error || error[hasBeenLoggedSymbol]) return;
  // if error and there are remote connections, log error here
  if (this.connections.some(x => !!x.options.host)) {
    log.error('UnhandledRejection (Client)', { error, sessionId: null });
  }
  // eslint-disable-next-line promise/no-promise-in-callback
  await Promise.all(
    this.connections.map(x => {
      return x.logUnhandledError(error).catch(async logError => {
        if (logError instanceof CanceledPromiseError) return;
        // `hostOrError` is awaited elsewhere in this class, i.e. it is a promise:
        // resolve it so the log entry carries the host (or its error) rather
        // than a Promise object. A rejection is captured as the logged value so
        // this handler cannot itself produce an unhandled rejection.
        const connectionHost = await Promise.resolve(x.hostOrError).catch(
          hostError => hostError,
        );
        log.error('UnhandledRejection.CouldNotSendToCore', {
          error: logError,
          connectionHost,
          sessionId: null,
        });
      });
    }),
  );
}
287
288
/**
 * Removes a connection from the pool; a connection that is not in the pool is
 * ignored.
 */
private onDisconnected(connection: ConnectionToCore): void {
  const position = this.connections.indexOf(connection);
  if (position === -1) return;

  this.connections.splice(position, 1);
}
292
}
293
294