Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/common/event.ts
5220 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { CancelablePromise } from './async.js';
7
import { CancellationToken } from './cancellation.js';
8
import { diffSets } from './collections.js';
9
import { onUnexpectedError } from './errors.js';
10
import { createSingleCallFunction } from './functional.js';
11
import { combinedDisposable, Disposable, DisposableMap, DisposableStore, IDisposable, toDisposable } from './lifecycle.js';
12
import { LinkedList } from './linkedList.js';
13
import { IObservable, IObservableWithChange, IObserver } from './observable.js';
14
import { StopWatch } from './stopwatch.js';
15
import { MicrotaskDelay } from './symbols.js';
16
17
18
// -----------------------------------------------------------------------------------------------------------------------
// Debug switch: prints warnings whenever an emitter that still has listeners is disposed, which is a sign of code
// smell. To turn it on, uncomment the `|| Boolean("TRUE")` line below.
// -----------------------------------------------------------------------------------------------------------------------
const _enableDisposeWithListenerWarning = false
	// || Boolean("TRUE") // causes a linter warning so that it cannot be pushed
	;
24
25
26
// -----------------------------------------------------------------------------------------------------------------------
// Debug switch: prints warnings whenever a snapshotted event is used repeatedly without cleanup. To turn it on,
// uncomment the `|| Boolean("TRUE")` line below.
// See https://github.com/microsoft/vscode/issues/142851
// -----------------------------------------------------------------------------------------------------------------------
const _enableSnapshotPotentialLeakWarning = false
	// || Boolean("TRUE") // causes a linter warning so that it cannot be pushed
	;
33
34
/**
 * An event with zero or one parameters that can be subscribed to. The event is a function itself: calling it
 * registers `listener` and returns an {@link IDisposable} that removes the registration again.
 *
 * - `listener` receives the event payload.
 * - `thisArgs` is the `this` value used when the listener is invoked.
 * - `disposables` is an optional array or store that the returned disposable is also added to.
 */
export interface Event<T> {
	(listener: (e: T) => unknown, thisArgs?: any, disposables?: IDisposable[] | DisposableStore): IDisposable;
}
40
41
export namespace Event {
42
/**
 * An event that never invokes its listeners: subscribing ignores the listener and returns `Disposable.None`.
 */
export const None: Event<any> = () => Disposable.None;
43
44
/**
 * When {@link _enableSnapshotPotentialLeakWarning} is on, wraps `options.onDidAddListener` so that a warning and the
 * stack captured at the time of this call are printed when the second listener is added. Any previously configured
 * `onDidAddListener` callback is still invoked afterwards. Does nothing when the switch is off.
 *
 * @param options The emitter options to instrument; mutated in place.
 */
function _addLeakageTraceLogic(options: EmitterOptions) {
	if (!_enableSnapshotPotentialLeakWarning) {
		return;
	}

	const previousOnDidAddListener = options.onDidAddListener;
	const creationStack = Stacktrace.create();
	let listenerCount = 0;

	options.onDidAddListener = () => {
		listenerCount += 1;
		// Warn exactly once, on the second registration.
		if (listenerCount === 2) {
			console.warn('snapshotted emitter LIKELY used public and SHOULD HAVE BEEN created with DisposableStore. snapshotted here');
			creationStack.print();
		}
		previousOnDidAddListener?.();
	};
}
58
59
/**
 * Given an event, returns another event which debounces calls with a delay of 0 and thereby defers the listeners
 * to a later task via a `setTimeout` shared by all listeners. The event is converted into a signal
 * (`Event<void>`) to avoid additional object creation as a result of merging events and to try prevent race
 * conditions that could arise when using related deferred and non-deferred events.
 *
 * This is useful for deferring non-critical work (eg. general UI updates) to ensure it does not block critical
 * work (eg. latency of keypress to text rendered).
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param flushOnListenerRemove Whether to fire all debounced events when a listener is removed. Defaults to
 * `true` when omitted. When `false`, events that are still pending when a listener gets disposed could go missing.
 * @param disposable A disposable store to add the new EventEmitter to.
 */
export function defer(event: Event<unknown>, flushOnListenerRemove?: boolean, disposable?: DisposableStore): Event<void> {
	// Every payload is collapsed into `undefined`, turning the event into a pure signal.
	const toSignal = (): void => undefined;
	const shouldFlush = flushOnListenerRemove ?? true;
	return debounce<unknown, void>(event, toSignal, 0, undefined, shouldFlush, undefined, disposable);
}
81
82
/**
 * Given an event, returns another event which fires at most once per subscription.
 *
 * The subscription to the source event is disposed as soon as the first event has been delivered. In addition the
 * listener is guarded by a flag, so it is never invoked a second time even if the source fires again during the
 * listener call or keeps invoking a callback whose subscription was already disposed.
 *
 * @param event The event source for the new event.
 */
export function once<T>(event: Event<T>): Event<T> {
	return (listener, thisArgs = null, disposables?) => {
		// Set on the first delivery; every later delivery is ignored.
		let didFire = false;
		let result: IDisposable | undefined = undefined;
		result = event(e => {
			if (didFire) {
				return;
			}
			didFire = true;

			// `result` is still undefined when the source fires synchronously while we are subscribing.
			// In that case the subscription is disposed below, once it is known.
			result?.dispose();

			return listener.call(thisArgs, e);
		}, null, disposables);

		if (didFire) {
			// The event already fired during subscription: unsubscribe right away.
			result.dispose();
		}

		return result;
	};
}
111
112
/**
 * Given an event, returns another event which only fires once, namely for the first event object for which
 * `condition` returns true.
 *
 * @param event The event source for the new event.
 * @param condition The predicate an event object has to satisfy to be delivered.
 */
export function onceIf<T>(event: Event<T>, condition: (e: T) => boolean): Event<T> {
	const matching = Event.filter(event, condition);
	return Event.once(matching);
}
120
121
/**
 * Maps an event of one type into an event of another type using a mapping function, similar to how
 * `Array.prototype.map` works.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param map The mapping function.
 * @param disposable A disposable store to add the new EventEmitter to.
 */
export function map<I, O>(event: Event<I>, map: (i: I) => O, disposable?: DisposableStore): Event<O> {
	const mapped: Event<O> = (listener, thisArgs = null, disposables?) => {
		return event(input => listener.call(thisArgs, map(input)), null, disposables);
	};
	return snapshot(mapped, disposable);
}
136
137
/**
 * Wraps an event in another event that runs `each` on the event object before passing the same object on to the
 * listener.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param each The function to perform on the event object.
 * @param disposable A disposable store to add the new EventEmitter to.
 */
export function forEach<I>(event: Event<I>, each: (i: I) => void, disposable?: DisposableStore): Event<I> {
	const tapped: Event<I> = (listener, thisArgs = null, disposables?) => {
		return event(input => {
			each(input);
			listener.call(thisArgs, input);
		}, null, disposables);
	};
	return snapshot(tapped, disposable);
}
151
152
/**
 * Wraps an event in another event that fires only when some condition is met.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param filter The filter function that defines the condition. The event will fire for the object if this
 * function returns true. When it is a type guard, the resulting event is narrowed accordingly.
 * @param disposable A disposable store to add the new EventEmitter to.
 */
export function filter<T, U>(event: Event<T | U>, filter: (e: T | U) => e is T, disposable?: DisposableStore): Event<T>;
export function filter<T>(event: Event<T>, filter: (e: T) => boolean, disposable?: DisposableStore): Event<T>;
export function filter<T, R>(event: Event<T | R>, filter: (e: T | R) => e is R, disposable?: DisposableStore): Event<R>;
export function filter<T>(event: Event<T>, filter: (e: T) => boolean, disposable?: DisposableStore): Event<T> {
	const filtered: Event<T> = (listener, thisArgs = null, disposables?) => {
		// The listener is only reached when the predicate accepts the event object.
		return event(e => filter(e) && listener.call(thisArgs, e), null, disposables);
	};
	return snapshot(filtered, disposable);
}
170
171
/**
 * Given an event, returns the very same event function, merely re-typed as `Event<void>` so that consumers
 * cannot depend on the payload.
 */
export function signal<T>(event: Event<T>): Event<void> {
	const untyped: Event<any> = event;
	return untyped;
}
177
178
/**
 * Given a collection of events, returns a single event which emits whenever any of the provided events emit.
 * Each subscription to the returned event subscribes to every source event; disposing the returned disposable
 * disposes all of those subscriptions together.
 */
export function any<T>(...events: Event<T>[]): Event<T>;
export function any(...events: Event<any>[]): Event<void>;
export function any<T>(...events: Event<T>[]): Event<T> {
	return (listener, thisArgs = null, disposables?) => {
		const subscriptions = events.map(source => source(e => listener.call(thisArgs, e)));
		return addAndReturnDisposable(combinedDisposable(...subscriptions), disposables);
	};
}
189
190
/**
 * Folds the events of the source into an accumulated value and fires that value on every source event, similar
 * to a running `Array.prototype.reduce`. The accumulator lives in the closure of this call, i.e. it belongs to the
 * returned event rather than to an individual listener.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param merge Combines the previous accumulated value (`undefined` at first unless `initial` is given) with the
 * new event object.
 * @param initial The optional starting value of the accumulator.
 * @param disposable A disposable store to add the new EventEmitter to.
 */
export function reduce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, initial?: O, disposable?: DisposableStore): Event<O> {
	let accumulated: O | undefined = initial;

	return map<I, O>(event, e => (accumulated = merge(accumulated, e)), disposable);
}
203
204
/**
 * Wraps `event` in an emitter of its own. The emitter subscribes to `event` when its first listener is added and
 * disposes that subscription when its last listener is removed.
 *
 * @param event The event to re-emit.
 * @param disposable Optional store the emitter is added to. Without a store, the leak-trace logic is attached.
 */
function snapshot<T>(event: Event<T>, disposable: DisposableStore | undefined): Event<T> {
	let upstream: IDisposable | undefined;

	const options: EmitterOptions | undefined = {
		onWillAddFirstListener() {
			// Forward everything from the source straight into the emitter.
			upstream = event(emitter.fire, emitter);
		},
		onDidRemoveLastListener() {
			upstream?.dispose();
		}
	};

	if (!disposable) {
		_addLeakageTraceLogic(options);
	}

	const emitter = new Emitter<T>(options);
	disposable?.add(emitter);

	return emitter.event;
}
226
227
/**
 * Registers the given disposable with `store` when one is provided (pushing onto an array, or calling `add` on
 * anything else) and hands the disposable back. Useful for Event function implementations.
 *
 * @param d The disposable to register.
 * @param store The optional array or store to register with.
 * @returns `d`, unchanged.
 */
function addAndReturnDisposable<T extends IDisposable>(d: T, store: DisposableStore | IDisposable[] | undefined): T {
	if (store) {
		if (store instanceof Array) {
			store.push(d);
		} else {
			store.add(d);
		}
	}
	return d;
}
239
240
/**
 * Given an event, creates a new event that debounces the source based on {@link delay} and fires with the result
 * of merging all source events that occurred in the meantime.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The original event to debounce.
 * @param merge A function that reduces all events into a single event.
 * @param delay The number of milliseconds to debounce (default 100), or `MicrotaskDelay` to fire in a microtask.
 * @param leading Whether to fire a leading event without debouncing (default false).
 * @param flushOnListenerRemove Whether to fire all debounced events when a listener is removed (default false).
 * If this is not specified, some events could go missing. Use this if it's important that all events are
 * processed, even if the listener gets disposed before the debounced event fires.
 * @param leakWarningThreshold See {@link EmitterOptions.leakWarningThreshold}.
 * @param disposable A disposable store to register the debounce emitter to.
 */
export function debounce<T>(event: Event<T>, merge: (last: T | undefined, event: T) => T, delay?: number | typeof MicrotaskDelay, leading?: boolean, flushOnListenerRemove?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<T>;
export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay?: number | typeof MicrotaskDelay, leading?: boolean, flushOnListenerRemove?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O>;
export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay: number | typeof MicrotaskDelay = 100, leading = false, flushOnListenerRemove = false, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O> {
	let subscription: IDisposable;
	let output: O | undefined = undefined;
	// `undefined`: nothing scheduled, `null`: a microtask is scheduled, otherwise the pending timer.
	let handle: Timeout | undefined | null = undefined;
	let numDebouncedCalls = 0;
	let doFire: (() => void) | undefined;

	const options: EmitterOptions | undefined = {
		leakWarningThreshold,
		onWillAddFirstListener() {
			subscription = event(cur => {
				numDebouncedCalls++;
				output = merge(output, cur);

				if (leading && !handle) {
					emitter.fire(output);
					output = undefined;
				}

				doFire = () => {
					if (numDebouncedCalls === 0) {
						// Stale invocation: a flush already delivered everything that was pending (this
						// happens for microtasks, which cannot be cancelled). Firing now would hand
						// `undefined` to the listeners.
						return;
					}
					const _output = output;
					output = undefined;
					handle = undefined;
					if (!leading || numDebouncedCalls > 1) {
						emitter.fire(_output!);
					}
					numDebouncedCalls = 0;
				};

				if (typeof delay === 'number') {
					if (handle) {
						clearTimeout(handle);
					}
					handle = setTimeout(doFire, delay);
				} else {
					if (handle === undefined) {
						handle = null;
						queueMicrotask(doFire);
					}
				}
			});
		},
		onWillRemoveListener() {
			if (flushOnListenerRemove && numDebouncedCalls > 0) {
				// Cancel the pending timer first. Otherwise it would run `doFire` a second time after
				// the flush and deliver an empty (`undefined`) event to the remaining listeners.
				if (handle) {
					clearTimeout(handle);
				}
				doFire?.();
			}
		},
		onDidRemoveLastListener() {
			doFire = undefined;
			subscription.dispose();
		}
	};

	if (!disposable) {
		_addLeakageTraceLogic(options);
	}

	const emitter = new Emitter<O>(options);

	disposable?.add(emitter);

	return emitter.event;
}
323
324
/**
 * Debounces an event, firing after some delay (default=0) with an array of all original event objects that
 * occurred in the meantime.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param delay The number of milliseconds to debounce.
 * @param flushOnListenerRemove Whether to fire all debounced events when a listener is removed. Defaults to
 * `true` when omitted. When `false`, events that are still pending when a listener gets disposed could go missing.
 * @param disposable A disposable store to add the new EventEmitter to.
 */
export function accumulate<T>(event: Event<T>, delay: number | typeof MicrotaskDelay = 0, flushOnListenerRemove?: boolean, disposable?: DisposableStore): Event<T[]> {
	// Appends to the current batch, starting a new one when there is none yet.
	const collect = (batch: T[] | undefined, e: T): T[] => {
		if (batch) {
			batch.push(e);
			return batch;
		}
		return [e];
	};
	return Event.debounce<T, T[]>(event, collect, delay, undefined, flushOnListenerRemove ?? true, undefined, disposable);
}
347
348
/**
 * Throttles an event, ensuring the event is fired at most once during the specified delay period.
 * Unlike debounce, throttle will fire immediately on the leading edge and/or after the delay on the trailing edge.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param merge An accumulator function that merges events if multiple occur during the throttle period.
 * @param delay The number of milliseconds to throttle (default 100), or `MicrotaskDelay` to end the period in a
 * microtask.
 * @param leading Whether to fire on the leading edge (immediately on first event). Defaults to true.
 * @param trailing Whether to fire on the trailing edge (after delay with the merged value). Defaults to true.
 * @param leakWarningThreshold See {@link EmitterOptions.leakWarningThreshold}.
 * @param disposable A disposable store to register the throttle emitter to.
 */
export function throttle<T>(event: Event<T>, merge: (last: T | undefined, event: T) => T, delay?: number | typeof MicrotaskDelay, leading?: boolean, trailing?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<T>;
export function throttle<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay?: number | typeof MicrotaskDelay, leading?: boolean, trailing?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O>;
export function throttle<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay: number | typeof MicrotaskDelay = 100, leading = true, trailing = true, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O> {
	let subscription: IDisposable;
	let output: O | undefined = undefined;
	// `undefined` while idle; any other value means a throttle period is in progress.
	let handle: Timeout | undefined = undefined;
	let numThrottledCalls = 0;

	// Ends the current throttle period: fires the trailing event when calls were collected and resets the state.
	const endThrottlePeriod = () => {
		if (trailing && numThrottledCalls > 0) {
			emitter.fire(output!);
		}
		output = undefined;
		handle = undefined;
		numThrottledCalls = 0;
	};

	const options: EmitterOptions | undefined = {
		leakWarningThreshold,
		onWillAddFirstListener() {
			subscription = event(cur => {
				numThrottledCalls++;
				output = merge(output, cur);

				if (handle !== undefined) {
					// Already throttling: the value was accumulated for the trailing edge.
					return;
				}

				if (leading) {
					emitter.fire(output);
					output = undefined;
					numThrottledCalls = 0;
				}

				// Start the throttle period.
				if (typeof delay === 'number') {
					handle = setTimeout(endThrottlePeriod, delay);
				} else {
					// Use a special marker to indicate a microtask is pending.
					handle = 0 as unknown as Timeout;
					queueMicrotask(endThrottlePeriod);
				}
			});
		},
		onDidRemoveLastListener() {
			subscription.dispose();
		}
	};

	if (!disposable) {
		_addLeakageTraceLogic(options);
	}

	const emitter = new Emitter<O>(options);
	disposable?.add(emitter);

	return emitter.event;
}
430
431
/**
 * Filters an event so that an event object is only passed on when it differs (according to `equals`) from the
 * event object seen immediately before it. The very first event object is always passed on. The remembered
 * value lives in the closure of this call, i.e. it belongs to the returned event.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param equals The equality condition. Defaults to strict equality (`===`).
 * @param disposable A disposable store to add the new EventEmitter to.
 *
 * @example
 * ```
 * // Fire only one time when a single window is opened or focused
 * Event.latch(Event.any(onDidOpenWindow, onDidFocusWindow))
 * ```
 */
export function latch<T>(event: Event<T>, equals: (a: T, b: T) => boolean = (a, b) => a === b, disposable?: DisposableStore): Event<T> {
	let hasPrevious = false;
	let previous: T;

	return filter(event, value => {
		const changed = !hasPrevious || !equals(value, previous);
		hasPrevious = true;
		previous = value;
		return changed;
	}, disposable);
}
460
461
/**
 * Splits an event whose parameter is a union type into 2 separate events for each type in the union. The first
 * event carries the objects accepted by `isT`, the second one all others.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @example
 * ```
 * const event = new EventEmitter<number | undefined>().event;
 * const [numberEvent, undefinedEvent] = Event.split(event, isUndefined);
 * ```
 *
 * @param event The event source for the new event.
 * @param isT A function that determines what event is of the first type.
 * @param disposable A disposable store to add the new EventEmitter to.
 */
export function split<T, U>(event: Event<T | U>, isT: (e: T | U) => e is T, disposable?: DisposableStore): [Event<T>, Event<U>] {
	const matching = Event.filter(event, isT, disposable);
	const rest = Event.filter(event, e => !isT(e), disposable) as Event<U>;
	return [matching, rest];
}
484
485
/**
 * Buffers an event until it has a listener attached. The source event is subscribed to immediately; once the
 * first listener is attached the buffered event objects are replayed in order and later events are passed through.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
 * event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
 * returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param flushAfterTimeout Determines whether to flush the buffer immediately when the first event listener is
 * added (`false`, the default) or in a `setTimeout` after that.
 * @param _buffer Internal: A source event array used for tests. It is copied, not mutated.
 * @param disposable A disposable store to add the source subscription and the new EventEmitter to.
 *
 * @example
 * ```
 * // Start accumulating events, when the first listener is attached, flush
 * // the event after a timeout such that multiple listeners attached before
 * // the timeout would receive the event
 * this.onInstallExtension = Event.buffer(service.onInstallExtension, true);
 * ```
 */
export function buffer<T>(event: Event<T>, flushAfterTimeout = false, _buffer: T[] = [], disposable?: DisposableStore): Event<T> {
	// Non-null while buffering; set to null once the buffer has been flushed.
	let buffer: T[] | null = _buffer.slice();

	let listener: IDisposable | null = event(e => {
		if (buffer) {
			buffer.push(e);
		} else {
			emitter.fire(e);
		}
	});

	if (disposable) {
		disposable.add(listener);
	}

	const flush = () => {
		if (buffer) {
			// Use for...of rather than forEach: it also visits event objects that are appended while the
			// flush is running (a listener causing the source to fire again). forEach fixes its range up
			// front, so such events would be dropped when the buffer is discarded below.
			for (const e of buffer) {
				emitter.fire(e);
			}
		}
		buffer = null;
	};

	const emitter = new Emitter<T>({
		onWillAddFirstListener() {
			// Re-subscribe to the source if the last listener had been removed before.
			if (!listener) {
				listener = event(e => emitter.fire(e));
				if (disposable) {
					disposable.add(listener);
				}
			}
		},

		onDidAddFirstListener() {
			if (buffer) {
				if (flushAfterTimeout) {
					setTimeout(flush);
				} else {
					flush();
				}
			}
		},

		onDidRemoveLastListener() {
			if (listener) {
				listener.dispose();
			}
			listener = null;
		}
	});

	if (disposable) {
		disposable.add(emitter);
	}

	return emitter.event;
}
559
/**
 * Wraps the event in an {@link IChainableEvent}, allowing a more functional programming style. The chain is
 * built anew for every subscription to the returned event, so stateful steps are kept per listener.
 *
 * @example
 * ```
 * // Normal
 * const onEnterPressNormal = Event.filter(
 *   Event.map(onKeyPress.event, e => new StandardKeyboardEvent(e)),
 *   e => e.keyCode === KeyCode.Enter
 * );
 *
 * // Using chain
 * const onEnterPressChain = Event.chain(onKeyPress.event, $ => $
 *   .map(e => new StandardKeyboardEvent(e))
 *   .filter(e => e.keyCode === KeyCode.Enter)
 * );
 * ```
 *
 * @param event The event source for the new event.
 * @param sythensize Receives an empty chain and returns the chain with all steps appended.
 */
export function chain<T, R>(event: Event<T>, sythensize: ($: IChainableSythensis<T>) => IChainableSythensis<R>): Event<R> {
	const chained: Event<R> = (listener, thisArgs, disposables) => {
		const steps = sythensize(new ChainableSynthesis()) as ChainableSynthesis;
		return event(value => {
			const outcome = steps.evaluate(value);
			if (outcome === HaltChainable) {
				// A step rejected the value: the listener is not notified.
				return;
			}
			listener.call(thisArgs, outcome);
		}, undefined, disposables);
	};

	return chained;
}
590
591
/** Sentinel produced by a step to signal that the value must not be passed on any further. */
const HaltChainable = Symbol('HaltChainable');

/**
 * Records a pipeline of steps and runs values through them in order. Every builder method appends one step and
 * returns the same instance so that calls can be chained.
 */
class ChainableSynthesis implements IChainableSythensis<any> {
	private readonly steps: ((input: any) => unknown)[] = [];

	/** Appends a step that replaces the value with `fn(value)`. */
	map<O>(fn: (i: any) => O): this {
		this.steps.push(fn);
		return this;
	}

	/** Appends a step that calls `fn` for its side effect and passes the value on unchanged. */
	forEach(fn: (i: any) => void): this {
		const tap = (value: any) => {
			fn(value);
			return value;
		};
		this.steps.push(tap);
		return this;
	}

	/** Appends a step that halts the pipeline for values `fn` does not accept. */
	filter(fn: (e: any) => boolean): this {
		this.steps.push(value => {
			if (fn(value)) {
				return value;
			}
			return HaltChainable;
		});
		return this;
	}

	/** Appends a step that folds every value into an accumulator (starting at `initial`) and passes it on. */
	reduce<R>(merge: (last: R | undefined, event: any) => R, initial?: R | undefined): this {
		let accumulated = initial;
		this.steps.push(value => (accumulated = merge(accumulated, value)));
		return this;
	}

	/**
	 * Appends a step that halts the pipeline for a value that equals the value seen right before it. The first
	 * value always passes.
	 */
	latch(equals: (a: any, b: any) => boolean = (a, b) => a === b): ChainableSynthesis {
		let hasPrevious = false;
		let previous: any;
		this.steps.push(value => {
			const changed = !hasPrevious || !equals(value, previous);
			hasPrevious = true;
			previous = value;
			return changed ? value : HaltChainable;
		});

		return this;
	}

	/**
	 * Runs `value` through all steps in order. Returns the final value, or {@link HaltChainable} as soon as a
	 * step halts the pipeline.
	 */
	public evaluate(value: any) {
		let current = value;
		for (const step of this.steps) {
			current = step(current);
			if (current === HaltChainable) {
				return current;
			}
		}

		return current;
	}
}
647
648
/**
 * The builder handed to the callback of {@link Event.chain}. Each method appends one processing step and returns
 * the builder typed for the values that leave that step.
 */
export interface IChainableSythensis<T> {
	/** Transforms each value with `fn`. */
	map<O>(fn: (i: T) => O): IChainableSythensis<O>;
	/** Runs `fn` for each value and passes the value on unchanged. */
	forEach(fn: (i: T) => void): IChainableSythensis<T>;
	/** Only passes on the values accepted by `fn`. */
	filter<R extends T>(fn: (e: T) => e is R): IChainableSythensis<R>;
	filter(fn: (e: T) => boolean): IChainableSythensis<T>;
	/** Folds the values into an accumulator and passes the accumulator on. */
	reduce<R>(merge: (last: R, event: T) => R, initial: R): IChainableSythensis<R>;
	reduce<R>(merge: (last: R | undefined, event: T) => R): IChainableSythensis<R>;
	/** Suppresses a value that equals the value right before it. */
	latch(equals?: (a: T, b: T) => boolean): IChainableSythensis<T>;
}
657
658
/**
 * The subset of a node-style event emitter needed by {@link Event.fromNodeEventEmitter}: registering and removing
 * a listener for a named event.
 */
export interface NodeEventEmitter {
	on(event: string | symbol, listener: Function): unknown;
	removeListener(event: string | symbol, listener: Function): unknown;
}
662
663
/**
 * Creates an {@link Event} from a node event emitter. The node listener is registered when the first listener is
 * added to the returned event and removed again when its last listener is removed.
 *
 * @param emitter The node event emitter to listen to.
 * @param eventName The name of the node event.
 * @param map Converts the arguments of the node event into the event object. Defaults to the first argument.
 */
export function fromNodeEventEmitter<T>(emitter: NodeEventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
	const relay = (...args: any[]) => result.fire(map(...args));
	const result = new Emitter<T>({
		onWillAddFirstListener: () => emitter.on(eventName, relay),
		onDidRemoveLastListener: () => emitter.removeListener(eventName, relay)
	});

	return result.event;
}
674
675
/**
 * The subset of a DOM-style event target needed by {@link Event.fromDOMEventEmitter}: registering and removing a
 * listener for a named event.
 */
export interface DOMEventEmitter {
	addEventListener(event: string | symbol, listener: Function): void;
	removeEventListener(event: string | symbol, listener: Function): void;
}
679
680
/**
 * Creates an {@link Event} from a DOM event emitter. The DOM listener is registered when the first listener is
 * added to the returned event and removed again when its last listener is removed.
 *
 * @param emitter The DOM event emitter to listen to.
 * @param eventName The name of the DOM event.
 * @param map Converts the arguments of the DOM event into the event object. Defaults to the first argument.
 */
export function fromDOMEventEmitter<T>(emitter: DOMEventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
	const relay = (...args: any[]) => result.fire(map(...args));
	const result = new Emitter<T>({
		onWillAddFirstListener: () => emitter.addEventListener(eventName, relay),
		onDidRemoveLastListener: () => emitter.removeEventListener(eventName, relay)
	});

	return result.event;
}
691
692
/**
 * Creates a promise out of an event, using the {@link Event.once} helper. The promise resolves with the first
 * event object. Calling `cancel()` on it disposes the listener without settling the promise.
 *
 * @param event The event to wait for.
 * @param disposables Optional array or store the listener is registered with for as long as it is needed.
 */
export function toPromise<T>(event: Event<T>, disposables?: IDisposable[] | DisposableStore): CancelablePromise<T> {
	let subscription: IDisposable;
	let cancel: () => void;

	const promise = new Promise(resolve => {
		subscription = once(event)(resolve);
		addToDisposables(subscription, disposables);

		// not resolved, matching the behavior of a normal disposal
		cancel = () => {
			disposeAndRemove(subscription, disposables);
		};
	}) as CancelablePromise<T>;

	// The promise executor above runs synchronously, so `cancel` has been assigned by now.
	promise.cancel = cancel!;

	if (disposables) {
		// Once settled, take the listener out of the caller's disposables again.
		promise.finally(() => disposeAndRemove(subscription, disposables));
	}

	return promise;
}
715
716
/**
 * A convenience function for forwarding an event to another emitter which
 * improves readability.
 *
 * This is similar to {@link Relay} but allows instantiating and forwarding
 * on a single line. Call it once per source event to forward several events
 * into the same emitter.
 * @param from The event to forward.
 * @param to The emitter to forward the event to.
 * @returns The subscription to `from`; dispose it to stop forwarding.
 * @example
 * Event.forward(event, emitter);
 * // equivalent to
 * event(e => emitter.fire(e));
 * // equivalent to
 * event(emitter.fire, emitter);
 */
export function forward<T>(from: Event<T>, to: Emitter<T>): IDisposable {
	const relay = (e: T) => to.fire(e);
	return from(relay);
}
734
735
/**
 * Calls the handler immediately with `initial` (`undefined` when omitted) as the event object and then adds it
 * as a listener to the event.
 *
 * @example
 * ```
 * // Initialize the UI and update it when dataChangeEvent fires
 * runAndSubscribe(dataChangeEvent, () => this._updateUI());
 * ```
 *
 * @param event The event to subscribe to.
 * @param handler Invoked once up front and then for every event.
 * @param initial The event object passed to the up-front invocation.
 * @returns The subscription to `event`.
 */
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T) => unknown, initial: T): IDisposable;
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T | undefined) => unknown): IDisposable;
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T | undefined) => unknown, initial?: T): IDisposable {
	// Run first, subscribe second.
	handler(initial);
	const onEvent = (e: T) => handler(e);
	return event(onEvent);
}
750
751
/**
 * Bridges an observable to an {@link Emitter}: observes the observable while the emitter has listeners and fires
 * the observable's current value once an update cycle that reported a change has completed.
 */
class EmitterObserver<T> implements IObserver {

	readonly emitter: Emitter<T>;

	/** Number of `beginUpdate` calls that have not been matched by `endUpdate` yet. */
	private _counter = 0;
	/** Whether a change was reported since the emitter last fired. */
	private _hasChanged = false;

	/**
	 * @param _observable The observable to bridge.
	 * @param store Optional store the emitter is added to. Without a store, the leak-trace logic is attached.
	 */
	constructor(readonly _observable: IObservable<T>, store: DisposableStore | undefined) {
		const options: EmitterOptions = {
			onWillAddFirstListener: () => {
				_observable.addObserver(this);

				// Communicate to the observable that we received its current value and would like to be notified about future changes.
				this._observable.reportChanges();
			},
			onDidRemoveLastListener: () => {
				_observable.removeObserver(this);
			}
		};

		if (!store) {
			_addLeakageTraceLogic(options);
		}

		this.emitter = new Emitter<T>(options);
		store?.add(this.emitter);
	}

	beginUpdate<T>(_observable: IObservable<T>): void {
		// assert(_observable === this.obs);
		this._counter++;
	}

	handlePossibleChange<T>(_observable: IObservable<T>): void {
		// assert(_observable === this.obs);
	}

	handleChange<T, TChange>(_observable: IObservableWithChange<T, TChange>, _change: TChange): void {
		// assert(_observable === this.obs);
		this._hasChanged = true;
	}

	endUpdate<T>(_observable: IObservable<T>): void {
		// assert(_observable === this.obs);
		this._counter--;
		if (this._counter !== 0) {
			// Still inside an outer update.
			return;
		}

		this._observable.reportChanges();
		if (!this._hasChanged) {
			return;
		}

		this._hasChanged = false;
		this.emitter.fire(this._observable.get());
	}
}
805
806
/**
 * Creates an event emitter that is fired when the observable changes.
 * Each listener subscribes to the emitter.
 *
 * @param obs The observable to listen to.
 * @param store Optional store the emitter is added to.
 */
export function fromObservable<T>(obs: IObservable<T>, store?: DisposableStore): Event<T> {
	return new EmitterObserver(obs, store).emitter.event;
}
814
815
/**
 * Turns an observable into a payload-less event. No emitter is involved:
 * every listener is attached to the observable directly via its own observer.
 *
 * @param observable The observable to listen to.
 */
export function fromObservableLight(observable: IObservable<unknown>): Event<void> {
	return (listener, thisArgs, disposables) => {
		// Depth of nested updates and whether a change was reported since the last notification.
		let openUpdates = 0;
		let sawChange = false;

		const observer: IObserver = {
			beginUpdate() {
				openUpdates++;
			},
			endUpdate() {
				openUpdates--;
				if (openUpdates !== 0) {
					return; // still inside an outer update
				}
				observable.reportChanges();
				if (!sawChange) {
					return;
				}
				sawChange = false;
				listener.call(thisArgs);
			},
			handlePossibleChange() {
				// noop
			},
			handleChange() {
				sawChange = true;
			}
		};

		observable.addObserver(observer);
		observable.reportChanges();

		const disposable = {
			dispose() {
				observable.removeObserver(observer);
			}
		};
		addToDisposables(disposable, disposables);

		return disposable;
	};
}
856
}
857
858
/**
 * Options accepted by the {@link Emitter} constructor. Every member is optional.
 */
export interface EmitterOptions {
	/**
	 * Invoked right *before* the very first listener is stored.
	 */
	onWillAddFirstListener?: Function;
	/**
	 * Invoked right *after* the very first listener has been stored.
	 */
	onDidAddFirstListener?: Function;
	/**
	 * Invoked after any listener has been added.
	 */
	onDidAddListener?: Function;
	/**
	 * Invoked *after* the very last listener has been removed.
	 */
	onDidRemoveLastListener?: Function;
	/**
	 * Invoked *before* a listener is removed.
	 */
	onWillRemoveListener?: Function;
	/**
	 * Invoked with the error when a listener throws. Defaults to
	 * {@link onUnexpectedError}
	 */
	onListenerError?: (e: any) => void;
	/**
	 * Number of listeners that are allowed before a leak is assumed. Defaults
	 * to a globally configured value
	 *
	 * @see setGlobalLeakWarningThreshold
	 */
	leakWarningThreshold?: number;
	/**
	 * A delivery queue to use, which is useful for ensuring
	 * in order event delivery across multiple emitters.
	 */
	deliveryQueue?: EventDeliveryQueue;

	/** ONLY enable this during development */
	_profName?: string;
}
900
901
902
/**
 * Collects timing data for the dispatches of one emitter. Every instance
 * registers itself in {@link EventProfiling.all}.
 */
export class EventProfiling {

	/** All profiling instances created so far. */
	static readonly all = new Set<EventProfiling>();

	/** Running number that is appended to names to keep them unique. */
	private static _idPool = 0;

	readonly name: string;
	/** Listener count passed to the most recent `start` call. */
	public listenerCount: number = 0;
	/** Number of completed `start`/`stop` pairs. */
	public invocationCount = 0;
	/** Sum of all measured durations. */
	public elapsedOverall = 0;
	/** Each measured duration, in the order it was recorded. */
	public durations: number[] = [];

	/** Defined between `start` and the matching `stop`. */
	private _stopWatch?: StopWatch;

	constructor(name: string) {
		const suffix = EventProfiling._idPool++;
		this.name = `${name}_${suffix}`;
		EventProfiling.all.add(this);
	}

	/** Begins a measurement and remembers the listener count. */
	start(listenerCount: number): void {
		this._stopWatch = new StopWatch();
		this.listenerCount = listenerCount;
	}

	/** Ends the running measurement and records it; a no-op when none is running. */
	stop(): void {
		const watch = this._stopWatch;
		if (!watch) {
			return;
		}

		const elapsed = watch.elapsed();
		this.durations.push(elapsed);
		this.elapsedOverall += elapsed;
		this.invocationCount += 1;
		this._stopWatch = undefined;
	}
}
936
937
// Leak-warning threshold used by emitters that do not configure their own.
// Emitters only create a leakage monitor for it while it is greater than zero.
let _globalLeakWarningThreshold = -1;

/**
 * Replaces the global leak-warning threshold.
 *
 * @param n The new threshold.
 * @returns A disposable that puts back the value that was active when this
 * function was called.
 */
export function setGlobalLeakWarningThreshold(n: number): IDisposable {
	const previous = _globalLeakWarningThreshold;
	_globalLeakWarningThreshold = n;

	const restore = {
		dispose() {
			_globalLeakWarningThreshold = previous;
		}
	};
	return restore;
}
947
948
/**
 * Counts, per stack trace, how many listeners were added while an emitter was
 * at or above its listener threshold, and reports the most frequent stack to
 * the error handler when the threshold is exceeded.
 */
class LeakageMonitor {

	private static _idPool = 1;

	/** Listener count per stack trace string. Created lazily on the first hit. */
	private _stacks: Map<string, number> | undefined;
	/** Number of further hits to wait before warning again. */
	private _warnCountdown: number = 0;

	/**
	 * @param _errorHandler Receives the {@link ListenerLeakError} for every warning.
	 * @param threshold Listener count from which on additions are tracked. Values `<= 0` disable the monitor.
	 * @param name Label used in warnings. Defaults to a running hexadecimal id.
	 */
	constructor(
		private readonly _errorHandler: (err: Error) => void,
		readonly threshold: number,
		readonly name: string = (LeakageMonitor._idPool++).toString(16).padStart(3, '0')
	) { }

	dispose(): void {
		this._stacks?.clear();
	}

	/**
	 * Records that a listener is being added from `stack`.
	 *
	 * @param stack Stack trace of the code adding the listener.
	 * @param listenerCount Listener count including the one being added.
	 * @returns `undefined` when the monitor is disabled or the count is below
	 * the threshold; otherwise a function that undoes the bookkeeping and must
	 * be called when the listener is removed.
	 */
	check(stack: Stacktrace, listenerCount: number): undefined | (() => void) {

		const threshold = this.threshold;
		if (threshold <= 0 || listenerCount < threshold) {
			return undefined;
		}

		const stacks = (this._stacks ??= new Map<string, number>());
		stacks.set(stack.value, (stacks.get(stack.value) ?? 0) + 1);
		this._warnCountdown -= 1;

		if (this._warnCountdown <= 0) {
			// only warn on first exceed and then every time the limit
			// is exceeded by 50% again
			this._warnCountdown = threshold * 0.5;

			const [topStack, topCount] = this.getMostFrequentStack()!;
			const message = `[${this.name}] potential listener LEAK detected, having ${listenerCount} listeners already. MOST frequent listener (${topCount}):`;
			console.warn(message);
			console.warn(topStack);

			const error = new ListenerLeakError(message, topStack);
			this._errorHandler(error);
		}

		return () => {
			const remaining = stacks.get(stack.value);
			if (remaining === undefined) {
				// The entry is already gone (e.g. `dispose()` cleared the map).
				// Do not write a negative count back.
				return;
			}
			if (remaining <= 1) {
				// Drop entries that reach zero so stale stack strings do not pile up.
				stacks.delete(stack.value);
			} else {
				stacks.set(stack.value, remaining - 1);
			}
		};
	}

	/**
	 * @returns The `[stack, count]` pair with the highest count, or `undefined`
	 * when nothing is being tracked.
	 */
	getMostFrequentStack(): [string, number] | undefined {
		if (!this._stacks) {
			return undefined;
		}
		let topStack: [string, number] | undefined;
		let topCount: number = 0;
		for (const [stack, count] of this._stacks) {
			if (!topStack || topCount < count) {
				topStack = [stack, count];
				topCount = count;
			}
		}
		return topStack;
	}
}
1014
1015
/**
 * Holds the stack trace string captured at the point `create` was called.
 */
class Stacktrace {

	/** Captures the current stack; falls back to an empty string when none is available. */
	static create() {
		const { stack } = new Error();
		return new Stacktrace(stack ?? '');
	}

	private constructor(readonly value: string) { }

	/** Writes the trace to `console.warn`, leaving out its first two lines. */
	print() {
		const lines = this.value.split('\n');
		console.warn(lines.slice(2).join('\n'));
	}
}
1028
1029
/**
 * Error that is logged when an emitter goes over its configured listener
 * threshold. Its `stack` is replaced with the stack string passed in.
 */
export class ListenerLeakError extends Error {
	constructor(message: string, stack: string) {
		super(message);
		this.stack = stack;
		this.name = 'ListenerLeakError';
	}
}
1037
1038
/**
 * SEVERE error that is logged when an emitter has gone way over its configured
 * listener threshold and therefore refuses to accept more listeners. Its
 * `stack` is replaced with the stack string passed in.
 */
export class ListenerRefusalError extends Error {
	constructor(message: string, stack: string) {
		super(message);
		this.stack = stack;
		this.name = 'ListenerRefusalError';
	}
}
1047
1048
// Running number handed out to containers in creation order.
let id = 0;

/**
 * Wraps a value in an object that has its own identity and a running id.
 */
class UniqueContainer<T> {
	/** Optional stack trace of where the container was created. */
	stack?: Stacktrace;

	/** Running number taken from the module-level counter. */
	public id = id++;

	constructor(public readonly value: T) { }
}
1054
// Compaction factor for sparse listener arrays: a listener array is compacted
// on removal once `size * compactionThreshold <= array.length`.
const compactionThreshold = 2;

/** A listener callback wrapped in a container. */
type ListenerContainer<T> = UniqueContainer<(data: T) => void>;
/** Either a single listener or a (possibly sparse) list of listeners. */
type ListenerOrListeners<T> = (ListenerContainer<T> | undefined)[] | ListenerContainer<T>;

/**
 * Calls `fn` for the single listener, or for every non-empty slot of a
 * listener list, in order.
 */
const forEachListener = <T>(listeners: ListenerOrListeners<T>, fn: (c: ListenerContainer<T>) => void) => {
	if (listeners instanceof UniqueContainer) {
		fn(listeners);
		return;
	}

	for (const candidate of listeners) {
		if (candidate) {
			fn(candidate);
		}
	}
};
1071
1072
/**
 * The Emitter can be used to expose an Event to the public
 * to fire it from the insides.
 * Sample:
	class Document {

		private readonly _onDidChange = new Emitter<string>();

		public onDidChange = this._onDidChange.event;

		// getter-style
		// get onDidChange(): Event<string> {
		// 	return this._onDidChange.event;
		// }

		private _doIt() {
			//...
			this._onDidChange.fire(value);
		}
	}
 */
export class Emitter<T> {

	private readonly _options?: EmitterOptions;
	private readonly _leakageMon?: LeakageMonitor;
	private readonly _perfMon?: EventProfiling;
	private _disposed?: true;
	private _event?: Event<T>;

	/**
	 * A listener, or list of listeners. A single listener is the most common
	 * for event emitters (#185789), so we optimize that special case to avoid
	 * wrapping it in an array (just like Node.js itself.)
	 *
	 * A list of listeners never 'downgrades' back to a plain function if
	 * listeners are removed, for two reasons:
	 *
	 *  1. That's complicated (especially with the deliveryQueue)
	 *  2. A listener with >1 listener is likely to have >1 listener again at
	 *     some point, and swapping between arrays and functions may[citation needed]
	 *     introduce unnecessary work and garbage.
	 *
	 * The array listeners can be 'sparse', to avoid reallocating the array
	 * whenever any listener is added or removed. If more than `1 / compactionThreshold`
	 * of the array is empty, only then is it resized.
	 */
	protected _listeners?: ListenerOrListeners<T>;

	/**
	 * Always to be defined if _listeners is an array. It's no longer a true
	 * queue, but holds the dispatching 'state'. If `fire()` is called on an
	 * emitter, any work left in the _deliveryQueue is finished first.
	 */
	private _deliveryQueue?: EventDeliveryQueuePrivate;
	protected _size = 0;

	/**
	 * @param options Optional hooks and settings, see {@link EmitterOptions}.
	 * A leakage monitor is only created when a leak threshold is configured,
	 * either globally or via `options.leakWarningThreshold`.
	 */
	constructor(options?: EmitterOptions) {
		this._options = options;
		this._leakageMon = (_globalLeakWarningThreshold > 0 || this._options?.leakWarningThreshold)
			? new LeakageMonitor(options?.onListenerError ?? onUnexpectedError, this._options?.leakWarningThreshold ?? _globalLeakWarningThreshold) :
			undefined;
		this._perfMon = this._options?._profName ? new EventProfiling(this._options._profName) : undefined;
		this._deliveryQueue = this._options?.deliveryQueue as EventDeliveryQueuePrivate | undefined;
	}

	/**
	 * Drops all listeners and stops any dispatch of this emitter that is
	 * currently in progress. Only the first call has an effect.
	 */
	dispose() {
		if (!this._disposed) {
			this._disposed = true;

			// It is bad to have listeners at the time of disposing an emitter, it is worst to have listeners keep the emitter
			// alive via the reference that's embedded in their disposables. Therefore we loop over all remaining listeners and
			// unset their subscriptions/disposables. Looping and blaming remaining listeners is done on next tick because the
			// the following programming pattern is very popular:
			//
			// const someModel = this._disposables.add(new ModelObject()); // (1) create and register model
			// this._disposables.add(someModel.onDidChange(() => { ... }); // (2) subscribe and register model-event listener
			// ...later...
			// this._disposables.dispose(); disposes (1) then (2): don't warn after (1) but after the "overall dispose" is done

			if (this._deliveryQueue?.current === this) {
				this._deliveryQueue.reset();
			}
			if (this._listeners) {
				if (_enableDisposeWithListenerWarning) {
					const listeners = this._listeners;
					queueMicrotask(() => {
						forEachListener(listeners, l => l.stack?.print());
					});
				}

				this._listeners = undefined;
				this._size = 0;
			}
			this._options?.onDidRemoveLastListener?.();
			this._leakageMon?.dispose();
		}
	}

	/**
	 * For the public to allow to subscribe
	 * to events from this Emitter
	 */
	get event(): Event<T> {
		this._event ??= (callback: (e: T) => unknown, thisArgs?: any, disposables?: IDisposable[] | DisposableStore) => {
			if (this._leakageMon && this._size > this._leakageMon.threshold ** 2) {
				const message = `[${this._leakageMon.name}] REFUSES to accept new listeners because it exceeded its threshold by far (${this._size} vs ${this._leakageMon.threshold})`;
				console.warn(message);

				const tuple = this._leakageMon.getMostFrequentStack() ?? ['UNKNOWN stack', -1];
				const error = new ListenerRefusalError(`${message}. HINT: Stack shows most frequent listener (${tuple[1]}-times)`, tuple[0]);
				const errorHandler = this._options?.onListenerError || onUnexpectedError;
				errorHandler(error);

				return Disposable.None;
			}

			if (this._disposed) {
				// todo: should we warn if a listener is added to a disposed emitter? This happens often
				return Disposable.None;
			}

			if (thisArgs) {
				callback = callback.bind(thisArgs);
			}

			const contained = new UniqueContainer(callback);

			let removeMonitor: Function | undefined;
			if (this._leakageMon && this._size >= Math.ceil(this._leakageMon.threshold * 0.2)) {
				// check and record this emitter for potential leakage
				contained.stack = Stacktrace.create();
				removeMonitor = this._leakageMon.check(contained.stack, this._size + 1);
			}

			if (_enableDisposeWithListenerWarning) {
				// Keep the trace that was captured for the leakage monitor, if
				// any, instead of capturing a second one for the same listener.
				contained.stack ??= Stacktrace.create();
			}

			if (!this._listeners) {
				this._options?.onWillAddFirstListener?.(this);
				this._listeners = contained;
				this._options?.onDidAddFirstListener?.(this);
			} else if (this._listeners instanceof UniqueContainer) {
				// second listener: upgrade from a single container to a list
				this._deliveryQueue ??= new EventDeliveryQueuePrivate();
				this._listeners = [this._listeners, contained];
			} else {
				this._listeners.push(contained);
			}
			this._options?.onDidAddListener?.(this);

			this._size++;


			const result = toDisposable(() => {
				removeMonitor?.();
				this._removeListener(contained);
			});
			addToDisposables(result, disposables);

			return result;
		};

		return this._event;
	}

	/**
	 * Removes one listener. When the listeners are a list, the slot is cleared
	 * and the list is compacted once enough of it is empty; the state of a
	 * dispatch in progress on this emitter is adjusted accordingly.
	 */
	private _removeListener(listener: ListenerContainer<T>) {
		this._options?.onWillRemoveListener?.(this);

		if (!this._listeners) {
			return; // expected if a listener gets disposed
		}

		if (this._size === 1) {
			this._listeners = undefined;
			this._options?.onDidRemoveLastListener?.(this);
			this._size = 0;
			return;
		}

		// size > 1 which requires that listeners be a list:
		const listeners = this._listeners as (ListenerContainer<T> | undefined)[];

		const index = listeners.indexOf(listener);
		if (index === -1) {
			console.log('disposed?', this._disposed);
			console.log('size?', this._size);
			console.log('arr?', JSON.stringify(this._listeners));
			throw new Error('Attempted to dispose unknown listener');
		}

		this._size--;
		listeners[index] = undefined;

		const adjustDeliveryQueue = this._deliveryQueue!.current === this;
		if (this._size * compactionThreshold <= listeners.length) {
			let n = 0;
			for (let i = 0; i < listeners.length; i++) {
				if (listeners[i]) {
					listeners[n++] = listeners[i];
				} else if (adjustDeliveryQueue && n < this._deliveryQueue!.end) {
					// an empty slot in front of the delivery end is dropped: shift the indices
					this._deliveryQueue!.end--;
					if (n < this._deliveryQueue!.i) {
						this._deliveryQueue!.i--;
					}
				}
			}
			listeners.length = n;
		}
	}

	/**
	 * Calls a single listener with `value`. Errors thrown by the listener go to
	 * `options.onListenerError`, or to `onUnexpectedError` when that is not set.
	 */
	private _deliver(listener: undefined | UniqueContainer<(value: T) => void>, value: T) {
		if (!listener) {
			return;
		}

		const errorHandler = this._options?.onListenerError || onUnexpectedError;
		try {
			listener.value(value);
		} catch (e) {
			errorHandler(e);
		}
	}

	/** Delivers items in the queue. Assumes the queue is ready to go. */
	private _deliverQueue(dq: EventDeliveryQueuePrivate) {
		const listeners = dq.current!._listeners! as (ListenerContainer<T> | undefined)[];
		while (dq.i < dq.end) {
			// important: dq.i is incremented before calling deliver() because it might reenter deliverQueue()
			this._deliver(listeners[dq.i++], dq.value as T);
		}
		dq.reset();
	}

	/**
	 * To be kept private to fire an event to
	 * subscribers
	 */
	fire(event: T): void {
		if (this._deliveryQueue?.current) {
			this._deliverQueue(this._deliveryQueue);
			this._perfMon?.stop(); // last fire() will have starting perfmon, stop it before starting the next dispatch
		}

		this._perfMon?.start(this._size);

		if (!this._listeners) {
			// no-op
		} else if (this._listeners instanceof UniqueContainer) {
			this._deliver(this._listeners, event);
		} else {
			const dq = this._deliveryQueue!;
			dq.enqueue(this, event, this._listeners.length);
			this._deliverQueue(dq);
		}

		this._perfMon?.stop();
	}

	/** @returns Whether at least one listener is currently registered. */
	hasListeners(): boolean {
		return this._size > 0;
	}
}
1340
1341
/**
 * Opaque handle for a delivery queue that can be passed to emitters via
 * `EmitterOptions.deliveryQueue`.
 */
export interface EventDeliveryQueue {
	_isEventDeliveryQueue: true;
}

/** Creates a new, empty delivery queue. */
export const createEventDeliveryQueue = (): EventDeliveryQueue => {
	return new EventDeliveryQueuePrivate();
};
1346
1347
/**
 * State of the dispatch that is currently in progress: which emitter is
 * firing, with which value, and which range of its listener list is left.
 */
class EventDeliveryQueuePrivate implements EventDeliveryQueue {
	declare _isEventDeliveryQueue: true;

	/**
	 * Index in current's listener list.
	 */
	public i = -1;

	/**
	 * The last index in the listener's list to deliver.
	 */
	public end = 0;

	/**
	 * Emitter currently being dispatched on. Emitter._listeners is always an array.
	 */
	public current?: Emitter<any>;
	/**
	 * Currently emitting value. Defined whenever `current` is.
	 */
	public value?: unknown;

	/** Starts a dispatch of `value` on `emitter` over the listener indices `[0, end)`. */
	public enqueue<T>(emitter: Emitter<T>, value: T, end: number) {
		this.current = emitter;
		this.value = value;
		this.i = 0;
		this.end = end;
	}

	/** Ends the current dispatch and forgets its emitter and value. */
	public reset() {
		this.i = this.end; // force any current emission loop to stop, mainly for during dispose
		this.current = undefined;
		this.value = undefined;
	}
}
1382
1383
/**
 * Shape of an event that listeners can use to hold back the emitter: a
 * promise handed to `waitUntil` is awaited before the event is considered
 * handled.
 */
export interface IWaitUntil {
	/** Token that signals when waiting has been cancelled. */
	token: CancellationToken;
	/** Registers a promise to wait for. */
	waitUntil(thenable: Promise<unknown>): void;
}

/** The data part of a wait-until event: `T` without `waitUntil` and `token`. */
export type IWaitUntilData<T> = Omit<Omit<T, 'waitUntil'>, 'token'>;
1389
1390
/**
 * Emitter whose listeners may defer completion of an event: `fireAsync` calls
 * listeners one after another and, after each of them, waits for all promises
 * that the listener handed to `waitUntil`.
 */
export class AsyncEmitter<T extends IWaitUntil> extends Emitter<T> {

	/** Pairs of listener and event data that still have to be delivered. */
	private _asyncDeliveryQueue?: LinkedList<[(ev: T) => void, IWaitUntilData<T>]>;

	/**
	 * Delivers `data` to every listener that is registered at the time of the call.
	 *
	 * @param data The event data without `token` and `waitUntil`; both are added per listener.
	 * @param token Delivery stops between listeners once cancellation is requested.
	 * @param promiseJoin Optional hook that may wrap every promise passed to `waitUntil`.
	 */
	async fireAsync(data: IWaitUntilData<T>, token: CancellationToken, promiseJoin?: (p: Promise<unknown>, listener: Function) => Promise<unknown>): Promise<void> {
		if (!this._listeners) {
			return;
		}

		this._asyncDeliveryQueue ??= new LinkedList();
		const queue = this._asyncDeliveryQueue;

		forEachListener(this._listeners, container => queue.push([container.value, data]));

		while (queue.size > 0 && !token.isCancellationRequested) {

			const [listener, payload] = queue.shift()!;
			const thenables: Promise<unknown>[] = [];

			// eslint-disable-next-line local/code-no-dangerous-type-assertions
			const event = <T>{
				...payload,
				token,
				waitUntil: (p: Promise<unknown>): void => {
					if (Object.isFrozen(thenables)) {
						throw new Error('waitUntil can NOT be called asynchronous');
					}
					thenables.push(promiseJoin ? promiseJoin(p, listener) : p);
				}
			};

			try {
				listener(event);
			} catch (e) {
				onUnexpectedError(e);
				continue;
			}

			// freeze thenables-collection to enforce sync-calls to
			// wait until and then wait for all thenables to resolve
			Object.freeze(thenables);

			await Promise.allSettled(thenables).then(outcomes => {
				for (const outcome of outcomes) {
					if (outcome.status === 'rejected') {
						onUnexpectedError(outcome.reason);
					}
				}
			});
		}
	}
}
1446
1447
1448
/**
 * Emitter that can be paused. Events fired while paused are queued and
 * delivered on resume, either one by one or merged into a single event when
 * a `merge` function was supplied.
 */
export class PauseableEmitter<T> extends Emitter<T> {

	/** Pause depth: the number of `pause` calls not yet matched by `resume`. */
	private _isPaused = 0;
	/** Events that were fired while paused. */
	protected _eventQueue = new LinkedList<T>();
	private _mergeFn?: (input: T[]) => T;

	public get isPaused(): boolean {
		return this._isPaused !== 0;
	}

	constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
		super(options);
		this._mergeFn = options?.merge;
	}

	pause(): void {
		this._isPaused++;
	}

	resume(): void {
		if (this._isPaused === 0) {
			return; // not paused
		}
		this._isPaused--;
		if (this._isPaused !== 0) {
			return; // still paused by an outer pause()
		}

		if (!this._mergeFn) {
			// no merging, fire each event individually and test
			// that this emitter isn't paused halfway through
			while (!this._isPaused && this._eventQueue.size !== 0) {
				super.fire(this._eventQueue.shift()!);
			}
			return;
		}

		// use the merge function to create a single composite
		// event. make a copy in case firing pauses this emitter
		if (this._eventQueue.size > 0) {
			const events = Array.from(this._eventQueue);
			this._eventQueue.clear();
			super.fire(this._mergeFn(events));
		}
	}

	override fire(event: T): void {
		if (!this._size) {
			return; // no listeners: the event is dropped, not queued
		}

		if (this._isPaused !== 0) {
			this._eventQueue.push(event);
		} else {
			super.fire(event);
		}
	}
}
1498
1499
/**
 * Pauseable emitter that pauses itself on the first event of a burst and
 * resumes after `delay` milliseconds, so that the events of the burst are
 * merged into one.
 */
export class DebounceEmitter<T> extends PauseableEmitter<T> {

	/** Debounce time in milliseconds. */
	private readonly _delay: number;
	/** Timer of the burst in progress, `undefined` when idle. */
	private _handle: Timeout | undefined;

	/**
	 * @param options Emitter options plus the mandatory `merge` function and an
	 * optional `delay`, which defaults to 100.
	 */
	constructor(options: EmitterOptions & { merge: (input: T[]) => T; delay?: number }) {
		super(options);
		this._delay = options.delay ?? 100;
	}

	override fire(event: T): void {
		if (!this._handle) {
			// first event of a burst: hold events back until the timer elapses
			this.pause();
			const flush = () => {
				this._handle = undefined;
				this.resume();
			};
			this._handle = setTimeout(flush, this._delay);
		}
		super.fire(event);
	}
}
1520
1521
/**
 * An emitter which queues all events and then processes them in a
 * microtask, i.e. after the currently running synchronous code has finished.
 */
export class MicrotaskEmitter<T> extends Emitter<T> {
	/** Events fired since the last batch was detached for delivery. */
	private _queuedEvents: T[] = [];
	private _mergeFn?: (input: T[]) => T;

	/**
	 * @param options Emitter options plus an optional `merge` function. When
	 * given, each batch of queued events is merged into a single event.
	 */
	constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
		super(options);
		this._mergeFn = options?.merge;
	}

	/**
	 * Queues `event` for delivery in a microtask. Events fired while there are
	 * no listeners are dropped.
	 */
	override fire(event: T): void {

		if (!this.hasListeners()) {
			return;
		}

		this._queuedEvents.push(event);
		if (this._queuedEvents.length === 1) {
			queueMicrotask(() => {
				// Detach the batch BEFORE delivering it. A listener that fires
				// this emitter again then starts a fresh batch (and schedules
				// its own microtask) instead of appending to the array being
				// drained, where the event would be thrown away afterwards. It
				// also keeps the emitter usable when the merge function throws.
				const batch = this._queuedEvents;
				this._queuedEvents = [];

				if (this._mergeFn) {
					super.fire(this._mergeFn(batch));
				} else {
					for (const queued of batch) {
						super.fire(queued);
					}
				}
			});
		}
	}
}
1552
1553
/**
 * An event emitter that multiplexes many events into a single event.
 *
 * @example Listen to the `onData` event of all `Thing`s, dynamically adding and removing `Thing`s
 * to the multiplexer as needed.
 *
 * ```typescript
 * const anythingDataMultiplexer = new EventMultiplexer<{ data: string }>();
 *
 * const thingListeners = new DisposableMap<Thing, IDisposable>();
 *
 * thingService.onDidAddThing(thing => {
 *   thingListeners.set(thing, anythingDataMultiplexer.add(thing.onData));
 * });
 * thingService.onDidRemoveThing(thing => {
 *   thingListeners.deleteAndDispose(thing);
 * });
 *
 * anythingDataMultiplexer.event(e => {
 *   console.log('Something fired data ' + e.data)
 * });
 * ```
 */
export class EventMultiplexer<T> implements IDisposable {

	private readonly emitter: Emitter<T>;
	/** Whether the multiplexed event currently has listeners, i.e. whether the source events are hooked. */
	private hasListeners = false;
	/** All source events together with their subscription (`null` while not hooked). */
	private events: { event: Event<T>; listener: IDisposable | null }[] = [];

	constructor() {
		this.emitter = new Emitter<T>({
			onWillAddFirstListener: () => this.onFirstListenerAdd(),
			onDidRemoveLastListener: () => this.onLastListenerRemove()
		});
	}

	/** The multiplexed event. */
	get event(): Event<T> {
		return this.emitter.event;
	}

	/**
	 * Adds a source event. It is only subscribed to while the multiplexed
	 * event has listeners.
	 *
	 * @returns A disposable that removes the source event again.
	 */
	add(event: Event<T>): IDisposable {
		const e = { event: event, listener: null };
		this.events.push(e);

		if (this.hasListeners) {
			this.hook(e);
		}

		const dispose = () => {
			if (this.hasListeners) {
				this.unhook(e);
			}

			// The entry is gone already when the multiplexer itself was disposed
			// in the meantime. `splice(-1, 1)` would then remove an unrelated
			// entry from the end of the list, so only splice when it was found.
			const idx = this.events.indexOf(e);
			if (idx !== -1) {
				this.events.splice(idx, 1);
			}
		};

		return toDisposable(createSingleCallFunction(dispose));
	}

	private onFirstListenerAdd(): void {
		this.hasListeners = true;
		this.events.forEach(e => this.hook(e));
	}

	private onLastListenerRemove(): void {
		this.hasListeners = false;
		this.events.forEach(e => this.unhook(e));
	}

	/** Subscribes to the source event and forwards whatever it fires. */
	private hook(e: { event: Event<T>; listener: IDisposable | null }): void {
		e.listener = e.event(r => this.emitter.fire(r));
	}

	/** Drops the subscription to the source event, if there is one. */
	private unhook(e: { event: Event<T>; listener: IDisposable | null }): void {
		e.listener?.dispose();
		e.listener = null;
	}

	dispose(): void {
		this.emitter.dispose();

		for (const e of this.events) {
			e.listener?.dispose();
		}
		this.events = [];
	}
}
1641
1642
/** A disposable that exposes a single multiplexed event. */
export interface IDynamicListEventMultiplexer<TEventType> extends IDisposable {
	readonly event: Event<TEventType>;
}

/**
 * Multiplexes one event of every item of a changing list into a single event.
 * Items announced via `onAddItem` are added to the multiplexer, items
 * announced via `onRemoveItem` are removed from it again.
 */
export class DynamicListEventMultiplexer<TItem, TEventType> implements IDynamicListEventMultiplexer<TEventType> {
	private readonly _store = new DisposableStore();

	readonly event: Event<TEventType>;

	/**
	 * @param items The items that exist at construction time.
	 * @param onAddItem Fires when an item was added.
	 * @param onRemoveItem Fires when an item was removed.
	 * @param getEvent Returns the event of an item that should be multiplexed.
	 */
	constructor(
		items: TItem[],
		onAddItem: Event<TItem>,
		onRemoveItem: Event<TItem>,
		getEvent: (item: TItem) => Event<TEventType>
	) {
		const multiplexer = this._store.add(new EventMultiplexer<TEventType>());
		const itemListeners = this._store.add(new DisposableMap<TItem, IDisposable>());

		const track = (item: TItem) => {
			itemListeners.set(item, multiplexer.add(getEvent(item)));
		};

		// Existing items
		for (const existing of items) {
			track(existing);
		}

		// Added items
		this._store.add(onAddItem(added => {
			track(added);
		}));

		// Removed items
		this._store.add(onRemoveItem(removed => {
			itemListeners.deleteAndDispose(removed);
		}));

		this.event = multiplexer.event;
	}

	dispose() {
		this._store.dispose();
	}
}
1685
1686
/**
 * The EventBufferer is useful in situations in which you want
 * to delay firing your events during some code.
 * You can wrap that code and be sure that the event will not
 * be fired during that wrap.
 *
 * ```
 * const emitter: Emitter;
 * const bufferer = new EventBufferer();
 * const bufferedEvent = bufferer.wrapEvent(emitter.event);
 *
 * bufferedEvent(console.log);
 *
 * bufferer.bufferEvents(() => {
 *   emitter.fire(); // event will not be fired yet
 * });
 *
 * // event will only be fired at this point
 * ```
 */
export class EventBufferer {

	/**
	 * Stack of buffering frames, one per `bufferEvents` call in progress. The
	 * last entry is the innermost one; events are buffered into it.
	 */
	private data: { buffers: Function[] }[] = [];

	/**
	 * Wraps `event` so that events fired while `bufferEvents` is running are
	 * held back until it has finished.
	 *
	 * @param event The event to wrap.
	 * @param reduce Optional reducer. When given, the events a listener
	 * received during one `bufferEvents` call are folded into a single value
	 * that is delivered once; outside of buffering every event is delivered as
	 * `reduce(initial, event)`.
	 * @param initial Optional start value for `reduce`.
	 */
	wrapEvent<T>(event: Event<T>): Event<T>;
	wrapEvent<T>(event: Event<T>, reduce: (last: T | undefined, event: T) => T): Event<T>;
	wrapEvent<T, O>(event: Event<T>, reduce: (last: O | undefined, event: T) => O, initial: O): Event<O>;
	wrapEvent<T, O>(event: Event<T>, reduce?: (last: T | O | undefined, event: T) => T | O, initial?: O): Event<O | T> {
		return (listener, thisArgs?, disposables?) => {
			// Events collected for THIS listener, keyed by the buffering frame
			// they arrived in. Keeping the state per listener (and not on the
			// shared frame) makes sure that every listener is notified and
			// that no event is counted more than once.
			const pending = new Map<{ buffers: Function[] }, T[]>();

			return event(i => {
				const data = this.data[this.data.length - 1];

				// Non-reduce scenario
				if (!reduce) {
					// Buffering case
					if (data) {
						data.buffers.push(() => listener.call(thisArgs, i));
					} else {
						// Not buffering case
						listener.call(thisArgs, i);
					}
					return;
				}

				// Reduce scenario, not buffering case
				if (!data) {
					listener.call(thisArgs, reduce(initial, i));
					return;
				}

				// Reduce scenario, buffering case
				let items = pending.get(data);
				if (!items) {
					const collected: T[] = [];
					items = collected;
					pending.set(data, collected);

					// A single buffered function per listener and frame that
					// reduces all collected events once buffering is done.
					data.buffers.push(() => {
						pending.delete(data);
						// `initial` is tested against undefined, not for
						// truthiness, so that start values like 0 or '' are used.
						const reduced = initial !== undefined
							? collected.reduce(reduce as (last: O | undefined, event: T) => O, initial)
							: collected.reduce(reduce as (last: T | undefined, event: T) => T);
						listener.call(thisArgs, reduced);
					});
				}
				items.push(i);
			}, undefined, disposables);
		};
	}

	/**
	 * Runs `fn` and holds back all events of wrapped events that fire while it
	 * runs. They are delivered, in order, once `fn` has returned or thrown.
	 *
	 * @returns Whatever `fn` returns.
	 */
	bufferEvents<R = void>(fn: () => R): R {
		const data = { buffers: new Array<Function>() };
		this.data.push(data);
		try {
			return fn();
		} finally {
			// Always leave the frame, also when `fn` throws. Otherwise the
			// bufferer would keep buffering into it forever.
			this.data.pop();
			data.buffers.forEach(flush => flush());
		}
	}
}
1775
1776
/**
 * A Relay is an event forwarder which works as a repluggable event pipe.
 * Events of the event assigned to `input` are forwarded through the relay's
 * own `event` property. The `input` can be changed at any point in time.
 */
export class Relay<T> implements IDisposable {

	/** Whether the relay's own event currently has listeners. */
	private listening = false;
	private inputEvent: Event<T> = Event.None;
	/** Subscription to the current input event. */
	private inputEventListener: IDisposable = Disposable.None;

	private readonly emitter = new Emitter<T>({
		onDidAddFirstListener: () => {
			// somebody listens: start forwarding from the current input
			this.listening = true;
			this.inputEventListener = this.inputEvent(this.emitter.fire, this.emitter);
		},
		onDidRemoveLastListener: () => {
			// nobody listens anymore: stop forwarding
			this.listening = false;
			this.inputEventListener.dispose();
		}
	});

	readonly event: Event<T> = this.emitter.event;

	/** Replaces the input event. While there are listeners, forwarding is re-plugged right away. */
	set input(event: Event<T>) {
		this.inputEvent = event;

		if (!this.listening) {
			return;
		}

		this.inputEventListener.dispose();
		this.inputEventListener = event(this.emitter.fire, this.emitter);
	}

	dispose() {
		this.inputEventListener.dispose();
		this.emitter.dispose();
	}
}
1815
1816
/** A readable value together with an event that signals changes of it. */
export interface IValueWithChangeEvent<T> {
	/** Signals that `value` has changed. The event carries no payload. */
	readonly onDidChange: Event<void>;
	/** The current value. */
	get value(): T;
}
1820
1821
/**
 * A mutable value that fires `onDidChange` whenever a different value
 * (compared with `!==`) is assigned.
 */
export class ValueWithChangeEvent<T> implements IValueWithChangeEvent<T> {
	/** Creates a value that never changes. */
	public static const<T>(value: T): IValueWithChangeEvent<T> {
		return new ConstValueWithChangeEvent(value);
	}

	private readonly _onDidChange = new Emitter<void>();
	readonly onDidChange: Event<void> = this._onDidChange.event;

	constructor(private _value: T) { }

	get value(): T {
		return this._value;
	}

	set value(value: T) {
		if (value === this._value) {
			return; // same value: nothing to announce
		}

		this._value = value;
		this._onDidChange.fire(undefined);
	}
}
1842
1843
/** A value that never changes: `onDidChange` is `Event.None`. */
class ConstValueWithChangeEvent<T> implements IValueWithChangeEvent<T> {
	public readonly onDidChange: Event<void> = Event.None;

	constructor(readonly value: T) { }
}
1848
1849
/**
 * Keeps one disposable per item of a changing set.
 *
 * @param getData Returns the current set. It is read once up front and again on every change.
 * @param onDidChangeData Signals that the set may have changed.
 * @param handleItem Is called for each item in the set (but only the first time the item is seen in the set).
 * The returned disposable is disposed if the item is no longer in the set.
 * @returns A disposable that stops the tracking and disposes everything `handleItem` returned.
 */
export function trackSetChanges<T>(getData: () => ReadonlySet<T>, onDidChangeData: Event<unknown>, handleItem: (d: T) => IDisposable): IDisposable {
	const handled = new DisposableMap<T, IDisposable>();

	// snapshot of the set as of the last time it was looked at
	let snapshot = new Set(getData());
	for (const item of snapshot) {
		handled.set(item, handleItem(item));
	}

	const store = new DisposableStore();
	store.add(onDidChangeData(() => {
		const current = getData();
		const { removed, added } = diffSets(snapshot, current);
		for (const gone of removed) {
			handled.deleteAndDispose(gone);
		}
		for (const fresh of added) {
			handled.set(fresh, handleItem(fresh));
		}
		snapshot = new Set(current);
	}));
	store.add(handled);
	return store;
}
1875
1876
1877
/**
 * Registers `result` in `disposables`: added to a `DisposableStore`, pushed
 * onto an array. Does nothing when `disposables` is neither of the two.
 */
function addToDisposables(result: IDisposable, disposables: DisposableStore | IDisposable[] | undefined) {
	if (disposables instanceof DisposableStore) {
		disposables.add(result);
		return;
	}

	if (Array.isArray(disposables)) {
		disposables.push(result);
	}
}
1884
1885
/**
 * Takes `result` out of `disposables` (a `DisposableStore` or an array), if
 * given, and then disposes it in any case.
 */
function disposeAndRemove(result: IDisposable, disposables: DisposableStore | IDisposable[] | undefined) {
	if (disposables instanceof DisposableStore) {
		disposables.delete(result);
	} else if (Array.isArray(disposables)) {
		const position = disposables.indexOf(result);
		if (position !== -1) {
			disposables.splice(position, 1);
		}
	}

	result.dispose();
}
1896
1897