Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/util/vs/base/common/event.ts
13405 views
1
//!!! DO NOT modify, this file was COPIED from 'microsoft/vscode'
2
3
/*---------------------------------------------------------------------------------------------
4
* Copyright (c) Microsoft Corporation. All rights reserved.
5
* Licensed under the MIT License. See License.txt in the project root for license information.
6
*--------------------------------------------------------------------------------------------*/
7
8
import { CancelablePromise } from './async';
9
import { CancellationToken } from './cancellation';
10
import { diffSets } from './collections';
11
import { onUnexpectedError } from './errors';
12
import { createSingleCallFunction } from './functional';
13
import { combinedDisposable, Disposable, DisposableMap, DisposableStore, IDisposable, toDisposable } from './lifecycle';
14
import { LinkedList } from './linkedList';
15
import { IObservable, IObservableWithChange, IObserver } from './observable';
16
import { StopWatch } from './stopwatch';
17
import { MicrotaskDelay } from './symbols';
18
19
20
// -----------------------------------------------------------------------------------------------------------------------
21
// Uncomment the next line to print warnings whenever an emitter with listeners is disposed. That is a sign of code smell.
22
// -----------------------------------------------------------------------------------------------------------------------
23
const _enableDisposeWithListenerWarning = false
24
// || Boolean("TRUE") // causes a linter warning so that it cannot be pushed
25
;
26
27
28
// -----------------------------------------------------------------------------------------------------------------------
29
// Uncomment the next line to print warnings whenever a snapshotted event is used repeatedly without cleanup.
30
// See https://github.com/microsoft/vscode/issues/142851
31
// -----------------------------------------------------------------------------------------------------------------------
32
const _enableSnapshotPotentialLeakWarning = false
33
// || Boolean("TRUE") // causes a linter warning so that it cannot be pushed
34
;
35
36
/**
37
* An event with zero or one parameters that can be subscribed to. The event is a function itself.
38
*/
39
export interface Event<T> {
40
(listener: (e: T) => unknown, thisArgs?: any, disposables?: IDisposable[] | DisposableStore): IDisposable;
41
}
42
43
export namespace Event {
44
export const None: Event<any> = () => Disposable.None;
45
46
function _addLeakageTraceLogic(options: EmitterOptions) {
47
if (_enableSnapshotPotentialLeakWarning) {
48
const { onDidAddListener: origListenerDidAdd } = options;
49
const stack = Stacktrace.create();
50
let count = 0;
51
options.onDidAddListener = () => {
52
if (++count === 2) {
53
console.warn('snapshotted emitter LIKELY used public and SHOULD HAVE BEEN created with DisposableStore. snapshotted here');
54
stack.print();
55
}
56
origListenerDidAdd?.();
57
};
58
}
59
}
60
61
/**
62
* Given an event, returns another event which debounces calls and defers the listeners to a later task via a shared
63
* `setTimeout`. The event is converted into a signal (`Event<void>`) to avoid additional object creation as a
64
* result of merging events and to try prevent race conditions that could arise when using related deferred and
65
* non-deferred events.
66
*
67
* This is useful for deferring non-critical work (eg. general UI updates) to ensure it does not block critical work
68
* (eg. latency of keypress to text rendered).
69
*
70
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
71
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
72
* returned event causes this utility to leak a listener on the original event.
73
*
74
* @param event The event source for the new event.
75
* @param flushOnListenerRemove Whether to fire all debounced events when a listener is removed. If this is not
76
* specified, some events could go missing. Use this if it's important that all events are processed, even if the
77
* listener gets disposed before the debounced event fires.
78
* @param disposable A disposable store to add the new EventEmitter to.
79
*/
80
export function defer(event: Event<unknown>, flushOnListenerRemove?: boolean, disposable?: DisposableStore): Event<void> {
81
return debounce<unknown, void>(event, () => void 0, 0, undefined, flushOnListenerRemove ?? true, undefined, disposable);
82
}
83
84
/**
85
* Given an event, returns another event which only fires once.
86
*
87
* @param event The event source for the new event.
88
*/
89
export function once<T>(event: Event<T>): Event<T> {
90
return (listener, thisArgs = null, disposables?) => {
91
// we need this, in case the event fires during the listener call
92
let didFire = false;
93
let result: IDisposable | undefined = undefined;
94
result = event(e => {
95
if (didFire) {
96
return;
97
} else if (result) {
98
result.dispose();
99
} else {
100
didFire = true;
101
}
102
103
return listener.call(thisArgs, e);
104
}, null, disposables);
105
106
if (didFire) {
107
result.dispose();
108
}
109
110
return result;
111
};
112
}
113
114
/**
115
* Given an event, returns another event which only fires once, and only when the condition is met.
116
*
117
* @param event The event source for the new event.
118
*/
119
export function onceIf<T>(event: Event<T>, condition: (e: T) => boolean): Event<T> {
120
return Event.once(Event.filter(event, condition));
121
}
122
123
/**
124
* Maps an event of one type into an event of another type using a mapping function, similar to how
125
* `Array.prototype.map` works.
126
*
127
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
128
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
129
* returned event causes this utility to leak a listener on the original event.
130
*
131
* @param event The event source for the new event.
132
* @param map The mapping function.
133
* @param disposable A disposable store to add the new EventEmitter to.
134
*/
135
export function map<I, O>(event: Event<I>, map: (i: I) => O, disposable?: DisposableStore): Event<O> {
136
return snapshot((listener, thisArgs = null, disposables?) => event(i => listener.call(thisArgs, map(i)), null, disposables), disposable);
137
}
138
139
/**
140
* Wraps an event in another event that performs some function on the event object before firing.
141
*
142
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
143
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
144
* returned event causes this utility to leak a listener on the original event.
145
*
146
* @param event The event source for the new event.
147
* @param each The function to perform on the event object.
148
* @param disposable A disposable store to add the new EventEmitter to.
149
*/
150
export function forEach<I>(event: Event<I>, each: (i: I) => void, disposable?: DisposableStore): Event<I> {
151
return snapshot((listener, thisArgs = null, disposables?) => event(i => { each(i); listener.call(thisArgs, i); }, null, disposables), disposable);
152
}
153
154
/**
155
* Wraps an event in another event that fires only when some condition is met.
156
*
157
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
158
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
159
* returned event causes this utility to leak a listener on the original event.
160
*
161
* @param event The event source for the new event.
162
* @param filter The filter function that defines the condition. The event will fire for the object if this function
163
* returns true.
164
* @param disposable A disposable store to add the new EventEmitter to.
165
*/
166
export function filter<T, U>(event: Event<T | U>, filter: (e: T | U) => e is T, disposable?: DisposableStore): Event<T>;
167
export function filter<T>(event: Event<T>, filter: (e: T) => boolean, disposable?: DisposableStore): Event<T>;
168
export function filter<T, R>(event: Event<T | R>, filter: (e: T | R) => e is R, disposable?: DisposableStore): Event<R>;
169
export function filter<T>(event: Event<T>, filter: (e: T) => boolean, disposable?: DisposableStore): Event<T> {
170
return snapshot((listener, thisArgs = null, disposables?) => event(e => filter(e) && listener.call(thisArgs, e), null, disposables), disposable);
171
}
172
173
/**
174
* Given an event, returns the same event but typed as `Event<void>`.
175
*/
176
export function signal<T>(event: Event<T>): Event<void> {
177
return event as Event<any> as Event<void>;
178
}
179
180
/**
181
* Given a collection of events, returns a single event which emits whenever any of the provided events emit.
182
*/
183
export function any<T>(...events: Event<T>[]): Event<T>;
184
export function any(...events: Event<any>[]): Event<void>;
185
export function any<T>(...events: Event<T>[]): Event<T> {
186
return (listener, thisArgs = null, disposables?) => {
187
const disposable = combinedDisposable(...events.map(event => event(e => listener.call(thisArgs, e))));
188
return addAndReturnDisposable(disposable, disposables);
189
};
190
}
191
192
/**
193
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
194
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
195
* returned event causes this utility to leak a listener on the original event.
196
*/
197
export function reduce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, initial?: O, disposable?: DisposableStore): Event<O> {
198
let output: O | undefined = initial;
199
200
return map<I, O>(event, e => {
201
output = merge(output, e);
202
return output;
203
}, disposable);
204
}
205
206
function snapshot<T>(event: Event<T>, disposable: DisposableStore | undefined): Event<T> {
207
let listener: IDisposable | undefined;
208
209
const options: EmitterOptions | undefined = {
210
onWillAddFirstListener() {
211
listener = event(emitter.fire, emitter);
212
},
213
onDidRemoveLastListener() {
214
listener?.dispose();
215
}
216
};
217
218
if (!disposable) {
219
_addLeakageTraceLogic(options);
220
}
221
222
const emitter = new Emitter<T>(options);
223
224
disposable?.add(emitter);
225
226
return emitter.event;
227
}
228
229
/**
230
* Adds the IDisposable to the store if it's set, and returns it. Useful to
231
* Event function implementation.
232
*/
233
function addAndReturnDisposable<T extends IDisposable>(d: T, store: DisposableStore | IDisposable[] | undefined): T {
234
if (store instanceof Array) {
235
store.push(d);
236
} else if (store) {
237
store.add(d);
238
}
239
return d;
240
}
241
242
/**
243
* Given an event, creates a new emitter that event that will debounce events based on {@link delay} and give an
244
* array event object of all events that fired.
245
*
246
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
247
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
248
* returned event causes this utility to leak a listener on the original event.
249
*
250
* @param event The original event to debounce.
251
* @param merge A function that reduces all events into a single event.
252
* @param delay The number of milliseconds to debounce.
253
* @param leading Whether to fire a leading event without debouncing.
254
* @param flushOnListenerRemove Whether to fire all debounced events when a listener is removed. If this is not
255
* specified, some events could go missing. Use this if it's important that all events are processed, even if the
256
* listener gets disposed before the debounced event fires.
257
* @param leakWarningThreshold See {@link EmitterOptions.leakWarningThreshold}.
258
* @param disposable A disposable store to register the debounce emitter to.
259
*/
260
export function debounce<T>(event: Event<T>, merge: (last: T | undefined, event: T) => T, delay?: number | typeof MicrotaskDelay, leading?: boolean, flushOnListenerRemove?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<T>;
261
export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay?: number | typeof MicrotaskDelay, leading?: boolean, flushOnListenerRemove?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O>;
262
export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay: number | typeof MicrotaskDelay = 100, leading = false, flushOnListenerRemove = false, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O> {
263
let subscription: IDisposable;
264
let output: O | undefined = undefined;
265
let handle: Timeout | undefined | null = undefined;
266
let numDebouncedCalls = 0;
267
let doFire: (() => void) | undefined;
268
269
const options: EmitterOptions | undefined = {
270
leakWarningThreshold,
271
onWillAddFirstListener() {
272
subscription = event(cur => {
273
numDebouncedCalls++;
274
output = merge(output, cur);
275
276
if (leading && !handle) {
277
emitter.fire(output);
278
output = undefined;
279
}
280
281
doFire = () => {
282
const _output = output;
283
output = undefined;
284
handle = undefined;
285
if (!leading || numDebouncedCalls > 1) {
286
emitter.fire(_output!);
287
}
288
numDebouncedCalls = 0;
289
};
290
291
if (typeof delay === 'number') {
292
if (handle) {
293
clearTimeout(handle);
294
}
295
handle = setTimeout(doFire, delay);
296
} else {
297
if (handle === undefined) {
298
handle = null;
299
queueMicrotask(doFire);
300
}
301
}
302
});
303
},
304
onWillRemoveListener() {
305
if (flushOnListenerRemove && numDebouncedCalls > 0) {
306
doFire?.();
307
}
308
},
309
onDidRemoveLastListener() {
310
doFire = undefined;
311
subscription.dispose();
312
}
313
};
314
315
if (!disposable) {
316
_addLeakageTraceLogic(options);
317
}
318
319
const emitter = new Emitter<O>(options);
320
321
disposable?.add(emitter);
322
323
return emitter.event;
324
}
325
326
/**
327
* Debounces an event, firing after some delay (default=0) with an array of all event original objects.
328
*
329
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
330
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
331
* returned event causes this utility to leak a listener on the original event.
332
*
333
* @param event The event source for the new event.
334
* @param delay The number of milliseconds to debounce.
335
* @param flushOnListenerRemove Whether to fire all debounced events when a listener is removed. If this is not
336
* specified, some events could go missing. Use this if it's important that all events are processed, even if the
337
* listener gets disposed before the debounced event fires.
338
* @param disposable A disposable store to add the new EventEmitter to.
339
*/
340
export function accumulate<T>(event: Event<T>, delay: number | typeof MicrotaskDelay = 0, flushOnListenerRemove?: boolean, disposable?: DisposableStore): Event<T[]> {
341
return Event.debounce<T, T[]>(event, (last, e) => {
342
if (!last) {
343
return [e];
344
}
345
last.push(e);
346
return last;
347
}, delay, undefined, flushOnListenerRemove ?? true, undefined, disposable);
348
}
349
350
/**
351
* Throttles an event, ensuring the event is fired at most once during the specified delay period.
352
* Unlike debounce, throttle will fire immediately on the leading edge and/or after the delay on the trailing edge.
353
*
354
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
355
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
356
* returned event causes this utility to leak a listener on the original event.
357
*
358
* @param event The event source for the new event.
359
* @param merge An accumulator function that merges events if multiple occur during the throttle period.
360
* @param delay The number of milliseconds to throttle.
361
* @param leading Whether to fire on the leading edge (immediately on first event).
362
* @param trailing Whether to fire on the trailing edge (after delay with the last value).
363
* @param leakWarningThreshold See {@link EmitterOptions.leakWarningThreshold}.
364
* @param disposable A disposable store to register the throttle emitter to.
365
*/
366
export function throttle<T>(event: Event<T>, merge: (last: T | undefined, event: T) => T, delay?: number | typeof MicrotaskDelay, leading?: boolean, trailing?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<T>;
367
export function throttle<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay?: number | typeof MicrotaskDelay, leading?: boolean, trailing?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O>;
368
export function throttle<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay: number | typeof MicrotaskDelay = 100, leading = true, trailing = true, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O> {
369
let subscription: IDisposable;
370
let output: O | undefined = undefined;
371
let handle: Timeout | undefined = undefined;
372
let numThrottledCalls = 0;
373
374
const options: EmitterOptions | undefined = {
375
leakWarningThreshold,
376
onWillAddFirstListener() {
377
subscription = event(cur => {
378
numThrottledCalls++;
379
output = merge(output, cur);
380
381
// If not currently throttling, fire immediately if leading is enabled
382
if (handle === undefined) {
383
if (leading) {
384
emitter.fire(output);
385
output = undefined;
386
numThrottledCalls = 0;
387
}
388
389
// Set up the throttle period
390
if (typeof delay === 'number') {
391
handle = setTimeout(() => {
392
// Fire on trailing edge if there were calls during throttle period
393
if (trailing && numThrottledCalls > 0) {
394
emitter.fire(output!);
395
}
396
output = undefined;
397
handle = undefined;
398
numThrottledCalls = 0;
399
}, delay);
400
} else {
401
// Use a special marker to indicate microtask is pending
402
handle = 0 as unknown as Timeout;
403
queueMicrotask(() => {
404
// Fire on trailing edge if there were calls during throttle period
405
if (trailing && numThrottledCalls > 0) {
406
emitter.fire(output!);
407
}
408
output = undefined;
409
handle = undefined;
410
numThrottledCalls = 0;
411
});
412
}
413
}
414
// If already throttling, just accumulate the value for trailing edge
415
});
416
},
417
onDidRemoveLastListener() {
418
subscription.dispose();
419
}
420
};
421
422
if (!disposable) {
423
_addLeakageTraceLogic(options);
424
}
425
426
const emitter = new Emitter<O>(options);
427
428
disposable?.add(emitter);
429
430
return emitter.event;
431
}
432
433
/**
434
* Filters an event such that some condition is _not_ met more than once in a row, effectively ensuring duplicate
435
* event objects from different sources do not fire the same event object.
436
*
437
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
438
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
439
* returned event causes this utility to leak a listener on the original event.
440
*
441
* @param event The event source for the new event.
442
* @param equals The equality condition.
443
* @param disposable A disposable store to add the new EventEmitter to.
444
*
445
* @example
446
* ```
447
* // Fire only one time when a single window is opened or focused
448
* Event.latch(Event.any(onDidOpenWindow, onDidFocusWindow))
449
* ```
450
*/
451
export function latch<T>(event: Event<T>, equals: (a: T, b: T) => boolean = (a, b) => a === b, disposable?: DisposableStore): Event<T> {
452
let firstCall = true;
453
let cache: T;
454
455
return filter(event, value => {
456
const shouldEmit = firstCall || !equals(value, cache);
457
firstCall = false;
458
cache = value;
459
return shouldEmit;
460
}, disposable);
461
}
462
463
/**
464
* Splits an event whose parameter is a union type into 2 separate events for each type in the union.
465
*
466
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
467
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
468
* returned event causes this utility to leak a listener on the original event.
469
*
470
* @example
471
* ```
472
* const event = new EventEmitter<number | undefined>().event;
473
* const [numberEvent, undefinedEvent] = Event.split(event, isUndefined);
474
* ```
475
*
476
* @param event The event source for the new event.
477
* @param isT A function that determines what event is of the first type.
478
* @param disposable A disposable store to add the new EventEmitter to.
479
*/
480
export function split<T, U>(event: Event<T | U>, isT: (e: T | U) => e is T, disposable?: DisposableStore): [Event<T>, Event<U>] {
481
return [
482
Event.filter(event, isT, disposable),
483
Event.filter(event, e => !isT(e), disposable) as Event<U>,
484
];
485
}
486
487
/**
488
* Buffers an event until it has a listener attached.
489
*
490
* *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the returned
491
* event is accessible to "third parties", e.g the event is a public property. Otherwise a leaked listener on the
492
* returned event causes this utility to leak a listener on the original event.
493
*
494
* @param event The event source for the new event.
495
* @param flushAfterTimeout Determines whether to flush the buffer after a timeout immediately or after a
496
* `setTimeout` when the first event listener is added.
497
* @param _buffer Internal: A source event array used for tests.
498
*
499
* @example
500
* ```
501
* // Start accumulating events, when the first listener is attached, flush
502
* // the event after a timeout such that multiple listeners attached before
503
* // the timeout would receive the event
504
* this.onInstallExtension = Event.buffer(service.onInstallExtension, true);
505
* ```
506
*/
507
export function buffer<T>(event: Event<T>, flushAfterTimeout = false, _buffer: T[] = [], disposable?: DisposableStore): Event<T> {
508
let buffer: T[] | null = _buffer.slice();
509
510
let listener: IDisposable | null = event(e => {
511
if (buffer) {
512
buffer.push(e);
513
} else {
514
emitter.fire(e);
515
}
516
});
517
518
if (disposable) {
519
disposable.add(listener);
520
}
521
522
const flush = () => {
523
buffer?.forEach(e => emitter.fire(e));
524
buffer = null;
525
};
526
527
const emitter = new Emitter<T>({
528
onWillAddFirstListener() {
529
if (!listener) {
530
listener = event(e => emitter.fire(e));
531
if (disposable) {
532
disposable.add(listener);
533
}
534
}
535
},
536
537
onDidAddFirstListener() {
538
if (buffer) {
539
if (flushAfterTimeout) {
540
setTimeout(flush);
541
} else {
542
flush();
543
}
544
}
545
},
546
547
onDidRemoveLastListener() {
548
if (listener) {
549
listener.dispose();
550
}
551
listener = null;
552
}
553
});
554
555
if (disposable) {
556
disposable.add(emitter);
557
}
558
559
return emitter.event;
560
}
561
/**
562
* Wraps the event in an {@link IChainableEvent}, allowing a more functional programming style.
563
*
564
* @example
565
* ```
566
* // Normal
567
* const onEnterPressNormal = Event.filter(
568
* Event.map(onKeyPress.event, e => new StandardKeyboardEvent(e)),
569
* e.keyCode === KeyCode.Enter
570
* ).event;
571
*
572
* // Using chain
573
* const onEnterPressChain = Event.chain(onKeyPress.event, $ => $
574
* .map(e => new StandardKeyboardEvent(e))
575
* .filter(e => e.keyCode === KeyCode.Enter)
576
* );
577
* ```
578
*/
579
export function chain<T, R>(event: Event<T>, sythensize: ($: IChainableSythensis<T>) => IChainableSythensis<R>): Event<R> {
580
const fn: Event<R> = (listener, thisArgs, disposables) => {
581
const cs = sythensize(new ChainableSynthesis()) as ChainableSynthesis;
582
return event(function (value) {
583
const result = cs.evaluate(value);
584
if (result !== HaltChainable) {
585
listener.call(thisArgs, result);
586
}
587
}, undefined, disposables);
588
};
589
590
return fn;
591
}
592
593
const HaltChainable = Symbol('HaltChainable');
594
595
class ChainableSynthesis implements IChainableSythensis<any> {
596
private readonly steps: ((input: any) => unknown)[] = [];
597
598
map<O>(fn: (i: any) => O): this {
599
this.steps.push(fn);
600
return this;
601
}
602
603
forEach(fn: (i: any) => void): this {
604
this.steps.push(v => {
605
fn(v);
606
return v;
607
});
608
return this;
609
}
610
611
filter(fn: (e: any) => boolean): this {
612
this.steps.push(v => fn(v) ? v : HaltChainable);
613
return this;
614
}
615
616
reduce<R>(merge: (last: R | undefined, event: any) => R, initial?: R | undefined): this {
617
let last = initial;
618
this.steps.push(v => {
619
last = merge(last, v);
620
return last;
621
});
622
return this;
623
}
624
625
latch(equals: (a: any, b: any) => boolean = (a, b) => a === b): ChainableSynthesis {
626
let firstCall = true;
627
let cache: any;
628
this.steps.push(value => {
629
const shouldEmit = firstCall || !equals(value, cache);
630
firstCall = false;
631
cache = value;
632
return shouldEmit ? value : HaltChainable;
633
});
634
635
return this;
636
}
637
638
public evaluate(value: any) {
639
for (const step of this.steps) {
640
value = step(value);
641
if (value === HaltChainable) {
642
break;
643
}
644
}
645
646
return value;
647
}
648
}
649
650
export interface IChainableSythensis<T> {
651
map<O>(fn: (i: T) => O): IChainableSythensis<O>;
652
forEach(fn: (i: T) => void): IChainableSythensis<T>;
653
filter<R extends T>(fn: (e: T) => e is R): IChainableSythensis<R>;
654
filter(fn: (e: T) => boolean): IChainableSythensis<T>;
655
reduce<R>(merge: (last: R, event: T) => R, initial: R): IChainableSythensis<R>;
656
reduce<R>(merge: (last: R | undefined, event: T) => R): IChainableSythensis<R>;
657
latch(equals?: (a: T, b: T) => boolean): IChainableSythensis<T>;
658
}
659
660
export interface NodeEventEmitter {
661
on(event: string | symbol, listener: Function): unknown;
662
removeListener(event: string | symbol, listener: Function): unknown;
663
}
664
665
/**
666
* Creates an {@link Event} from a node event emitter.
667
*/
668
export function fromNodeEventEmitter<T>(emitter: NodeEventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
669
const fn = (...args: any[]) => result.fire(map(...args));
670
const onFirstListenerAdd = () => emitter.on(eventName, fn);
671
const onLastListenerRemove = () => emitter.removeListener(eventName, fn);
672
const result = new Emitter<T>({ onWillAddFirstListener: onFirstListenerAdd, onDidRemoveLastListener: onLastListenerRemove });
673
674
return result.event;
675
}
676
677
export interface DOMEventEmitter {
678
addEventListener(event: string | symbol, listener: Function): void;
679
removeEventListener(event: string | symbol, listener: Function): void;
680
}
681
682
/**
683
* Creates an {@link Event} from a DOM event emitter.
684
*/
685
export function fromDOMEventEmitter<T>(emitter: DOMEventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
686
const fn = (...args: any[]) => result.fire(map(...args));
687
const onFirstListenerAdd = () => emitter.addEventListener(eventName, fn);
688
const onLastListenerRemove = () => emitter.removeEventListener(eventName, fn);
689
const result = new Emitter<T>({ onWillAddFirstListener: onFirstListenerAdd, onDidRemoveLastListener: onLastListenerRemove });
690
691
return result.event;
692
}
693
694
/**
695
* Creates a promise out of an event, using the {@link Event.once} helper.
696
*/
697
export function toPromise<T>(event: Event<T>, disposables?: IDisposable[] | DisposableStore): CancelablePromise<T> {
698
let cancelRef: () => void;
699
let listener: IDisposable;
700
const promise = new Promise((resolve) => {
701
listener = once(event)(resolve);
702
addToDisposables(listener, disposables);
703
704
// not resolved, matching the behavior of a normal disposal
705
cancelRef = () => {
706
disposeAndRemove(listener, disposables);
707
};
708
}) as CancelablePromise<T>;
709
promise.cancel = cancelRef!;
710
711
if (disposables) {
712
promise.finally(() => disposeAndRemove(listener, disposables));
713
}
714
715
return promise;
716
}
717
718
/**
719
* A convenience function for forwarding an event to another emitter which
720
* improves readability.
721
*
722
* This is similar to {@link Relay} but allows instantiating and forwarding
723
* on a single line and also allows for multiple source events.
724
* @param from The event to forward.
725
* @param to The emitter to forward the event to.
726
* @example
727
* Event.forward(event, emitter);
728
* // equivalent to
729
* event(e => emitter.fire(e));
730
* // equivalent to
731
* event(emitter.fire, emitter);
732
*/
733
export function forward<T>(from: Event<T>, to: Emitter<T>): IDisposable {
734
return from(e => to.fire(e));
735
}
736
737
/**
738
* Adds a listener to an event and calls the listener immediately with undefined as the event object.
739
*
740
* @example
741
* ```
742
* // Initialize the UI and update it when dataChangeEvent fires
743
* runAndSubscribe(dataChangeEvent, () => this._updateUI());
744
* ```
745
*/
746
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T) => unknown, initial: T): IDisposable;
747
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T | undefined) => unknown): IDisposable;
748
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T | undefined) => unknown, initial?: T): IDisposable {
749
handler(initial);
750
return event(e => handler(e));
751
}
752
753
class EmitterObserver<T> implements IObserver {
754
755
readonly emitter: Emitter<T>;
756
757
private _counter = 0;
758
private _hasChanged = false;
759
760
constructor(readonly _observable: IObservable<T>, store: DisposableStore | undefined) {
761
const options: EmitterOptions = {
762
onWillAddFirstListener: () => {
763
_observable.addObserver(this);
764
765
// Communicate to the observable that we received its current value and would like to be notified about future changes.
766
this._observable.reportChanges();
767
},
768
onDidRemoveLastListener: () => {
769
_observable.removeObserver(this);
770
}
771
};
772
if (!store) {
773
_addLeakageTraceLogic(options);
774
}
775
this.emitter = new Emitter<T>(options);
776
if (store) {
777
store.add(this.emitter);
778
}
779
}
780
781
beginUpdate<T>(_observable: IObservable<T>): void {
782
// assert(_observable === this.obs);
783
this._counter++;
784
}
785
786
handlePossibleChange<T>(_observable: IObservable<T>): void {
787
// assert(_observable === this.obs);
788
}
789
790
handleChange<T, TChange>(_observable: IObservableWithChange<T, TChange>, _change: TChange): void {
791
// assert(_observable === this.obs);
792
this._hasChanged = true;
793
}
794
795
endUpdate<T>(_observable: IObservable<T>): void {
796
// assert(_observable === this.obs);
797
this._counter--;
798
if (this._counter === 0) {
799
this._observable.reportChanges();
800
if (this._hasChanged) {
801
this._hasChanged = false;
802
this.emitter.fire(this._observable.get());
803
}
804
}
805
}
806
}
807
808
/**
809
* Creates an event emitter that is fired when the observable changes.
810
* Each listeners subscribes to the emitter.
811
*/
812
export function fromObservable<T>(obs: IObservable<T>, store?: DisposableStore): Event<T> {
813
const observer = new EmitterObserver(obs, store);
814
return observer.emitter.event;
815
}
816
817
/**
818
* Each listener is attached to the observable directly.
819
*/
820
export function fromObservableLight(observable: IObservable<unknown>): Event<void> {
821
return (listener, thisArgs, disposables) => {
822
let count = 0;
823
let didChange = false;
824
const observer: IObserver = {
825
beginUpdate() {
826
count++;
827
},
828
endUpdate() {
829
count--;
830
if (count === 0) {
831
observable.reportChanges();
832
if (didChange) {
833
didChange = false;
834
listener.call(thisArgs);
835
}
836
}
837
},
838
handlePossibleChange() {
839
// noop
840
},
841
handleChange() {
842
didChange = true;
843
}
844
};
845
observable.addObserver(observer);
846
observable.reportChanges();
847
const disposable = {
848
dispose() {
849
observable.removeObserver(observer);
850
}
851
};
852
853
addToDisposables(disposable, disposables);
854
855
return disposable;
856
};
857
}
858
}
859
860
export interface EmitterOptions {
861
/**
862
* Optional function that's called *before* the very first listener is added
863
*/
864
onWillAddFirstListener?: Function;
865
/**
866
* Optional function that's called *after* the very first listener is added
867
*/
868
onDidAddFirstListener?: Function;
869
/**
870
* Optional function that's called after a listener is added
871
*/
872
onDidAddListener?: Function;
873
/**
874
* Optional function that's called *after* remove the very last listener
875
*/
876
onDidRemoveLastListener?: Function;
877
/**
878
* Optional function that's called *before* a listener is removed
879
*/
880
onWillRemoveListener?: Function;
881
/**
882
* Optional function that's called when a listener throws an error. Defaults to
883
* {@link onUnexpectedError}
884
*/
885
onListenerError?: (e: any) => void;
886
/**
887
* Number of listeners that are allowed before assuming a leak. Default to
888
* a globally configured value
889
*
890
* @see setGlobalLeakWarningThreshold
891
*/
892
leakWarningThreshold?: number;
893
/**
894
* Pass in a delivery queue, which is useful for ensuring
895
* in order event delivery across multiple emitters.
896
*/
897
deliveryQueue?: EventDeliveryQueue;
898
899
/** ONLY enable this during development */
900
_profName?: string;
901
}
902
903
904
export class EventProfiling {
905
906
static readonly all = new Set<EventProfiling>();
907
908
private static _idPool = 0;
909
910
readonly name: string;
911
public listenerCount: number = 0;
912
public invocationCount = 0;
913
public elapsedOverall = 0;
914
public durations: number[] = [];
915
916
private _stopWatch?: StopWatch;
917
918
constructor(name: string) {
919
this.name = `${name}_${EventProfiling._idPool++}`;
920
EventProfiling.all.add(this);
921
}
922
923
start(listenerCount: number): void {
924
this._stopWatch = new StopWatch();
925
this.listenerCount = listenerCount;
926
}
927
928
stop(): void {
929
if (this._stopWatch) {
930
const elapsed = this._stopWatch.elapsed();
931
this.durations.push(elapsed);
932
this.elapsedOverall += elapsed;
933
this.invocationCount += 1;
934
this._stopWatch = undefined;
935
}
936
}
937
}
938
939
let _globalLeakWarningThreshold = -1;
940
export function setGlobalLeakWarningThreshold(n: number): IDisposable {
941
const oldValue = _globalLeakWarningThreshold;
942
_globalLeakWarningThreshold = n;
943
return {
944
dispose() {
945
_globalLeakWarningThreshold = oldValue;
946
}
947
};
948
}
949
950
class LeakageMonitor {
951
952
private static _idPool = 1;
953
954
private _stacks: Map<string, number> | undefined;
955
private _warnCountdown: number = 0;
956
957
constructor(
958
private readonly _errorHandler: (err: Error) => void,
959
readonly threshold: number,
960
readonly name: string = (LeakageMonitor._idPool++).toString(16).padStart(3, '0')
961
) { }
962
963
dispose(): void {
964
this._stacks?.clear();
965
}
966
967
check(stack: Stacktrace, listenerCount: number): undefined | (() => void) {
968
969
const threshold = this.threshold;
970
if (threshold <= 0 || listenerCount < threshold) {
971
return undefined;
972
}
973
974
if (!this._stacks) {
975
this._stacks = new Map();
976
}
977
const count = (this._stacks.get(stack.value) || 0);
978
this._stacks.set(stack.value, count + 1);
979
this._warnCountdown -= 1;
980
981
if (this._warnCountdown <= 0) {
982
// only warn on first exceed and then every time the limit
983
// is exceeded by 50% again
984
this._warnCountdown = threshold * 0.5;
985
986
const [topStack, topCount] = this.getMostFrequentStack()!;
987
const message = `[${this.name}] potential listener LEAK detected, having ${listenerCount} listeners already. MOST frequent listener (${topCount}):`;
988
console.warn(message);
989
console.warn(topStack);
990
991
const error = new ListenerLeakError(message, topStack);
992
this._errorHandler(error);
993
}
994
995
return () => {
996
const count = (this._stacks!.get(stack.value) || 0);
997
this._stacks!.set(stack.value, count - 1);
998
};
999
}
1000
1001
getMostFrequentStack(): [string, number] | undefined {
1002
if (!this._stacks) {
1003
return undefined;
1004
}
1005
let topStack: [string, number] | undefined;
1006
let topCount: number = 0;
1007
for (const [stack, count] of this._stacks) {
1008
if (!topStack || topCount < count) {
1009
topStack = [stack, count];
1010
topCount = count;
1011
}
1012
}
1013
return topStack;
1014
}
1015
}
1016
1017
class Stacktrace {
1018
1019
static create() {
1020
const err = new Error();
1021
return new Stacktrace(err.stack ?? '');
1022
}
1023
1024
private constructor(readonly value: string) { }
1025
1026
print() {
1027
console.warn(this.value.split('\n').slice(2).join('\n'));
1028
}
1029
}
1030
1031
// error that is logged when going over the configured listener threshold
1032
export class ListenerLeakError extends Error {
1033
constructor(message: string, stack: string) {
1034
super(message);
1035
this.name = 'ListenerLeakError';
1036
this.stack = stack;
1037
}
1038
}
1039
1040
// SEVERE error that is logged when having gone way over the configured listener
1041
// threshold so that the emitter refuses to accept more listeners
1042
export class ListenerRefusalError extends Error {
1043
constructor(message: string, stack: string) {
1044
super(message);
1045
this.name = 'ListenerRefusalError';
1046
this.stack = stack;
1047
}
1048
}
1049
1050
let id = 0;
1051
class UniqueContainer<T> {
1052
stack?: Stacktrace;
1053
public id = id++;
1054
constructor(public readonly value: T) { }
1055
}
1056
const compactionThreshold = 2;
1057
1058
type ListenerContainer<T> = UniqueContainer<(data: T) => void>;
1059
type ListenerOrListeners<T> = (ListenerContainer<T> | undefined)[] | ListenerContainer<T>;
1060
1061
const forEachListener = <T>(listeners: ListenerOrListeners<T>, fn: (c: ListenerContainer<T>) => void) => {
1062
if (listeners instanceof UniqueContainer) {
1063
fn(listeners);
1064
} else {
1065
for (let i = 0; i < listeners.length; i++) {
1066
const l = listeners[i];
1067
if (l) {
1068
fn(l);
1069
}
1070
}
1071
}
1072
};
1073
1074
/**
1075
* The Emitter can be used to expose an Event to the public
1076
* to fire it from the insides.
1077
* Sample:
1078
class Document {
1079
1080
private readonly _onDidChange = new Emitter<(value:string)=>any>();
1081
1082
public onDidChange = this._onDidChange.event;
1083
1084
// getter-style
1085
// get onDidChange(): Event<(value:string)=>any> {
1086
// return this._onDidChange.event;
1087
// }
1088
1089
private _doIt() {
1090
//...
1091
this._onDidChange.fire(value);
1092
}
1093
}
1094
*/
1095
export class Emitter<T> {
1096
1097
private readonly _options?: EmitterOptions;
1098
private readonly _leakageMon?: LeakageMonitor;
1099
private readonly _perfMon?: EventProfiling;
1100
private _disposed?: true;
1101
private _event?: Event<T>;
1102
1103
/**
1104
* A listener, or list of listeners. A single listener is the most common
1105
* for event emitters (#185789), so we optimize that special case to avoid
1106
* wrapping it in an array (just like Node.js itself.)
1107
*
1108
* A list of listeners never 'downgrades' back to a plain function if
1109
* listeners are removed, for two reasons:
1110
*
1111
* 1. That's complicated (especially with the deliveryQueue)
1112
* 2. A listener with >1 listener is likely to have >1 listener again at
1113
* some point, and swapping between arrays and functions may[citation needed]
1114
* introduce unnecessary work and garbage.
1115
*
1116
* The array listeners can be 'sparse', to avoid reallocating the array
1117
* whenever any listener is added or removed. If more than `1 / compactionThreshold`
1118
* of the array is empty, only then is it resized.
1119
*/
1120
protected _listeners?: ListenerOrListeners<T>;
1121
1122
/**
1123
* Always to be defined if _listeners is an array. It's no longer a true
1124
* queue, but holds the dispatching 'state'. If `fire()` is called on an
1125
* emitter, any work left in the _deliveryQueue is finished first.
1126
*/
1127
private _deliveryQueue?: EventDeliveryQueuePrivate;
1128
protected _size = 0;
1129
1130
constructor(options?: EmitterOptions) {
1131
this._options = options;
1132
this._leakageMon = (_globalLeakWarningThreshold > 0 || this._options?.leakWarningThreshold)
1133
? new LeakageMonitor(options?.onListenerError ?? onUnexpectedError, this._options?.leakWarningThreshold ?? _globalLeakWarningThreshold) :
1134
undefined;
1135
this._perfMon = this._options?._profName ? new EventProfiling(this._options._profName) : undefined;
1136
this._deliveryQueue = this._options?.deliveryQueue as EventDeliveryQueuePrivate | undefined;
1137
}
1138
1139
dispose() {
1140
if (!this._disposed) {
1141
this._disposed = true;
1142
1143
// It is bad to have listeners at the time of disposing an emitter, it is worst to have listeners keep the emitter
1144
// alive via the reference that's embedded in their disposables. Therefore we loop over all remaining listeners and
1145
// unset their subscriptions/disposables. Looping and blaming remaining listeners is done on next tick because the
1146
// the following programming pattern is very popular:
1147
//
1148
// const someModel = this._disposables.add(new ModelObject()); // (1) create and register model
1149
// this._disposables.add(someModel.onDidChange(() => { ... }); // (2) subscribe and register model-event listener
1150
// ...later...
1151
// this._disposables.dispose(); disposes (1) then (2): don't warn after (1) but after the "overall dispose" is done
1152
1153
if (this._deliveryQueue?.current === this) {
1154
this._deliveryQueue.reset();
1155
}
1156
if (this._listeners) {
1157
if (_enableDisposeWithListenerWarning) {
1158
const listeners = this._listeners;
1159
queueMicrotask(() => {
1160
forEachListener(listeners, l => l.stack?.print());
1161
});
1162
}
1163
1164
this._listeners = undefined;
1165
this._size = 0;
1166
}
1167
this._options?.onDidRemoveLastListener?.();
1168
this._leakageMon?.dispose();
1169
}
1170
}
1171
1172
/**
1173
* For the public to allow to subscribe
1174
* to events from this Emitter
1175
*/
1176
get event(): Event<T> {
1177
this._event ??= (callback: (e: T) => unknown, thisArgs?: any, disposables?: IDisposable[] | DisposableStore) => {
1178
if (this._leakageMon && this._size > this._leakageMon.threshold ** 2) {
1179
const message = `[${this._leakageMon.name}] REFUSES to accept new listeners because it exceeded its threshold by far (${this._size} vs ${this._leakageMon.threshold})`;
1180
console.warn(message);
1181
1182
const tuple = this._leakageMon.getMostFrequentStack() ?? ['UNKNOWN stack', -1];
1183
const error = new ListenerRefusalError(`${message}. HINT: Stack shows most frequent listener (${tuple[1]}-times)`, tuple[0]);
1184
const errorHandler = this._options?.onListenerError || onUnexpectedError;
1185
errorHandler(error);
1186
1187
return Disposable.None;
1188
}
1189
1190
if (this._disposed) {
1191
// todo: should we warn if a listener is added to a disposed emitter? This happens often
1192
return Disposable.None;
1193
}
1194
1195
if (thisArgs) {
1196
callback = callback.bind(thisArgs);
1197
}
1198
1199
const contained = new UniqueContainer(callback);
1200
1201
let removeMonitor: Function | undefined;
1202
let stack: Stacktrace | undefined;
1203
if (this._leakageMon && this._size >= Math.ceil(this._leakageMon.threshold * 0.2)) {
1204
// check and record this emitter for potential leakage
1205
contained.stack = Stacktrace.create();
1206
removeMonitor = this._leakageMon.check(contained.stack, this._size + 1);
1207
}
1208
1209
if (_enableDisposeWithListenerWarning) {
1210
contained.stack = stack ?? Stacktrace.create();
1211
}
1212
1213
if (!this._listeners) {
1214
this._options?.onWillAddFirstListener?.(this);
1215
this._listeners = contained;
1216
this._options?.onDidAddFirstListener?.(this);
1217
} else if (this._listeners instanceof UniqueContainer) {
1218
this._deliveryQueue ??= new EventDeliveryQueuePrivate();
1219
this._listeners = [this._listeners, contained];
1220
} else {
1221
this._listeners.push(contained);
1222
}
1223
this._options?.onDidAddListener?.(this);
1224
1225
this._size++;
1226
1227
1228
const result = toDisposable(() => {
1229
removeMonitor?.();
1230
this._removeListener(contained);
1231
});
1232
addToDisposables(result, disposables);
1233
1234
return result;
1235
};
1236
1237
return this._event;
1238
}
1239
1240
private _removeListener(listener: ListenerContainer<T>) {
1241
this._options?.onWillRemoveListener?.(this);
1242
1243
if (!this._listeners) {
1244
return; // expected if a listener gets disposed
1245
}
1246
1247
if (this._size === 1) {
1248
this._listeners = undefined;
1249
this._options?.onDidRemoveLastListener?.(this);
1250
this._size = 0;
1251
return;
1252
}
1253
1254
// size > 1 which requires that listeners be a list:
1255
const listeners = this._listeners as (ListenerContainer<T> | undefined)[];
1256
1257
const index = listeners.indexOf(listener);
1258
if (index === -1) {
1259
console.log('disposed?', this._disposed);
1260
console.log('size?', this._size);
1261
console.log('arr?', JSON.stringify(this._listeners));
1262
throw new Error('Attempted to dispose unknown listener');
1263
}
1264
1265
this._size--;
1266
listeners[index] = undefined;
1267
1268
const adjustDeliveryQueue = this._deliveryQueue!.current === this;
1269
if (this._size * compactionThreshold <= listeners.length) {
1270
let n = 0;
1271
for (let i = 0; i < listeners.length; i++) {
1272
if (listeners[i]) {
1273
listeners[n++] = listeners[i];
1274
} else if (adjustDeliveryQueue && n < this._deliveryQueue!.end) {
1275
this._deliveryQueue!.end--;
1276
if (n < this._deliveryQueue!.i) {
1277
this._deliveryQueue!.i--;
1278
}
1279
}
1280
}
1281
listeners.length = n;
1282
}
1283
}
1284
1285
private _deliver(listener: undefined | UniqueContainer<(value: T) => void>, value: T) {
1286
if (!listener) {
1287
return;
1288
}
1289
1290
const errorHandler = this._options?.onListenerError || onUnexpectedError;
1291
if (!errorHandler) {
1292
listener.value(value);
1293
return;
1294
}
1295
1296
try {
1297
listener.value(value);
1298
} catch (e) {
1299
errorHandler(e);
1300
}
1301
}
1302
1303
/** Delivers items in the queue. Assumes the queue is ready to go. */
1304
private _deliverQueue(dq: EventDeliveryQueuePrivate) {
1305
const listeners = dq.current!._listeners! as (ListenerContainer<T> | undefined)[];
1306
while (dq.i < dq.end) {
1307
// important: dq.i is incremented before calling deliver() because it might reenter deliverQueue()
1308
this._deliver(listeners[dq.i++], dq.value as T);
1309
}
1310
dq.reset();
1311
}
1312
1313
/**
1314
* To be kept private to fire an event to
1315
* subscribers
1316
*/
1317
fire(event: T): void {
1318
if (this._deliveryQueue?.current) {
1319
this._deliverQueue(this._deliveryQueue);
1320
this._perfMon?.stop(); // last fire() will have starting perfmon, stop it before starting the next dispatch
1321
}
1322
1323
this._perfMon?.start(this._size);
1324
1325
if (!this._listeners) {
1326
// no-op
1327
} else if (this._listeners instanceof UniqueContainer) {
1328
this._deliver(this._listeners, event);
1329
} else {
1330
const dq = this._deliveryQueue!;
1331
dq.enqueue(this, event, this._listeners.length);
1332
this._deliverQueue(dq);
1333
}
1334
1335
this._perfMon?.stop();
1336
}
1337
1338
hasListeners(): boolean {
1339
return this._size > 0;
1340
}
1341
}
1342
1343
export interface EventDeliveryQueue {
1344
_isEventDeliveryQueue: true;
1345
}
1346
1347
export const createEventDeliveryQueue = (): EventDeliveryQueue => new EventDeliveryQueuePrivate();
1348
1349
class EventDeliveryQueuePrivate implements EventDeliveryQueue {
1350
declare _isEventDeliveryQueue: true;
1351
1352
/**
1353
* Index in current's listener list.
1354
*/
1355
public i = -1;
1356
1357
/**
1358
* The last index in the listener's list to deliver.
1359
*/
1360
public end = 0;
1361
1362
/**
1363
* Emitter currently being dispatched on. Emitter._listeners is always an array.
1364
*/
1365
public current?: Emitter<any>;
1366
/**
1367
* Currently emitting value. Defined whenever `current` is.
1368
*/
1369
public value?: unknown;
1370
1371
public enqueue<T>(emitter: Emitter<T>, value: T, end: number) {
1372
this.i = 0;
1373
this.end = end;
1374
this.current = emitter;
1375
this.value = value;
1376
}
1377
1378
public reset() {
1379
this.i = this.end; // force any current emission loop to stop, mainly for during dispose
1380
this.current = undefined;
1381
this.value = undefined;
1382
}
1383
}
1384
1385
export interface IWaitUntil {
1386
token: CancellationToken;
1387
waitUntil(thenable: Promise<unknown>): void;
1388
}
1389
1390
export type IWaitUntilData<T> = Omit<Omit<T, 'waitUntil'>, 'token'>;
1391
1392
export class AsyncEmitter<T extends IWaitUntil> extends Emitter<T> {
1393
1394
private _asyncDeliveryQueue?: LinkedList<[(ev: T) => void, IWaitUntilData<T>]>;
1395
1396
async fireAsync(data: IWaitUntilData<T>, token: CancellationToken, promiseJoin?: (p: Promise<unknown>, listener: Function) => Promise<unknown>): Promise<void> {
1397
if (!this._listeners) {
1398
return;
1399
}
1400
1401
if (!this._asyncDeliveryQueue) {
1402
this._asyncDeliveryQueue = new LinkedList();
1403
}
1404
1405
forEachListener(this._listeners, listener => this._asyncDeliveryQueue!.push([listener.value, data]));
1406
1407
while (this._asyncDeliveryQueue.size > 0 && !token.isCancellationRequested) {
1408
1409
const [listener, data] = this._asyncDeliveryQueue.shift()!;
1410
const thenables: Promise<unknown>[] = [];
1411
1412
// eslint-disable-next-line local/code-no-dangerous-type-assertions
1413
const event = <T>{
1414
...data,
1415
token,
1416
waitUntil: (p: Promise<unknown>): void => {
1417
if (Object.isFrozen(thenables)) {
1418
throw new Error('waitUntil can NOT be called asynchronous');
1419
}
1420
if (promiseJoin) {
1421
p = promiseJoin(p, listener);
1422
}
1423
thenables.push(p);
1424
}
1425
};
1426
1427
try {
1428
listener(event);
1429
} catch (e) {
1430
onUnexpectedError(e);
1431
continue;
1432
}
1433
1434
// freeze thenables-collection to enforce sync-calls to
1435
// wait until and then wait for all thenables to resolve
1436
Object.freeze(thenables);
1437
1438
await Promise.allSettled(thenables).then(values => {
1439
for (const value of values) {
1440
if (value.status === 'rejected') {
1441
onUnexpectedError(value.reason);
1442
}
1443
}
1444
});
1445
}
1446
}
1447
}
1448
1449
1450
export class PauseableEmitter<T> extends Emitter<T> {
1451
1452
private _isPaused = 0;
1453
protected _eventQueue = new LinkedList<T>();
1454
private _mergeFn?: (input: T[]) => T;
1455
1456
public get isPaused(): boolean {
1457
return this._isPaused !== 0;
1458
}
1459
1460
constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
1461
super(options);
1462
this._mergeFn = options?.merge;
1463
}
1464
1465
pause(): void {
1466
this._isPaused++;
1467
}
1468
1469
resume(): void {
1470
if (this._isPaused !== 0 && --this._isPaused === 0) {
1471
if (this._mergeFn) {
1472
// use the merge function to create a single composite
1473
// event. make a copy in case firing pauses this emitter
1474
if (this._eventQueue.size > 0) {
1475
const events = Array.from(this._eventQueue);
1476
this._eventQueue.clear();
1477
super.fire(this._mergeFn(events));
1478
}
1479
1480
} else {
1481
// no merging, fire each event individually and test
1482
// that this emitter isn't paused halfway through
1483
while (!this._isPaused && this._eventQueue.size !== 0) {
1484
super.fire(this._eventQueue.shift()!);
1485
}
1486
}
1487
}
1488
}
1489
1490
override fire(event: T): void {
1491
if (this._size) {
1492
if (this._isPaused !== 0) {
1493
this._eventQueue.push(event);
1494
} else {
1495
super.fire(event);
1496
}
1497
}
1498
}
1499
}
1500
1501
export class DebounceEmitter<T> extends PauseableEmitter<T> {
1502
1503
private readonly _delay: number;
1504
private _handle: Timeout | undefined;
1505
1506
constructor(options: EmitterOptions & { merge: (input: T[]) => T; delay?: number }) {
1507
super(options);
1508
this._delay = options.delay ?? 100;
1509
}
1510
1511
override fire(event: T): void {
1512
if (!this._handle) {
1513
this.pause();
1514
this._handle = setTimeout(() => {
1515
this._handle = undefined;
1516
this.resume();
1517
}, this._delay);
1518
}
1519
super.fire(event);
1520
}
1521
}
1522
1523
/**
1524
* An emitter which queue all events and then process them at the
1525
* end of the event loop.
1526
*/
1527
export class MicrotaskEmitter<T> extends Emitter<T> {
1528
private _queuedEvents: T[] = [];
1529
private _mergeFn?: (input: T[]) => T;
1530
1531
constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
1532
super(options);
1533
this._mergeFn = options?.merge;
1534
}
1535
override fire(event: T): void {
1536
1537
if (!this.hasListeners()) {
1538
return;
1539
}
1540
1541
this._queuedEvents.push(event);
1542
if (this._queuedEvents.length === 1) {
1543
queueMicrotask(() => {
1544
if (this._mergeFn) {
1545
super.fire(this._mergeFn(this._queuedEvents));
1546
} else {
1547
this._queuedEvents.forEach(e => super.fire(e));
1548
}
1549
this._queuedEvents = [];
1550
});
1551
}
1552
}
1553
}
1554
1555
/**
1556
* An event emitter that multiplexes many events into a single event.
1557
*
1558
* @example Listen to the `onData` event of all `Thing`s, dynamically adding and removing `Thing`s
1559
* to the multiplexer as needed.
1560
*
1561
* ```typescript
1562
* const anythingDataMultiplexer = new EventMultiplexer<{ data: string }>();
1563
*
1564
* const thingListeners = DisposableMap<Thing, IDisposable>();
1565
*
1566
* thingService.onDidAddThing(thing => {
1567
* thingListeners.set(thing, anythingDataMultiplexer.add(thing.onData);
1568
* });
1569
* thingService.onDidRemoveThing(thing => {
1570
* thingListeners.deleteAndDispose(thing);
1571
* });
1572
*
1573
* anythingDataMultiplexer.event(e => {
1574
* console.log('Something fired data ' + e.data)
1575
* });
1576
* ```
1577
*/
1578
export class EventMultiplexer<T> implements IDisposable {
1579
1580
private readonly emitter: Emitter<T>;
1581
private hasListeners = false;
1582
private events: { event: Event<T>; listener: IDisposable | null }[] = [];
1583
1584
constructor() {
1585
this.emitter = new Emitter<T>({
1586
onWillAddFirstListener: () => this.onFirstListenerAdd(),
1587
onDidRemoveLastListener: () => this.onLastListenerRemove()
1588
});
1589
}
1590
1591
get event(): Event<T> {
1592
return this.emitter.event;
1593
}
1594
1595
add(event: Event<T>): IDisposable {
1596
const e = { event: event, listener: null };
1597
this.events.push(e);
1598
1599
if (this.hasListeners) {
1600
this.hook(e);
1601
}
1602
1603
const dispose = () => {
1604
if (this.hasListeners) {
1605
this.unhook(e);
1606
}
1607
1608
const idx = this.events.indexOf(e);
1609
this.events.splice(idx, 1);
1610
};
1611
1612
return toDisposable(createSingleCallFunction(dispose));
1613
}
1614
1615
private onFirstListenerAdd(): void {
1616
this.hasListeners = true;
1617
this.events.forEach(e => this.hook(e));
1618
}
1619
1620
private onLastListenerRemove(): void {
1621
this.hasListeners = false;
1622
this.events.forEach(e => this.unhook(e));
1623
}
1624
1625
private hook(e: { event: Event<T>; listener: IDisposable | null }): void {
1626
e.listener = e.event(r => this.emitter.fire(r));
1627
}
1628
1629
private unhook(e: { event: Event<T>; listener: IDisposable | null }): void {
1630
e.listener?.dispose();
1631
e.listener = null;
1632
}
1633
1634
dispose(): void {
1635
this.emitter.dispose();
1636
1637
for (const e of this.events) {
1638
e.listener?.dispose();
1639
}
1640
this.events = [];
1641
}
1642
}
1643
1644
export interface IDynamicListEventMultiplexer<TEventType> extends IDisposable {
1645
readonly event: Event<TEventType>;
1646
}
1647
export class DynamicListEventMultiplexer<TItem, TEventType> implements IDynamicListEventMultiplexer<TEventType> {
1648
private readonly _store = new DisposableStore();
1649
1650
readonly event: Event<TEventType>;
1651
1652
constructor(
1653
items: TItem[],
1654
onAddItem: Event<TItem>,
1655
onRemoveItem: Event<TItem>,
1656
getEvent: (item: TItem) => Event<TEventType>
1657
) {
1658
const multiplexer = this._store.add(new EventMultiplexer<TEventType>());
1659
const itemListeners = this._store.add(new DisposableMap<TItem, IDisposable>());
1660
1661
function addItem(instance: TItem) {
1662
itemListeners.set(instance, multiplexer.add(getEvent(instance)));
1663
}
1664
1665
// Existing items
1666
for (const instance of items) {
1667
addItem(instance);
1668
}
1669
1670
// Added items
1671
this._store.add(onAddItem(instance => {
1672
addItem(instance);
1673
}));
1674
1675
// Removed items
1676
this._store.add(onRemoveItem(instance => {
1677
itemListeners.deleteAndDispose(instance);
1678
}));
1679
1680
this.event = multiplexer.event;
1681
}
1682
1683
dispose() {
1684
this._store.dispose();
1685
}
1686
}
1687
1688
/**
1689
* The EventBufferer is useful in situations in which you want
1690
* to delay firing your events during some code.
1691
* You can wrap that code and be sure that the event will not
1692
* be fired during that wrap.
1693
*
1694
* ```
1695
* const emitter: Emitter;
1696
* const delayer = new EventDelayer();
1697
* const delayedEvent = delayer.wrapEvent(emitter.event);
1698
*
1699
* delayedEvent(console.log);
1700
*
1701
* delayer.bufferEvents(() => {
1702
* emitter.fire(); // event will not be fired yet
1703
* });
1704
*
1705
* // event will only be fired at this point
1706
* ```
1707
*/
1708
export class EventBufferer {
1709
1710
private data: { buffers: Function[] }[] = [];
1711
1712
wrapEvent<T>(event: Event<T>): Event<T>;
1713
wrapEvent<T>(event: Event<T>, reduce: (last: T | undefined, event: T) => T): Event<T>;
1714
wrapEvent<T, O>(event: Event<T>, reduce: (last: O | undefined, event: T) => O, initial: O): Event<O>;
1715
wrapEvent<T, O>(event: Event<T>, reduce?: (last: T | O | undefined, event: T) => T | O, initial?: O): Event<O | T> {
1716
return (listener, thisArgs?, disposables?) => {
1717
return event(i => {
1718
const data = this.data[this.data.length - 1];
1719
1720
// Non-reduce scenario
1721
if (!reduce) {
1722
// Buffering case
1723
if (data) {
1724
data.buffers.push(() => listener.call(thisArgs, i));
1725
} else {
1726
// Not buffering case
1727
listener.call(thisArgs, i);
1728
}
1729
return;
1730
}
1731
1732
// Reduce scenario
1733
const reduceData = data as typeof data & {
1734
/**
1735
* The accumulated items that will be reduced.
1736
*/
1737
items?: T[];
1738
/**
1739
* The reduced result cached to be shared with other listeners.
1740
*/
1741
reducedResult?: T | O;
1742
};
1743
1744
// Not buffering case
1745
if (!reduceData) {
1746
// TODO: Is there a way to cache this reduce call for all listeners?
1747
listener.call(thisArgs, reduce(initial, i));
1748
return;
1749
}
1750
1751
// Buffering case
1752
reduceData.items ??= [];
1753
reduceData.items.push(i);
1754
if (reduceData.buffers.length === 0) {
1755
// Include a single buffered function that will reduce all events when we're done buffering events
1756
data.buffers.push(() => {
1757
// cache the reduced result so that the value can be shared across all listeners
1758
reduceData.reducedResult ??= initial
1759
? reduceData.items!.reduce(reduce as (last: O | undefined, event: T) => O, initial)
1760
: reduceData.items!.reduce(reduce as (last: T | undefined, event: T) => T);
1761
listener.call(thisArgs, reduceData.reducedResult);
1762
});
1763
}
1764
}, undefined, disposables);
1765
};
1766
}
1767
1768
bufferEvents<R = void>(fn: () => R): R {
1769
const data = { buffers: new Array<Function>() };
1770
this.data.push(data);
1771
const r = fn();
1772
this.data.pop();
1773
data.buffers.forEach(flush => flush());
1774
return r;
1775
}
1776
}
1777
1778
/**
1779
* A Relay is an event forwarder which functions as a replugabble event pipe.
1780
* Once created, you can connect an input event to it and it will simply forward
1781
* events from that input event through its own `event` property. The `input`
1782
* can be changed at any point in time.
1783
*/
1784
export class Relay<T> implements IDisposable {
1785
1786
private listening = false;
1787
private inputEvent: Event<T> = Event.None;
1788
private inputEventListener: IDisposable = Disposable.None;
1789
1790
private readonly emitter = new Emitter<T>({
1791
onDidAddFirstListener: () => {
1792
this.listening = true;
1793
this.inputEventListener = this.inputEvent(this.emitter.fire, this.emitter);
1794
},
1795
onDidRemoveLastListener: () => {
1796
this.listening = false;
1797
this.inputEventListener.dispose();
1798
}
1799
});
1800
1801
readonly event: Event<T> = this.emitter.event;
1802
1803
set input(event: Event<T>) {
1804
this.inputEvent = event;
1805
1806
if (this.listening) {
1807
this.inputEventListener.dispose();
1808
this.inputEventListener = event(this.emitter.fire, this.emitter);
1809
}
1810
}
1811
1812
dispose() {
1813
this.inputEventListener.dispose();
1814
this.emitter.dispose();
1815
}
1816
}
1817
1818
export interface IValueWithChangeEvent<T> {
1819
readonly onDidChange: Event<void>;
1820
get value(): T;
1821
}
1822
1823
export class ValueWithChangeEvent<T> implements IValueWithChangeEvent<T> {
1824
public static const<T>(value: T): IValueWithChangeEvent<T> {
1825
return new ConstValueWithChangeEvent(value);
1826
}
1827
1828
private readonly _onDidChange = new Emitter<void>();
1829
readonly onDidChange: Event<void> = this._onDidChange.event;
1830
1831
constructor(private _value: T) { }
1832
1833
get value(): T {
1834
return this._value;
1835
}
1836
1837
set value(value: T) {
1838
if (value !== this._value) {
1839
this._value = value;
1840
this._onDidChange.fire(undefined);
1841
}
1842
}
1843
}
1844
1845
class ConstValueWithChangeEvent<T> implements IValueWithChangeEvent<T> {
1846
public readonly onDidChange: Event<void> = Event.None;
1847
1848
constructor(readonly value: T) { }
1849
}
1850
1851
/**
1852
* @param handleItem Is called for each item in the set (but only the first time the item is seen in the set).
1853
* The returned disposable is disposed if the item is no longer in the set.
1854
*/
1855
export function trackSetChanges<T>(getData: () => ReadonlySet<T>, onDidChangeData: Event<unknown>, handleItem: (d: T) => IDisposable): IDisposable {
1856
const map = new DisposableMap<T, IDisposable>();
1857
let oldData = new Set(getData());
1858
for (const d of oldData) {
1859
map.set(d, handleItem(d));
1860
}
1861
1862
const store = new DisposableStore();
1863
store.add(onDidChangeData(() => {
1864
const newData = getData();
1865
const diff = diffSets(oldData, newData);
1866
for (const r of diff.removed) {
1867
map.deleteAndDispose(r);
1868
}
1869
for (const a of diff.added) {
1870
map.set(a, handleItem(a));
1871
}
1872
oldData = new Set(newData);
1873
}));
1874
store.add(map);
1875
return store;
1876
}
1877
1878
1879
function addToDisposables(result: IDisposable, disposables: DisposableStore | IDisposable[] | undefined) {
1880
if (disposables instanceof DisposableStore) {
1881
disposables.add(result);
1882
} else if (Array.isArray(disposables)) {
1883
disposables.push(result);
1884
}
1885
}
1886
1887
function disposeAndRemove(result: IDisposable, disposables: DisposableStore | IDisposable[] | undefined) {
1888
if (disposables instanceof DisposableStore) {
1889
disposables.delete(result);
1890
} else if (Array.isArray(disposables)) {
1891
const index = disposables.indexOf(result);
1892
if (index !== -1) {
1893
disposables.splice(index, 1);
1894
}
1895
}
1896
result.dispose();
1897
}
1898
1899