Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/client/lib/CoreCommandQueue.ts
1028 views
1
import ISessionMeta from '@secret-agent/interfaces/ISessionMeta';
2
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
3
import Queue from '@secret-agent/commons/Queue';
4
import ICoreRequestPayload from '@secret-agent/interfaces/ICoreRequestPayload';
5
import ConnectionToCore from '../connections/ConnectionToCore';
6
import { convertJsPathArgs } from './SetupAwaitedHandler';
7
import ICommandCounter from '../interfaces/ICommandCounter';
8
9
/**
 * Client-side dispatcher for commands sent to core over a `ConnectionToCore`.
 *
 * Two kinds of traffic go through this class:
 *  - commands that are executed (`run` via the internal queue, `runOutOfBand` directly), and
 *  - commands that are only *recorded* (`record`); these are buffered in
 *    `internalState.commandsToRecord` and shipped either with the next `run`
 *    request or by a dedicated `Session.flush` request.
 *
 * Instances created through `createSharedQueue` share the same connection,
 * command counter, internal queue and record buffer, but carry their own meta.
 */
export default class CoreCommandQueue {
  /** `lastCommandId` of the command counter, or `undefined` when no counter was supplied. */
  public get lastCommandId(): number {
    return this.commandCounter?.lastCommandId;
  }

  /** `nextCommandId` of the command counter, or `undefined` when no counter was supplied. */
  public get nextCommandId(): number {
    return this.commandCounter?.nextCommandId;
  }

  /** State that is shared (by reference) with every queue made by `createSharedQueue`. */
  private readonly internalState: {
    queue: Queue;
    commandsToRecord: ICoreRequestPayload['recordCommands'];
  };

  private readonly commandCounter?: ICommandCounter;
  /** Banner appended to the stack of errors thrown by `run`; empty when no meta was given. */
  private readonly sessionMarker: string = '';
  /** Meta sent with each request; stays `undefined` when the constructor received `null`. */
  private readonly meta: ISessionMeta;
  private readonly connection: ConnectionToCore;
  /** Handle of the scheduled background flush, or `null`/`undefined` when none is scheduled. */
  private flushOnTimeout: NodeJS.Timeout;
  /** Flush requests that have been sent and have not settled yet. */
  private flushes: Promise<any>[] = [];

  private get internalQueue(): Queue {
    return this.internalState.queue;
  }

  /**
   * @param meta Session/tab/frame identifiers sent with each request, plus a
   *   `sessionName` used only for the error banner. May be `null`.
   * @param connection Connection that requests are sent over.
   * @param commandCounter Source of `lastCommandId` / `nextCommandId`.
   * @param internalState Existing queue + record buffer to share. When omitted,
   *   a new `Queue('CORE COMMANDS', 1)` and an empty buffer are created.
   */
  constructor(
    meta: (ISessionMeta & { sessionName: string }) | null,
    connection: ConnectionToCore,
    commandCounter: ICommandCounter,
    internalState?: CoreCommandQueue['internalState'],
  ) {
    this.connection = connection;
    if (meta) {
      // Four 50-character lines: dashes, session name, session id, dashes.
      const markers = [
        ''.padEnd(50, '-'),
        `------${meta.sessionName ?? ''}`.padEnd(50, '-'),
        `------${meta.sessionId ?? ''}`.padEnd(50, '-'),
        ''.padEnd(50, '-'),
      ].join('\n');
      this.sessionMarker = `\n\n${markers}`;
      // Copy only the identifying fields; sessionName is not sent to core.
      this.meta = { sessionId: meta.sessionId, tabId: meta.tabId, frameId: meta.frameId };
    }
    this.commandCounter = commandCounter;

    this.internalState = internalState ?? {
      queue: new Queue('CORE COMMANDS', 1),
      commandsToRecord: [],
    };
  }

  /**
   * Buffers a command so it is reported to core later (with the next `run`
   * request or the next flush), stamping it with the current time.
   *
   * Once more than 1000 commands are buffered a flush is started immediately;
   * otherwise a flush is scheduled one second out unless one is already
   * scheduled. Both background flushes are best-effort: failures are ignored.
   */
  public record(command: { command: string; args: any[]; commandId?: number }): void {
    this.internalState.commandsToRecord.push({
      ...command,
      startDate: new Date(),
    });
    if (this.internalState.commandsToRecord.length > 1000) {
      this.flush().catch(() => null);
    } else if (!this.flushOnTimeout) {
      // Swallow failures here as well: nothing awaits this promise, so a
      // rejection would otherwise surface as an unhandled rejection.
      // `unref` keeps the timer from holding the process open.
      this.flushOnTimeout = setTimeout(() => {
        this.flush().catch(() => null);
      }, 1e3).unref();
    }
  }

  /**
   * Sends all buffered recorded commands to core in a `Session.flush` request
   * and resolves once that request and every other in-flight flush request of
   * this instance have completed. Rejects if any of them rejects.
   *
   * Cancels the scheduled background flush. Returns immediately when the
   * buffer is empty.
   */
  public async flush(): Promise<void> {
    clearTimeout(this.flushOnTimeout);
    this.flushOnTimeout = null;
    if (!this.internalState.commandsToRecord.length) return;
    // Take ownership of the buffered commands and empty the shared buffer in place.
    const recordCommands = [...this.internalState.commandsToRecord];
    this.internalState.commandsToRecord.length = 0;

    const flush = this.connection.sendRequest({
      meta: this.meta,
      command: 'Session.flush',
      startDate: new Date(),
      args: [],
      recordCommands,
    });

    // Track the request *before* awaiting so overlapping flush() calls can see it.
    this.flushes.push(flush);
    // Stop tracking once it settles (either way), so the list neither grows
    // without bound nor keeps a rejected promise that would fail later flushes.
    const untrack = (): void => {
      const index = this.flushes.indexOf(flush);
      if (index !== -1) this.flushes.splice(index, 1);
    };
    void flush.then(untrack, untrack);

    // wait for all pending flushes (this one included)
    await Promise.all(this.flushes);
  }

  /**
   * Sends a command straight over the connection, bypassing the internal
   * queue. Buffered recorded commands are not attached to this request.
   *
   * @returns The `data` of the response; `null` when the connection is disconnecting.
   */
  public async runOutOfBand<T>(command: string, ...args: any[]): Promise<T> {
    return await this.sendRequest({
      command,
      args,
      commandId: this.nextCommandId,
      startDate: new Date(),
    });
  }

  /**
   * Runs a command through the internal queue and resolves with the response data.
   *
   * Any scheduled background flush is cancelled because the buffered recorded
   * commands are attached to this request when it is sent. Resolves with
   * `null` without sending anything when the connection is disconnecting.
   *
   * @throws Whatever the queue or the request rejects with, after appending
   *   the session banner to the error's `stack`.
   */
  public run<T>(command: string, ...args: any[]): Promise<T> {
    clearTimeout(this.flushOnTimeout);
    this.flushOnTimeout = null;
    if (this.connection.isDisconnecting) {
      return Promise.resolve(null);
    }
    for (const arg of args) {
      if (Array.isArray(arg)) {
        // Return value is unused, so this presumably rewrites the array in
        // place — confirm against SetupAwaitedHandler.
        convertJsPathArgs(arg);
      }
    }
    // Captured at call time, not when the queue gets around to sending.
    const startDate = new Date();
    const commandId = this.nextCommandId;
    return this.internalQueue
      .run<T>(async () => {
        // Drain the record buffer at send time so it includes everything
        // recorded while this command was waiting in the queue.
        const recordCommands = [...this.internalState.commandsToRecord];
        this.internalState.commandsToRecord.length = 0;
        return await this.sendRequest<T>({
          command,
          args,
          startDate,
          commandId,
          recordCommands,
        });
      })
      .catch(error => {
        error.stack += `${this.sessionMarker}`;
        throw error;
      });
  }

  /** Forwards to `willStop()` on the internal queue. */
  public willStop(): void {
    this.internalQueue.willStop();
  }

  /** Forwards to `stop(cancelError)` on the internal queue. */
  public stop(cancelError: CanceledPromiseError): void {
    this.internalQueue.stop(cancelError);
  }

  /**
   * Creates another `CoreCommandQueue` for `meta` that reuses this instance's
   * connection, command counter, internal queue and record buffer.
   */
  public createSharedQueue(meta: ISessionMeta & { sessionName: string }): CoreCommandQueue {
    return new CoreCommandQueue(meta, this.connection, this.commandCounter, this.internalState);
  }

  /**
   * Sends `payload` together with this queue's meta over the connection.
   *
   * @returns `response.data` when a response is received, `undefined` when the
   *   response is falsy, and `null` when the connection is disconnecting.
   */
  private async sendRequest<T>(
    payload: Omit<ICoreRequestPayload, 'meta' | 'messageId' | 'sendDate'>,
  ): Promise<T> {
    if (this.connection.isDisconnecting) {
      return Promise.resolve(null);
    }

    const response = await this.connection.sendRequest({
      meta: this.meta,
      ...payload,
    });

    if (response) {
      return response.data;
    }
  }
}
160
161