Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/commons/eventUtils.ts
1028 views
1
import { EventEmitter } from 'events';
2
import ITypedEventEmitter from '@secret-agent/interfaces/ITypedEventEmitter';
3
import IRegisteredEventListener from '@secret-agent/interfaces/IRegisteredEventListener';
4
import { IBoundLog } from '@secret-agent/interfaces/ILog';
5
import { createPromise } from './utils';
6
import IPendingWaitEvent, { CanceledPromiseError } from './interfaces/IPendingWaitEvent';
7
8
export class TypedEventEmitter<T> extends EventEmitter implements ITypedEventEmitter<T> {
9
public storeEventsWithoutListeners = false;
10
11
protected logger?: IBoundLog;
12
13
private pendingIdCounter = 0;
14
private pendingWaitEvents: IPendingWaitEvent[] = [];
15
16
private eventsToLog = new Set<string | symbol>();
17
private storedEventsByType = new Map<keyof T & (string | symbol), any[]>();
18
private reemitterCountByEventType: { [eventType: string]: number } = {};
19
20
public cancelPendingEvents(message?: string, excludeEvents?: (keyof T & string)[]): void {
21
const events = [...this.pendingWaitEvents];
22
this.pendingWaitEvents.length = 0;
23
while (events.length) {
24
const event = events.shift();
25
if (excludeEvents && excludeEvents.includes(event.event as any)) {
26
this.pendingWaitEvents.push(event);
27
continue;
28
}
29
if (message) event.error.message = message;
30
event.resolvable.reject(event.error);
31
}
32
}
33
34
public setEventsToLog<K extends keyof T & (string | symbol)>(events: K[]): void {
35
this.eventsToLog = new Set<string | symbol>(events);
36
}
37
38
public waitOn<K extends keyof T & (string | symbol)>(
39
eventType: K,
40
listenerFn?: (this: this, event?: T[K]) => boolean,
41
timeoutMillis = 30e3,
42
): Promise<T[K]> {
43
const promise = createPromise<T[K]>(
44
timeoutMillis ?? 30e3,
45
`Timeout waiting for ${String(eventType)}`,
46
);
47
48
this.pendingIdCounter += 1;
49
const id = this.pendingIdCounter;
50
51
this.pendingWaitEvents.push({
52
id,
53
event: eventType,
54
resolvable: promise,
55
error: new CanceledPromiseError(`Event (${String(eventType)}) canceled`),
56
});
57
const messageId = this.logger?.stats(`waitOn:${eventType}`, {
58
timeoutMillis,
59
});
60
61
const callbackFn = (result: T[K]): void => {
62
// give the listeners a second to register
63
if (!listenerFn || listenerFn.call(this, result)) {
64
this.logger?.stats(`waitOn.resolve:${eventType}`, {
65
parentLogId: messageId,
66
});
67
promise.resolve(result);
68
}
69
};
70
71
this.on(eventType, callbackFn);
72
73
return promise.promise.finally(() => {
74
this.off(eventType, callbackFn);
75
const idx = this.pendingWaitEvents.findIndex(x => x.id === id);
76
if (idx >= 0) this.pendingWaitEvents.splice(idx, 1);
77
});
78
}
79
80
public addEventEmitter<Y, K extends keyof T & keyof Y & (string | symbol)>(
81
emitter: TypedEventEmitter<Y>,
82
eventTypes: K[],
83
): IRegisteredEventListener[] {
84
const listeners: IRegisteredEventListener[] = [];
85
for (const eventName of eventTypes) {
86
const handler = emitter.emit.bind(emitter, eventName);
87
listeners.push({ handler, eventName, emitter: this });
88
super.on(eventName, handler);
89
this.reemitterCountByEventType[eventName as string] ??= 0;
90
this.reemitterCountByEventType[eventName as string] += 1;
91
}
92
return listeners;
93
}
94
95
public on<K extends keyof T & (string | symbol)>(
96
eventType: K,
97
listenerFn: (this: this, event?: T[K]) => any,
98
includeUnhandledEvents = false,
99
): this {
100
super.on(eventType, listenerFn);
101
return this.replayOrClearMissedEvents(includeUnhandledEvents, eventType);
102
}
103
104
public off<K extends keyof T & (string | symbol)>(
105
eventType: K,
106
listenerFn: (this: this, event?: T[K]) => any,
107
): this {
108
return super.off(eventType, listenerFn);
109
}
110
111
public once<K extends keyof T & (string | symbol)>(
112
eventType: K,
113
listenerFn: (this: this, event?: T[K]) => any,
114
includeUnhandledEvents = false,
115
): this {
116
super.once(eventType, listenerFn);
117
return this.replayOrClearMissedEvents(includeUnhandledEvents, eventType);
118
}
119
120
public emit<K extends keyof T & (string | symbol)>(
121
eventType: K,
122
event?: T[K],
123
sendInitiator?: object,
124
): boolean {
125
const listeners = super.listenerCount(eventType);
126
if (this.storeEventsWithoutListeners && !listeners) {
127
if (!this.storedEventsByType.has(eventType)) this.storedEventsByType.set(eventType, []);
128
this.storedEventsByType.get(eventType).push(event);
129
return false;
130
}
131
this.logEvent(eventType, event);
132
133
return super.emit(eventType, event, sendInitiator);
134
}
135
136
public addListener<K extends keyof T & (string | symbol)>(
137
eventType: K,
138
listenerFn: (this: this, event?: T[K]) => any,
139
includeUnhandledEvents = false,
140
): this {
141
return this.on(eventType, listenerFn, includeUnhandledEvents);
142
}
143
144
public removeListener<K extends keyof T & (string | symbol)>(
145
eventType: K,
146
listenerFn: (this: this, event?: T[K]) => any,
147
): this {
148
return super.removeListener(eventType, listenerFn);
149
}
150
151
public prependListener<K extends keyof T & (string | symbol)>(
152
eventType: K,
153
listenerFn: (this: this, event?: T[K]) => void,
154
includeUnhandledEvents = false,
155
): this {
156
super.prependListener(eventType, listenerFn);
157
return this.replayOrClearMissedEvents(includeUnhandledEvents, eventType);
158
}
159
160
public prependOnceListener<K extends keyof T & (string | symbol)>(
161
eventType: K,
162
listenerFn: (this: this, event?: T[K]) => void,
163
includeUnhandledEvents = false,
164
): this {
165
super.prependOnceListener(eventType, listenerFn);
166
return this.replayOrClearMissedEvents(includeUnhandledEvents, eventType);
167
}
168
169
private replayOrClearMissedEvents<K extends keyof T & (string | symbol)>(
170
shouldReplay: boolean,
171
eventType: K,
172
): this {
173
const events = this.storedEventsByType.get(eventType);
174
if (!events || !events.length) return this;
175
this.storedEventsByType.delete(eventType);
176
if (shouldReplay) {
177
for (const event of events) {
178
this.logEvent(eventType, event);
179
super.emit(eventType, event);
180
}
181
}
182
return this;
183
}
184
185
private logEvent<K extends keyof T & (string | symbol)>(eventType: K, event?: T[K]): void {
186
if (this.eventsToLog.has(eventType)) {
187
let data: any = event;
188
if (eventType) {
189
if (typeof event === 'object') {
190
if ((event as any).toJSON) {
191
data = (event as any).toJSON();
192
} else {
193
data = { ...event };
194
for (const [key, val] of Object.entries(data)) {
195
if (!val) continue;
196
if ((val as any).toJSON) {
197
data[key] = (val as any).toJSON();
198
}
199
}
200
}
201
}
202
}
203
this.logger?.stats(`emit:${eventType}`, data);
204
}
205
}
206
}
207
208