Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/common/event.ts
3291 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { CancelablePromise } from './async.js';
7
import { CancellationToken } from './cancellation.js';
8
import { diffSets } from './collections.js';
9
import { onUnexpectedError } from './errors.js';
10
import { createSingleCallFunction } from './functional.js';
11
import { combinedDisposable, Disposable, DisposableMap, DisposableStore, IDisposable, toDisposable } from './lifecycle.js';
12
import { LinkedList } from './linkedList.js';
13
import { IObservable, IObservableWithChange, IObserver } from './observable.js';
14
import { StopWatch } from './stopwatch.js';
15
import { MicrotaskDelay } from './symbols.js';
16
17
18
// -----------------------------------------------------------------------------------------------------------------------
// Debug switch: when turned on, a warning is printed whenever an emitter that still has listeners gets disposed,
// which is a sign of code smell. To turn it on locally, uncomment the `|| Boolean("TRUE")` line below.
// -----------------------------------------------------------------------------------------------------------------------
const _enableDisposeWithListenerWarning = false
	// || Boolean("TRUE") // causes a linter warning so that it cannot be pushed
	;
24
25
26
// -----------------------------------------------------------------------------------------------------------------------
// Debug switch: when turned on, a warning is printed whenever a snapshotted event is subscribed to repeatedly
// without cleanup. To turn it on locally, uncomment the `|| Boolean("TRUE")` line below.
// See https://github.com/microsoft/vscode/issues/142851
// -----------------------------------------------------------------------------------------------------------------------
const _enableSnapshotPotentialLeakWarning = false
	// || Boolean("TRUE") // causes a linter warning so that it cannot be pushed
	;
33
34
/**
 * A subscribable event that carries zero or one payload value. The event *is* the
 * subscribe function: call it with a listener to start listening.
 *
 * - `listener` is invoked with the payload each time the event fires.
 * - `thisArgs` is the `this` value the listener is invoked with.
 * - `disposables` is an optional store or array that also receives the returned subscription.
 *
 * The returned disposable ends the subscription.
 */
export interface Event<T> {
	(
		listener: (e: T) => unknown,
		thisArgs?: any,
		disposables?: IDisposable[] | DisposableStore
	): IDisposable;
}
40
41
export namespace Event {
42
/** An event that ignores its listener: subscribing does nothing and returns `Disposable.None`. */
export const None: Event<any> = () => Disposable.None;
43
44
/**
 * Development aid: when `_enableSnapshotPotentialLeakWarning` is on, wraps
 * `options.onDidAddListener` so that the second listener added to the emitter prints
 * a warning together with the stack captured at the time this function was called.
 * Any `onDidAddListener` hook that was already present keeps being invoked.
 *
 * Mutates `options` in place and does nothing while the switch is off.
 */
function _addLeakageTraceLogic(options: EmitterOptions) {
	if (!_enableSnapshotPotentialLeakWarning) {
		return;
	}

	const previousHook = options.onDidAddListener;
	const creationStack = Stacktrace.create();
	let listenersAdded = 0;

	options.onDidAddListener = () => {
		listenersAdded += 1;
		if (listenersAdded === 2) {
			console.warn('snapshotted emitter LIKELY used public and SHOULD HAVE BEEN created with DisposableStore. snapshotted here');
			creationStack.print();
		}
		previousHook?.();
	};
}
58
59
/**
 * Turns `event` into a signal (`Event<void>`) whose listeners run in a later task.
 *
 * Implemented as a {@link debounce} with a delay of `0`, leading firing enabled and a
 * merge function that discards the payload, so bursts of source events collapse into
 * payload-less notifications. Handy for non-critical work (e.g. general UI updates)
 * that must not block latency-sensitive work (e.g. keypress to rendered text).
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param disposable A disposable store to add the new emitter to.
 */
export function defer(event: Event<unknown>, disposable?: DisposableStore): Event<void> {
	const dropPayload = (): void => undefined;
	return debounce<unknown, void>(event, dropPayload, 0, undefined, true, undefined, disposable);
}
78
79
/**
 * Given an event, returns another event which delivers at most one emission to each
 * listener and then unsubscribes that listener from the source.
 *
 * The "already fired" flag is set on every first delivery, so the listener is never
 * invoked twice even when the source keeps calling the wrapper after `dispose()`
 * (for example because it detaches listeners lazily or re-fires re-entrantly).
 *
 * @param event The event source for the new event.
 * @returns An event whose subscription is disposed right after the first delivery.
 */
export function once<T>(event: Event<T>): Event<T> {
	return (listener, thisArgs = null, disposables?) => {
		// Guards against a second delivery, including one that happens while the listener runs.
		let didFire = false;
		let result: IDisposable | undefined = undefined;
		result = event(e => {
			if (didFire) {
				return;
			}
			didFire = true;

			// `result` is still unassigned when the source fires synchronously during
			// subscription; that case is cleaned up right after `event(...)` returns.
			result?.dispose();

			return listener.call(thisArgs, e);
		}, null, disposables);

		if (didFire) {
			result.dispose();
		}

		return result;
	};
}
108
109
/**
 * Given an event, returns another event which fires a single time: on the first
 * emission for which `condition` returns true. Built from {@link Event.filter}
 * followed by {@link Event.once}.
 *
 * @param event The event source for the new event.
 * @param condition Predicate an emission must satisfy to be delivered.
 */
export function onceIf<T>(event: Event<T>, condition: (e: T) => boolean): Event<T> {
	const matching = Event.filter(event, condition);
	return Event.once(matching);
}
117
118
/**
 * Projects every payload of `event` through `map` before handing it to listeners,
 * much like `Array.prototype.map` does for array elements.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param map The mapping function.
 * @param disposable A disposable store to add the new emitter to.
 */
export function map<I, O>(event: Event<I>, map: (i: I) => O, disposable?: DisposableStore): Event<O> {
	const mapped: Event<O> = (listener, thisArgs = null, disposables?) => {
		return event(input => listener.call(thisArgs, map(input)), null, disposables);
	};
	return snapshot(mapped, disposable);
}
133
134
/**
 * Runs `each` on every payload of `event` and then forwards the same payload to
 * the listeners of the returned event.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param each The function to perform on the event object.
 * @param disposable A disposable store to add the new emitter to.
 */
export function forEach<I>(event: Event<I>, each: (i: I) => void, disposable?: DisposableStore): Event<I> {
	const tapped: Event<I> = (listener, thisArgs = null, disposables?) => {
		const visitThenDeliver = (payload: I) => {
			each(payload);
			listener.call(thisArgs, payload);
		};
		return event(visitThenDeliver, null, disposables);
	};
	return snapshot(tapped, disposable);
}
148
149
/**
 * Returns an event that only forwards the payloads of `event` accepted by `filter`.
 * The type-guard overloads narrow the payload type of the returned event.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param filter The predicate; a payload is forwarded only when it returns a truthy value.
 * @param disposable A disposable store to add the new emitter to.
 */
export function filter<T, U>(event: Event<T | U>, filter: (e: T | U) => e is T, disposable?: DisposableStore): Event<T>;
export function filter<T>(event: Event<T>, filter: (e: T) => boolean, disposable?: DisposableStore): Event<T>;
export function filter<T, R>(event: Event<T | R>, filter: (e: T | R) => e is R, disposable?: DisposableStore): Event<R>;
export function filter<T>(event: Event<T>, filter: (e: T) => boolean, disposable?: DisposableStore): Event<T> {
	const filtered: Event<T> = (listener, thisArgs = null, disposables?) => {
		const deliverIfAccepted = (payload: T) => filter(payload) && listener.call(thisArgs, payload);
		return event(deliverIfAccepted, null, disposables);
	};
	return snapshot(filtered, disposable);
}
167
168
/**
 * Re-types an event as `Event<void>`. This is a compile-time view only: the very
 * same event function is returned and no wrapper is created.
 */
export function signal<T>(event: Event<T>): Event<void> {
	const untyped: Event<any> = event;
	return untyped;
}
174
175
/**
 * Merges several events into one: the returned event fires whenever any of the
 * given events fires. Each subscription to the returned event subscribes to every
 * source event, and disposing it disposes all of those subscriptions together.
 */
export function any<T>(...events: Event<T>[]): Event<T>;
export function any(...events: Event<any>[]): Event<void>;
export function any<T>(...events: Event<T>[]): Event<T> {
	return (listener, thisArgs = null, disposables?) => {
		const subscriptions = events.map(source => source(e => listener.call(thisArgs, e)));
		return addAndReturnDisposable(combinedDisposable(...subscriptions), disposables);
	};
}
186
187
/**
 * Folds the payloads of `event` into a running value: for every emission `merge`
 * receives the previous result (starting with `initial`) and the new payload, and
 * its return value is both remembered and emitted.
 *
 * The running value lives in this call's closure, so it is shared by all listeners
 * of the returned event.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 */
export function reduce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, initial?: O, disposable?: DisposableStore): Event<O> {
	let accumulated: O | undefined = initial;

	const fold = (next: I): O => {
		const merged = merge(accumulated, next);
		accumulated = merged;
		return merged;
	};

	return map<I, O>(event, fold, disposable);
}
200
201
/**
 * Backs `event` with a dedicated emitter and returns that emitter's event.
 *
 * The emitter subscribes to `event` in its `onWillAddFirstListener` hook and drops
 * that subscription again in `onDidRemoveLastListener`. When no `disposable` store
 * is given, the optional leak-trace logic is attached to the emitter options
 * instead; otherwise the emitter is registered with the store.
 */
function snapshot<T>(event: Event<T>, disposable: DisposableStore | undefined): Event<T> {
	let upstream: IDisposable | undefined;

	const options: EmitterOptions | undefined = {
		onWillAddFirstListener: () => {
			upstream = event(emitter.fire, emitter);
		},
		onDidRemoveLastListener: () => {
			upstream?.dispose();
		}
	};

	// Must happen before the emitter is constructed so the emitter sees the wrapped hook.
	if (!disposable) {
		_addLeakageTraceLogic(options);
	}

	const emitter = new Emitter<T>(options);
	if (disposable) {
		disposable.add(emitter);
	}

	return emitter.event;
}
223
224
/**
 * Registers `d` with `store` when one is provided (pushing onto an array, or calling
 * `add` on a store object) and hands `d` back unchanged. Convenience for event
 * function implementations.
 */
function addAndReturnDisposable<T extends IDisposable>(d: T, store: DisposableStore | IDisposable[] | undefined): T {
	if (store instanceof Array) {
		store.push(d);
		return d;
	}
	if (store) {
		store.add(d);
	}
	return d;
}
236
237
/**
 * Given an event, creates a new emitter-backed event that debounces emissions by {@link delay}
 * and hands listeners the value produced by folding all debounced payloads through `merge`.
 *
 * With a numeric delay every source emission restarts the timer; with `MicrotaskDelay` a single
 * microtask is queued for the first emission of a burst.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @param event The original event to debounce.
 * @param merge A function that reduces all events into a single event.
 * @param delay The number of milliseconds to debounce, or `MicrotaskDelay`. Defaults to 100.
 * @param leading Whether to fire a leading event without debouncing.
 * @param flushOnListenerRemove Whether to fire all debounced events when a listener is removed. If this is not
 * specified, some events could go missing. Use this if it's important that all events are processed, even if the
 * listener gets disposed before the debounced event fires.
 * @param leakWarningThreshold See {@link EmitterOptions.leakWarningThreshold}.
 * @param disposable A disposable store to register the debounce emitter to.
 */
export function debounce<T>(event: Event<T>, merge: (last: T | undefined, event: T) => T, delay?: number | typeof MicrotaskDelay, leading?: boolean, flushOnListenerRemove?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<T>;
export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay?: number | typeof MicrotaskDelay, leading?: boolean, flushOnListenerRemove?: boolean, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O>;
export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay: number | typeof MicrotaskDelay = 100, leading = false, flushOnListenerRemove = false, leakWarningThreshold?: number, disposable?: DisposableStore): Event<O> {
	let subscription: IDisposable;
	// Value merged so far for the current debounce window.
	let output: O | undefined = undefined;
	// Pending timer (numeric delay), `null` while a microtask is queued, `undefined` when idle.
	let handle: Timeout | undefined | null = undefined;
	let numDebouncedCalls = 0;
	let doFire: (() => void) | undefined;

	const options: EmitterOptions | undefined = {
		leakWarningThreshold,
		onWillAddFirstListener() {
			subscription = event(cur => {
				numDebouncedCalls++;
				output = merge(output, cur);

				if (leading && !handle) {
					emitter.fire(output);
					output = undefined;
				}

				doFire = () => {
					const _output = output;
					output = undefined;
					handle = undefined;
					// With `leading`, the first call of a window was already delivered above.
					if (!leading || numDebouncedCalls > 1) {
						emitter.fire(_output!);
					}
					numDebouncedCalls = 0;
				};

				if (typeof delay === 'number') {
					if (handle) {
						clearTimeout(handle);
					}
					handle = setTimeout(doFire, delay);
				} else {
					if (handle === undefined) {
						handle = null;
						const scheduled = doFire;
						queueMicrotask(() => {
							// `handle` is no longer `null` when the window was already flushed
							// (see onWillRemoveListener); running again would fire `undefined`.
							if (handle === null) {
								scheduled();
							}
						});
					}
				}
			});
		},
		onWillRemoveListener() {
			const flush = doFire;
			if (flushOnListenerRemove && numDebouncedCalls > 0 && flush) {
				// Cancel the pending timer first: otherwise it would later run `doFire` a second
				// time and deliver a spurious `undefined` to the remaining listeners.
				if (handle) {
					clearTimeout(handle);
				}
				flush();
			}
		},
		onDidRemoveLastListener() {
			doFire = undefined;
			subscription.dispose();
		}
	};

	if (!disposable) {
		_addLeakageTraceLogic(options);
	}

	const emitter = new Emitter<O>(options);

	disposable?.add(emitter);

	return emitter.event;
}
320
321
/**
 * Debounces an event and fires, after `delay` (default `0`), with an array holding
 * every original payload collected during the debounce window. Pending payloads
 * are flushed when a listener is removed.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 */
export function accumulate<T>(event: Event<T>, delay: number | typeof MicrotaskDelay = 0, disposable?: DisposableStore): Event<T[]> {
	const collect = (batch: T[] | undefined, e: T): T[] => {
		if (batch) {
			batch.push(e);
			return batch;
		}
		return [e];
	};

	return Event.debounce<T, T[]>(event, collect, delay, undefined, true, undefined, disposable);
}
337
338
/**
 * Suppresses consecutive duplicates: a payload is forwarded only when it is the
 * first one seen or when `equals` reports it differs from the payload seen just
 * before it. The remembered payload is updated on every emission, forwarded or not.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param equals The equality condition; defaults to strict equality (`===`).
 * @param disposable A disposable store to add the new emitter to.
 *
 * @example
 * ```
 * // Fire only one time when a single window is opened or focused
 * Event.latch(Event.any(onDidOpenWindow, onDidFocusWindow))
 * ```
 */
export function latch<T>(event: Event<T>, equals: (a: T, b: T) => boolean = (a, b) => a === b, disposable?: DisposableStore): Event<T> {
	let hasPrevious = false;
	let previous: T;

	const differsFromPrevious = (value: T): boolean => {
		// `equals` is deliberately not consulted for the very first payload.
		const changed = !hasPrevious || !equals(value, previous);
		hasPrevious = true;
		previous = value;
		return changed;
	};

	return filter(event, differsFromPrevious, disposable);
}
367
368
/**
 * Splits an event whose payload is a union type into two events, one per member of
 * the union. The first returned event carries the payloads accepted by `isT`, the
 * second one carries all the others.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @example
 * ```
 * const event = new Emitter<number | undefined>().event;
 * const [undefinedEvent, numberEvent] = Event.split(event, isUndefined);
 * ```
 *
 * @param event The event source for the new event.
 * @param isT A function that determines what event is of the first type.
 * @param disposable A disposable store to add the new emitters to.
 */
export function split<T, U>(event: Event<T | U>, isT: (e: T | U) => e is T, disposable?: DisposableStore): [Event<T>, Event<U>] {
	const matching = Event.filter(event, isT, disposable);
	const rest = Event.filter(event, e => !isT(e), disposable) as Event<U>;
	return [matching, rest];
}
391
392
/**
 * Buffers an event until it has a listener attached.
 *
 * The source is subscribed to immediately and its payloads are collected. Once the
 * first listener is attached to the returned event, the collected payloads are
 * replayed and buffering stops for good; from then on payloads are forwarded directly.
 *
 * *NOTE* that this function returns an `Event` and it MUST be called with a `DisposableStore` whenever the
 * returned event is accessible to "third parties", e.g. the event is a public property. Otherwise a leaked
 * listener on the returned event causes this utility to leak a listener on the original event.
 *
 * @param event The event source for the new event.
 * @param flushAfterTimeout When `true`, the replay is scheduled with `setTimeout` after the first listener
 * was added (so listeners attached right afterwards also receive it); when `false`, the replay happens
 * synchronously.
 * @param _buffer Internal: A source event array used for tests.
 * @param disposable A disposable store that receives the source subscriptions and the emitter.
 *
 * @example
 * ```
 * // Start accumulating events, when the first listener is attached, flush
 * // the event after a timeout such that multiple listeners attached before
 * // the timeout would receive the event
 * this.onInstallExtension = Event.buffer(service.onInstallExtension, true);
 * ```
 */
export function buffer<T>(event: Event<T>, flushAfterTimeout = false, _buffer: T[] = [], disposable?: DisposableStore): Event<T> {
	// `null` once the buffered payloads have been replayed.
	let pending: T[] | null = _buffer.slice();

	let upstream: IDisposable | null = event(e => {
		if (pending) {
			pending.push(e);
		} else {
			emitter.fire(e);
		}
	});
	disposable?.add(upstream);

	const replay = () => {
		pending?.forEach(e => emitter.fire(e));
		pending = null;
	};

	const emitter = new Emitter<T>({
		onWillAddFirstListener: () => {
			if (upstream) {
				return;
			}
			// Re-subscribe after an earlier last-listener removal.
			upstream = event(e => emitter.fire(e));
			disposable?.add(upstream);
		},

		onDidAddFirstListener: () => {
			if (!pending) {
				return;
			}
			if (flushAfterTimeout) {
				setTimeout(replay);
			} else {
				replay();
			}
		},

		onDidRemoveLastListener: () => {
			upstream?.dispose();
			upstream = null;
		}
	});

	disposable?.add(emitter);

	return emitter.event;
}
466
/**
 * Builds a derived event from a pipeline of steps described with an
 * {@link IChainableSythensis}, allowing a more functional programming style.
 * The pipeline is synthesized anew for every subscription, and a payload is
 * delivered to the listener unless a step halts the chain.
 *
 * @example
 * ```
 * // Normal
 * const onEnterPressNormal = Event.filter(
 *   Event.map(onKeyPress.event, e => new StandardKeyboardEvent(e)),
 *   e => e.keyCode === KeyCode.Enter
 * );
 *
 * // Using chain
 * const onEnterPressChain = Event.chain(onKeyPress.event, $ => $
 *   .map(e => new StandardKeyboardEvent(e))
 *   .filter(e => e.keyCode === KeyCode.Enter)
 * );
 * ```
 */
export function chain<T, R>(event: Event<T>, sythensize: ($: IChainableSythensis<T>) => IChainableSythensis<R>): Event<R> {
	return (listener, thisArgs, disposables) => {
		const pipeline = sythensize(new ChainableSynthesis()) as ChainableSynthesis;

		const runPipeline = (value: T) => {
			const outcome = pipeline.evaluate(value);
			if (outcome === HaltChainable) {
				return;
			}
			listener.call(thisArgs, outcome);
		};

		return event(runPipeline, undefined, disposables);
	};
}
497
498
/** Sentinel a step returns to stop the chain for the current value. */
const HaltChainable = Symbol('HaltChainable');

/**
 * Records a sequence of transformation steps and applies them, in the order they
 * were added, to a value. Every builder method appends one step and returns the
 * same instance so calls can be chained.
 */
class ChainableSynthesis implements IChainableSythensis<any> {
	private readonly steps: ((input: any) => unknown)[] = [];

	/** Appends a step that replaces the value with `fn(value)`. */
	map<O>(fn: (i: any) => O): this {
		this.steps.push(fn);
		return this;
	}

	/** Appends a step that calls `fn` for its side effect and passes the value through. */
	forEach(fn: (i: any) => void): this {
		const tap = (value: any) => {
			fn(value);
			return value;
		};
		this.steps.push(tap);
		return this;
	}

	/** Appends a step that halts the chain unless `fn(value)` is truthy. */
	filter(fn: (e: any) => boolean): this {
		this.steps.push(value => {
			if (fn(value)) {
				return value;
			}
			return HaltChainable;
		});
		return this;
	}

	/** Appends a step that folds values into a running result, starting from `initial`. */
	reduce<R>(merge: (last: R | undefined, event: any) => R, initial?: R | undefined): this {
		let accumulated = initial;
		this.steps.push(value => {
			accumulated = merge(accumulated, value);
			return accumulated;
		});
		return this;
	}

	/** Appends a step that halts the chain when the value equals the one seen just before it. */
	latch(equals: (a: any, b: any) => boolean = (a, b) => a === b): ChainableSynthesis {
		let hasPrevious = false;
		let previous: any;
		this.steps.push(value => {
			// `equals` is not consulted for the very first value.
			const changed = !hasPrevious || !equals(value, previous);
			hasPrevious = true;
			previous = value;
			return changed ? value : HaltChainable;
		});

		return this;
	}

	/** Runs `value` through all steps; returns the final value or `HaltChainable`. */
	public evaluate(value: any) {
		let current = value;
		for (const step of this.steps) {
			current = step(current);
			if (current === HaltChainable) {
				return current;
			}
		}

		return current;
	}
}
554
555
/**
 * Fluent description of an event pipeline, as handed to the callback of {@link Event.chain}.
 * Every method returns a synthesis typed for the output of the step it adds.
 */
export interface IChainableSythensis<T> {
	/** Transforms each value with `fn`. */
	map<O>(fn: (i: T) => O): IChainableSythensis<O>;

	/** Calls `fn` for each value and passes the value on. */
	forEach(fn: (i: T) => void): IChainableSythensis<T>;

	/** Keeps only the values accepted by `fn`; the type-guard overload narrows the value type. */
	filter<R extends T>(fn: (e: T) => e is R): IChainableSythensis<R>;
	filter(fn: (e: T) => boolean): IChainableSythensis<T>;

	/** Folds values into a running result, optionally starting from `initial`. */
	reduce<R>(merge: (last: R, event: T) => R, initial: R): IChainableSythensis<R>;
	reduce<R>(merge: (last: R | undefined, event: T) => R): IChainableSythensis<R>;

	/** Drops a value that equals the one seen immediately before it. */
	latch(equals?: (a: T, b: T) => boolean): IChainableSythensis<T>;
}
564
565
/** The subset of a node-style event emitter that {@link fromNodeEventEmitter} relies on. */
export interface NodeEventEmitter {
	on(event: string | symbol, listener: Function): unknown;
	removeListener(event: string | symbol, listener: Function): unknown;
}

/**
 * Creates an {@link Event} from a node event emitter.
 *
 * The handler is attached with `on` in the emitter's `onWillAddFirstListener` hook
 * and detached with `removeListener` in `onDidRemoveLastListener`.
 *
 * @param emitter The node-style emitter to listen on.
 * @param eventName The name of the event to listen for.
 * @param map Converts the handler arguments into the payload; defaults to the first argument.
 */
export function fromNodeEventEmitter<T>(emitter: NodeEventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
	const relay = (...args: any[]) => result.fire(map(...args));

	const result = new Emitter<T>({
		onWillAddFirstListener: () => emitter.on(eventName, relay),
		onDidRemoveLastListener: () => emitter.removeListener(eventName, relay)
	});

	return result.event;
}
581
582
/** The subset of a DOM-style event target that {@link fromDOMEventEmitter} relies on. */
export interface DOMEventEmitter {
	addEventListener(event: string | symbol, listener: Function): void;
	removeEventListener(event: string | symbol, listener: Function): void;
}

/**
 * Creates an {@link Event} from a DOM event emitter.
 *
 * The handler is attached with `addEventListener` in the emitter's
 * `onWillAddFirstListener` hook and detached with `removeEventListener` in
 * `onDidRemoveLastListener`.
 *
 * @param emitter The DOM-style event target to listen on.
 * @param eventName The name of the event to listen for.
 * @param map Converts the handler arguments into the payload; defaults to the first argument.
 */
export function fromDOMEventEmitter<T>(emitter: DOMEventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
	const relay = (...args: any[]) => result.fire(map(...args));

	const result = new Emitter<T>({
		onWillAddFirstListener: () => emitter.addEventListener(eventName, relay),
		onDidRemoveLastListener: () => emitter.removeEventListener(eventName, relay)
	});

	return result.event;
}
598
599
/**
 * Creates a promise out of an event, using the {@link Event.once} helper: the
 * promise resolves with the payload of the next emission.
 *
 * Calling `cancel()` on the returned promise only disposes the underlying listener.
 * The promise is then left pending, matching the behavior of a normal disposal.
 *
 * @param event The event to wait for.
 * @param disposables Optional store or array that also receives the listener.
 */
export function toPromise<T>(event: Event<T>, disposables?: IDisposable[] | DisposableStore): CancelablePromise<T> {
	let cancel!: () => void;

	const promise = new Promise<T>(resolve => {
		const subscription = once(event)(resolve, null, disposables);
		// not resolved, matching the behavior of a normal disposal
		cancel = () => subscription.dispose();
	}) as CancelablePromise<T>;

	promise.cancel = cancel;
	return promise;
}
613
614
/**
 * A convenience function for forwarding an event to another emitter which
 * improves readability.
 *
 * This is similar to {@link Relay} but allows instantiating and forwarding
 * on a single line and also allows for multiple source events.
 * @param from The event to forward.
 * @param to The emitter to forward the event to.
 * @returns The subscription on `from`; dispose it to stop forwarding.
 * @example
 * Event.forward(event, emitter);
 * // equivalent to
 * event(e => emitter.fire(e));
 * // equivalent to
 * event(emitter.fire, emitter);
 */
export function forward<T>(from: Event<T>, to: Emitter<T>): IDisposable {
	const relay = (payload: T) => to.fire(payload);
	return from(relay);
}
632
633
/**
 * Calls `handler` right away, with `initial` (or `undefined` when no initial value
 * is passed), and then subscribes it to `event`.
 *
 * @returns The subscription on `event`.
 *
 * @example
 * ```
 * // Initialize the UI and update it when dataChangeEvent fires
 * runAndSubscribe(dataChangeEvent, () => this._updateUI());
 * ```
 */
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T) => unknown, initial: T): IDisposable;
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T | undefined) => unknown): IDisposable;
export function runAndSubscribe<T>(event: Event<T>, handler: (e: T | undefined) => unknown, initial?: T): IDisposable {
	handler(initial);

	const onEvent = (payload: T) => handler(payload);
	return event(onEvent);
}
648
649
/**
 * Bridges an observable to an emitter: while the emitter has listeners this object
 * is registered as an observer, and whenever the outermost update ends after a
 * change was reported, the emitter fires with the observable's current value.
 */
class EmitterObserver<T> implements IObserver {

	readonly emitter: Emitter<T>;

	/** Number of `beginUpdate` calls not yet matched by `endUpdate`. */
	private _openUpdates = 0;
	/** Whether `handleChange` was called since the last fire. */
	private _sawChange = false;

	constructor(readonly _observable: IObservable<T>, store: DisposableStore | undefined) {
		const options: EmitterOptions = {
			onWillAddFirstListener: () => {
				_observable.addObserver(this);

				// Communicate to the observable that we received its current value and would like to be notified about future changes.
				this._observable.reportChanges();
			},
			onDidRemoveLastListener: () => {
				_observable.removeObserver(this);
			}
		};

		// Leak tracing is only attached when nobody owns the emitter's lifetime.
		if (!store) {
			_addLeakageTraceLogic(options);
		}

		this.emitter = new Emitter<T>(options);
		store?.add(this.emitter);
	}

	beginUpdate<T>(_observable: IObservable<T>): void {
		// assert(_observable === this.obs);
		this._openUpdates++;
	}

	handlePossibleChange<T>(_observable: IObservable<T>): void {
		// assert(_observable === this.obs);
	}

	handleChange<T, TChange>(_observable: IObservableWithChange<T, TChange>, _change: TChange): void {
		// assert(_observable === this.obs);
		this._sawChange = true;
	}

	endUpdate<T>(_observable: IObservable<T>): void {
		// assert(_observable === this.obs);
		this._openUpdates--;
		if (this._openUpdates !== 0) {
			return;
		}

		this._observable.reportChanges();
		if (!this._sawChange) {
			return;
		}

		this._sawChange = false;
		this.emitter.fire(this._observable.get());
	}
}
703
704
/**
 * Creates an event, backed by a single emitter, that fires with the observable's
 * value when the observable changes. All listeners subscribe to that shared emitter.
 *
 * @param obs The observable to watch.
 * @param store Optional store that takes ownership of the emitter.
 */
export function fromObservable<T>(obs: IObservable<T>, store?: DisposableStore): Event<T> {
	return new EmitterObserver(obs, store).emitter.event;
}
712
713
/**
 * Creates a payload-less event from an observable without an intermediate emitter:
 * each listener is attached to the observable directly, through an observer of its own.
 *
 * The listener is called when the outermost update ends after a change was reported.
 * Disposing the returned subscription removes that observer again.
 */
export function fromObservableLight(observable: IObservable<unknown>): Event<void> {
	return (listener, thisArgs, disposables) => {
		let openUpdates = 0;
		let sawChange = false;

		const observer: IObserver = {
			beginUpdate: () => {
				openUpdates++;
			},
			endUpdate: () => {
				openUpdates--;
				if (openUpdates !== 0) {
					return;
				}
				observable.reportChanges();
				if (!sawChange) {
					return;
				}
				sawChange = false;
				listener.call(thisArgs);
			},
			handlePossibleChange: () => {
				// noop
			},
			handleChange: () => {
				sawChange = true;
			}
		};

		observable.addObserver(observer);
		observable.reportChanges();

		const subscription = {
			dispose() {
				observable.removeObserver(observer);
			}
		};

		if (disposables instanceof DisposableStore) {
			disposables.add(subscription);
		} else if (Array.isArray(disposables)) {
			disposables.push(subscription);
		}

		return subscription;
	};
}
758
}
759
760
/** Hooks and settings that can be passed when constructing an emitter. */
export interface EmitterOptions {
	/** Invoked *before* the very first listener is added. */
	onWillAddFirstListener?: Function;

	/** Invoked *after* the very first listener is added. */
	onDidAddFirstListener?: Function;

	/** Invoked after a listener is added. */
	onDidAddListener?: Function;

	/** Invoked *after* the very last listener is removed. */
	onDidRemoveLastListener?: Function;

	/** Invoked *before* a listener is removed. */
	onWillRemoveListener?: Function;

	/**
	 * Invoked when a listener throws an error. Defaults to
	 * {@link onUnexpectedError}
	 */
	onListenerError?: (e: any) => void;

	/**
	 * Number of listeners that are allowed before assuming a leak. Defaults to
	 * a globally configured value.
	 *
	 * @see setGlobalLeakWarningThreshold
	 */
	leakWarningThreshold?: number;

	/**
	 * A delivery queue to use, which is useful for ensuring
	 * in-order event delivery across multiple emitters.
	 */
	deliveryQueue?: EventDeliveryQueue;

	/** ONLY enable this during development */
	_profName?: string;
}
802
803
804
/**
 * Collects timing statistics for one named event. Every instance registers itself
 * in {@link EventProfiling.all} and gets a unique, numbered name.
 */
export class EventProfiling {

	/** All profiling instances created so far. */
	static readonly all = new Set<EventProfiling>();

	private static _idPool = 0;

	readonly name: string;
	/** Listener count passed to the most recent `start` call. */
	public listenerCount: number = 0;
	/** Number of completed `start`/`stop` pairs. */
	public invocationCount = 0;
	/** Sum of all measured durations. */
	public elapsedOverall = 0;
	/** Every measured duration, in order. */
	public durations: number[] = [];

	private _stopWatch?: StopWatch;

	constructor(name: string) {
		const sequence = EventProfiling._idPool++;
		this.name = `${name}_${sequence}`;
		EventProfiling.all.add(this);
	}

	/** Begins a measurement and records how many listeners are about to be invoked. */
	start(listenerCount: number): void {
		this._stopWatch = new StopWatch();
		this.listenerCount = listenerCount;
	}

	/** Ends the running measurement, if any, and folds it into the statistics. */
	stop(): void {
		const watch = this._stopWatch;
		if (!watch) {
			return;
		}

		const elapsed = watch.elapsed();
		this.durations.push(elapsed);
		this.elapsedOverall += elapsed;
		this.invocationCount += 1;
		this._stopWatch = undefined;
	}
}
838
839
/** Process-wide default for the listener-leak warning threshold; starts at `-1`. */
let _globalLeakWarningThreshold = -1;

/**
 * Sets the global leak warning threshold to `n`.
 *
 * @returns A disposable that puts back the value that was in effect before this call.
 */
export function setGlobalLeakWarningThreshold(n: number): IDisposable {
	const previous = _globalLeakWarningThreshold;
	_globalLeakWarningThreshold = n;

	const restore = (): void => {
		_globalLeakWarningThreshold = previous;
	};
	return { dispose: restore };
}
849
850
/**
 * Tracks which stack traces add listeners once an emitter's listener count reaches
 * a threshold, and reports a potential leak through the given error handler.
 */
class LeakageMonitor {

	private static _idPool = 1;

	/** Subscription stack trace -> number of live listeners added from it. Created lazily. */
	private _stacks: Map<string, number> | undefined;
	/** Over-threshold checks left before the next warning is emitted. */
	private _warnCountdown: number = 0;

	constructor(
		private readonly _errorHandler: (err: Error) => void,
		readonly threshold: number,
		readonly name: string = (LeakageMonitor._idPool++).toString(16).padStart(3, '0')
	) { }

	dispose(): void {
		this._stacks?.clear();
	}

	/**
	 * Records a listener that was added from `stack` while there are `listenerCount` listeners.
	 *
	 * @returns `undefined` when monitoring is off (`threshold <= 0`) or the count is below the
	 * threshold; otherwise a function that undoes the bookkeeping for this listener.
	 */
	check(stack: Stacktrace, listenerCount: number): undefined | (() => void) {

		const limit = this.threshold;
		const monitoringOff = limit <= 0;
		if (monitoringOff || listenerCount < limit) {
			return undefined;
		}

		const stacks = this._stacks ?? (this._stacks = new Map());
		const seenSoFar = (stacks.get(stack.value) || 0);
		stacks.set(stack.value, seenSoFar + 1);
		this._warnCountdown -= 1;

		if (this._warnCountdown <= 0) {
			// only warn on first exceed and then every time the limit
			// is exceeded by 50% again
			this._warnCountdown = limit * 0.5;

			const [topStack, topCount] = this.getMostFrequentStack()!;
			const message = `[${this.name}] potential listener LEAK detected, having ${listenerCount} listeners already. MOST frequent listener (${topCount}):`;
			console.warn(message);
			console.warn(topStack!);

			this._errorHandler(new ListenerLeakError(message, topStack));
		}

		return () => {
			const live = (this._stacks!.get(stack.value) || 0);
			this._stacks!.set(stack.value, live - 1);
		};
	}

	/** Returns the `[stack, count]` pair with the highest count, or `undefined` when nothing was recorded. */
	getMostFrequentStack(): [string, number] | undefined {
		if (!this._stacks) {
			return undefined;
		}

		let best: [string, number] | undefined;
		for (const [stack, count] of this._stacks) {
			if (best === undefined || best[1] < count) {
				best = [stack, count];
			}
		}
		return best;
	}
}
916
917
/** Immutable holder for the text of a captured stack trace. */
class Stacktrace {

	/** Captures the stack of a freshly created `Error`; empty string when none is available. */
	static create() {
		return new Stacktrace(new Error().stack ?? '');
	}

	private constructor(readonly value: string) { }

	/** Writes the trace to `console.warn`, leaving out its first two lines. */
	print() {
		const [, , ...frames] = this.value.split('\n');
		console.warn(frames.join('\n'));
	}
}
930
931
/**
 * Error that is logged when going over the configured listener threshold.
 * Its `stack` is the string handed to the constructor, not the construction site.
 */
export class ListenerLeakError extends Error {
	constructor(message: string, stack: string) {
		super(message);
		this.stack = stack;
		this.name = 'ListenerLeakError';
	}
}
939
940
/**
 * SEVERE error that is logged when having gone way over the configured listener
 * threshold so that the emitter refuses to accept more listeners.
 * Its `stack` is the string handed to the constructor, not the construction site.
 */
export class ListenerRefusalError extends Error {
	constructor(message: string, stack: string) {
		super(message);
		this.stack = stack;
		this.name = 'ListenerRefusalError';
	}
}
949
950
/** Source of the running ids handed to `UniqueContainer` instances. */
let id = 0;

/**
 * Wraps a value in an object with its own identity and a running id, so that
 * two registrations of the same value stay distinguishable.
 */
class UniqueContainer<T> {
	/** Optional stack trace of where the container was created. */
	stack?: Stacktrace;
	public id: number;

	constructor(public readonly value: T) {
		this.id = id++;
	}
}
956
/** Divisor that decides when a sparse listener array gets compacted. */
const compactionThreshold = 2;

/** A registered listener callback wrapped in its container. */
type ListenerContainer<T> = UniqueContainer<(data: T) => void>;
/** Either a single listener or a (possibly sparse) list of listeners. */
type ListenerOrListeners<T> = (ListenerContainer<T> | undefined)[] | ListenerContainer<T>;

/**
 * Invokes `fn` for every listener container in `listeners`, skipping the
 * empty slots of a sparse list.
 */
const forEachListener = <T>(listeners: ListenerOrListeners<T>, fn: (c: ListenerContainer<T>) => void) => {
	if (listeners instanceof UniqueContainer) {
		fn(listeners);
		return;
	}

	for (const candidate of listeners) {
		if (candidate) {
			fn(candidate);
		}
	}
};
973
974
/**
 * The Emitter can be used to expose an Event to the public
 * to fire it from the insides.
 * Sample:
	class Document {

		private readonly _onDidChange = new Emitter<string>();

		public onDidChange = this._onDidChange.event;

		// getter-style
		// get onDidChange(): Event<string> {
		// 	return this._onDidChange.event;
		// }

		private _doIt() {
			//...
			this._onDidChange.fire(value);
		}
	}
 */
export class Emitter<T> {

	private readonly _options?: EmitterOptions;
	private readonly _leakageMon?: LeakageMonitor;
	private readonly _perfMon?: EventProfiling;
	private _disposed?: true;
	private _event?: Event<T>;

	/**
	 * A listener, or list of listeners. A single listener is the most common
	 * for event emitters (#185789), so we optimize that special case to avoid
	 * wrapping it in an array (just like Node.js itself.)
	 *
	 * A list of listeners never 'downgrades' back to a plain function if
	 * listeners are removed, for two reasons:
	 *
	 *  1. That's complicated (especially with the deliveryQueue)
	 *  2. A listener with >1 listener is likely to have >1 listener again at
	 *     some point, and swapping between arrays and functions may[citation needed]
	 *     introduce unnecessary work and garbage.
	 *
	 * The array listeners can be 'sparse', to avoid reallocating the array
	 * whenever any listener is added or removed. If more than `1 / compactionThreshold`
	 * of the array is empty, only then is it resized.
	 */
	protected _listeners?: ListenerOrListeners<T>;

	/**
	 * Always to be defined if _listeners is an array. It's no longer a true
	 * queue, but holds the dispatching 'state'. If `fire()` is called on an
	 * emitter, any work left in the _deliveryQueue is finished first.
	 */
	private _deliveryQueue?: EventDeliveryQueuePrivate;
	/** Number of currently registered listeners. */
	protected _size = 0;

	/**
	 * @param options Optional hooks and settings. A leakage monitor is only
	 * created when a leak warning threshold is configured (globally or via
	 * `leakWarningThreshold`), a profiler only when `_profName` is set.
	 */
	constructor(options?: EmitterOptions) {
		this._options = options;
		this._leakageMon = (_globalLeakWarningThreshold > 0 || this._options?.leakWarningThreshold)
			? new LeakageMonitor(options?.onListenerError ?? onUnexpectedError, this._options?.leakWarningThreshold ?? _globalLeakWarningThreshold) :
			undefined;
		this._perfMon = this._options?._profName ? new EventProfiling(this._options._profName) : undefined;
		this._deliveryQueue = this._options?.deliveryQueue as EventDeliveryQueuePrivate | undefined;
	}

	/**
	 * Drops all listeners and stops any delivery that is in progress on this
	 * emitter. Calling it again is a no-op. Note that
	 * `onDidRemoveLastListener` is invoked on the first call even when no
	 * listener was registered.
	 */
	dispose() {
		if (!this._disposed) {
			this._disposed = true;

			// It is bad to have listeners at the time of disposing an emitter, it is worse to have listeners keep the emitter
			// alive via the reference that's embedded in their disposables. Therefore we loop over all remaining listeners and
			// unset their subscriptions/disposables. Looping and blaming remaining listeners is done on next tick because
			// the following programming pattern is very popular:
			//
			// const someModel = this._disposables.add(new ModelObject()); // (1) create and register model
			// this._disposables.add(someModel.onDidChange(() => { ... }); // (2) subscribe and register model-event listener
			// ...later...
			// this._disposables.dispose(); disposes (1) then (2): don't warn after (1) but after the "overall dispose" is done

			if (this._deliveryQueue?.current === this) {
				this._deliveryQueue.reset();
			}
			if (this._listeners) {
				if (_enableDisposeWithListenerWarning) {
					const listeners = this._listeners;
					queueMicrotask(() => {
						forEachListener(listeners, l => l.stack?.print());
					});
				}

				this._listeners = undefined;
				this._size = 0;
			}
			this._options?.onDidRemoveLastListener?.();
			this._leakageMon?.dispose();
		}
	}

	/**
	 * For the public to allow to subscribe
	 * to events from this Emitter
	 */
	get event(): Event<T> {
		this._event ??= (callback: (e: T) => unknown, thisArgs?: any, disposables?: IDisposable[] | DisposableStore) => {
			// Far beyond the leak threshold (threshold squared): report and refuse the listener.
			if (this._leakageMon && this._size > this._leakageMon.threshold ** 2) {
				const message = `[${this._leakageMon.name}] REFUSES to accept new listeners because it exceeded its threshold by far (${this._size} vs ${this._leakageMon.threshold})`;
				console.warn(message);

				const tuple = this._leakageMon.getMostFrequentStack() ?? ['UNKNOWN stack', -1];
				const error = new ListenerRefusalError(`${message}. HINT: Stack shows most frequent listener (${tuple[1]}-times)`, tuple[0]);
				const errorHandler = this._options?.onListenerError || onUnexpectedError;
				errorHandler(error);

				return Disposable.None;
			}

			if (this._disposed) {
				// todo: should we warn if a listener is added to a disposed emitter? This happens often
				return Disposable.None;
			}

			if (thisArgs) {
				callback = callback.bind(thisArgs);
			}

			const contained = new UniqueContainer(callback);

			let removeMonitor: (() => void) | undefined;
			if (this._leakageMon && this._size >= Math.ceil(this._leakageMon.threshold * 0.2)) {
				// check and record this emitter for potential leakage
				contained.stack = Stacktrace.create();
				removeMonitor = this._leakageMon.check(contained.stack, this._size + 1);
			}

			if (_enableDisposeWithListenerWarning) {
				// Reuse the trace captured for the leakage monitor, if any,
				// instead of capturing a second one and overwriting it.
				contained.stack ??= Stacktrace.create();
			}

			if (!this._listeners) {
				this._options?.onWillAddFirstListener?.(this);
				this._listeners = contained;
				this._options?.onDidAddFirstListener?.(this);
			} else if (this._listeners instanceof UniqueContainer) {
				// Second listener: switch to the array representation, which requires a delivery queue.
				this._deliveryQueue ??= new EventDeliveryQueuePrivate();
				this._listeners = [this._listeners, contained];
			} else {
				this._listeners.push(contained);
			}
			this._options?.onDidAddListener?.(this);

			this._size++;

			const result = toDisposable(() => {
				removeMonitor?.();
				this._removeListener(contained);
			});
			if (disposables instanceof DisposableStore) {
				disposables.add(result);
			} else if (Array.isArray(disposables)) {
				disposables.push(result);
			}

			return result;
		};

		return this._event;
	}

	/**
	 * Removes `listener` from this emitter, compacting the sparse listener
	 * array when enough of it is empty and keeping a running delivery on this
	 * emitter consistent with the compaction.
	 *
	 * @throws Error when the listener list is an array that does not contain `listener`.
	 */
	private _removeListener(listener: ListenerContainer<T>) {
		this._options?.onWillRemoveListener?.(this);

		if (!this._listeners) {
			return; // expected if a listener gets disposed
		}

		if (this._size === 1) {
			this._listeners = undefined;
			this._options?.onDidRemoveLastListener?.(this);
			this._size = 0;
			return;
		}

		// size > 1 which requires that listeners be a list:
		const listeners = this._listeners as (ListenerContainer<T> | undefined)[];

		const index = listeners.indexOf(listener);
		if (index === -1) {
			console.log('disposed?', this._disposed);
			console.log('size?', this._size);
			console.log('arr?', JSON.stringify(this._listeners));
			throw new Error('Attempted to dispose unknown listener');
		}

		this._size--;
		listeners[index] = undefined;

		const adjustDeliveryQueue = this._deliveryQueue!.current === this;
		if (this._size * compactionThreshold <= listeners.length) {
			let n = 0;
			for (let i = 0; i < listeners.length; i++) {
				if (listeners[i]) {
					listeners[n++] = listeners[i];
				} else if (adjustDeliveryQueue && n < this._deliveryQueue!.end) {
					// An empty slot inside the range that is being delivered is
					// squeezed out: shift the queue's end (and cursor, if the
					// slot was before it) so no listener is skipped or repeated.
					this._deliveryQueue!.end--;
					if (n < this._deliveryQueue!.i) {
						this._deliveryQueue!.i--;
					}
				}
			}
			listeners.length = n;
		}
	}

	/**
	 * Calls a single listener with `value`. Exceptions thrown by the listener
	 * are passed to `onListenerError` from the options or, when that is not
	 * set, to `onUnexpectedError`; they do not propagate to the caller.
	 */
	private _deliver(listener: undefined | UniqueContainer<(value: T) => void>, value: T) {
		if (!listener) {
			return;
		}

		// Resolved before the call, exactly once per delivery. There is always
		// a handler because `onUnexpectedError` is the fallback.
		const errorHandler = this._options?.onListenerError || onUnexpectedError;

		try {
			listener.value(value);
		} catch (e) {
			errorHandler(e);
		}
	}

	/** Delivers items in the queue. Assumes the queue is ready to go. */
	private _deliverQueue(dq: EventDeliveryQueuePrivate) {
		const listeners = dq.current!._listeners! as (ListenerContainer<T> | undefined)[];
		while (dq.i < dq.end) {
			// important: dq.i is incremented before calling deliver() because it might reenter deliverQueue()
			this._deliver(listeners[dq.i++], dq.value as T);
		}
		dq.reset();
	}

	/**
	 * To be kept private to fire an event to
	 * subscribers
	 */
	fire(event: T): void {
		if (this._deliveryQueue?.current) {
			// Finish a delivery that is still in progress before starting this one.
			this._deliverQueue(this._deliveryQueue);
			this._perfMon?.stop(); // last fire() will have starting perfmon, stop it before starting the next dispatch
		}

		this._perfMon?.start(this._size);

		if (!this._listeners) {
			// no-op
		} else if (this._listeners instanceof UniqueContainer) {
			this._deliver(this._listeners, event);
		} else {
			const dq = this._deliveryQueue!;
			dq.enqueue(this, event, this._listeners.length);
			this._deliverQueue(dq);
		}

		this._perfMon?.stop();
	}

	/** @returns Whether at least one listener is currently registered. */
	hasListeners(): boolean {
		return this._size > 0;
	}
}
1246
1247
/** Opaque handle for a delivery queue; the marker property only brands the type. */
export interface EventDeliveryQueue {
	_isEventDeliveryQueue: true;
}

/** Creates a new, empty delivery queue. */
export const createEventDeliveryQueue = (): EventDeliveryQueue => {
	const queue = new EventDeliveryQueuePrivate();
	return queue;
};
1252
1253
/**
 * Dispatch state of a delivery: which emitter is delivering which value, and
 * the cursor/end positions inside that emitter's listener list.
 */
class EventDeliveryQueuePrivate implements EventDeliveryQueue {
	declare _isEventDeliveryQueue: true;

	/**
	 * Index in current's listener list.
	 */
	public i = -1;

	/**
	 * The last index in the listener's list to deliver.
	 */
	public end = 0;

	/**
	 * Emitter currently being dispatched on. Emitter._listeners is always an array.
	 */
	public current?: Emitter<any>;

	/**
	 * Currently emitting value. Defined whenever `current` is.
	 */
	public value?: unknown;

	/** Starts a delivery of `value` on `emitter`, covering listener indices `[0, end)`. */
	public enqueue<T>(emitter: Emitter<T>, value: T, end: number) {
		this.current = emitter;
		this.value = value;
		this.end = end;
		this.i = 0;
	}

	/** Ends the delivery and forgets the emitter and value. */
	public reset() {
		this.current = undefined;
		this.value = undefined;
		// force any current emission loop to stop, mainly for during dispose
		this.i = this.end;
	}
}
1288
1289
/**
 * Shape of an event object that carries a cancellation token and lets a
 * listener hand a promise back to the emitter via `waitUntil`.
 */
export interface IWaitUntil {
	token: CancellationToken;
	waitUntil(thenable: Promise<unknown>): void;
}

/** The payload part of `T`: everything except `waitUntil` and `token`. */
export type IWaitUntilData<T> = Omit<T, 'waitUntil' | 'token'>;
1295
1296
/**
 * Emitter whose listeners may register promises (via `waitUntil`) that the
 * emitter awaits before it moves on to the next listener.
 */
export class AsyncEmitter<T extends IWaitUntil> extends Emitter<T> {

	/** Pending `[listener, payload]` pairs; created on first use and then reused. */
	private _asyncDeliveryQueue?: LinkedList<[(ev: T) => void, IWaitUntilData<T>]>;

	/**
	 * Calls every current listener, one after the other, with an event built
	 * from `data`, `token` and a `waitUntil` function, and awaits the promises
	 * each listener registered before calling the next one.
	 *
	 * Delivery stops once `token` is cancelled. Errors thrown by listeners and
	 * rejections of registered promises go to `onUnexpectedError`.
	 *
	 * @param data Payload copied into every event object.
	 * @param token Cancellation token exposed on the event and checked between listeners.
	 * @param promiseJoin Optional hook that may replace each promise passed to `waitUntil`.
	 */
	async fireAsync(data: IWaitUntilData<T>, token: CancellationToken, promiseJoin?: (p: Promise<unknown>, listener: Function) => Promise<unknown>): Promise<void> {
		if (!this._listeners) {
			return;
		}

		const pending = (this._asyncDeliveryQueue ??= new LinkedList());

		forEachListener(this._listeners, container => pending.push([container.value, data]));

		while (pending.size > 0 && !token.isCancellationRequested) {

			const [listener, payload] = pending.shift()!;
			const thenables: Promise<unknown>[] = [];

			// eslint-disable-next-line local/code-no-dangerous-type-assertions
			const event = <T>{
				...payload,
				token,
				waitUntil: (p: Promise<unknown>): void => {
					// `thenables` is frozen right after the listener returned
					if (Object.isFrozen(thenables)) {
						throw new Error('waitUntil can NOT be called asynchronous');
					}
					thenables.push(promiseJoin ? promiseJoin(p, listener) : p);
				}
			};

			try {
				listener(event);
			} catch (e) {
				onUnexpectedError(e);
				continue;
			}

			// freeze thenables-collection to enforce sync-calls to
			// wait until and then wait for all thenables to resolve
			Object.freeze(thenables);

			await Promise.allSettled(thenables).then(outcomes => {
				outcomes
					.filter((outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected')
					.forEach(outcome => onUnexpectedError(outcome.reason));
			});
		}
	}
}
1352
1353
1354
/**
 * Emitter that can be paused. While paused, fired events are queued; when the
 * last pause is lifted they are delivered, either one by one or - when a
 * `merge` function was given - as one merged event.
 */
export class PauseableEmitter<T> extends Emitter<T> {

	/** Nesting depth of `pause()` calls; `0` means not paused. */
	private _isPaused = 0;
	protected _eventQueue = new LinkedList<T>();
	private _mergeFn?: (input: T[]) => T;

	public get isPaused(): boolean {
		return this._isPaused !== 0;
	}

	constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
		super(options);
		this._mergeFn = options?.merge;
	}

	/** Increases the pause depth by one. */
	pause(): void {
		this._isPaused++;
	}

	/**
	 * Decreases the pause depth by one (never below zero) and delivers the
	 * queued events when the depth reaches zero.
	 */
	resume(): void {
		if (this._isPaused === 0 || --this._isPaused !== 0) {
			return;
		}

		if (this._mergeFn) {
			// use the merge function to create a single composite
			// event. make a copy in case firing pauses this emitter
			if (this._eventQueue.size > 0) {
				const queued = Array.from(this._eventQueue);
				this._eventQueue.clear();
				super.fire(this._mergeFn(queued));
			}
			return;
		}

		// no merging, fire each event individually and test
		// that this emitter isn't paused halfway through
		while (!this._isPaused && this._eventQueue.size !== 0) {
			super.fire(this._eventQueue.shift()!);
		}
	}

	/**
	 * Queues the event while paused, delivers it otherwise. Events fired while
	 * there is no listener are dropped.
	 */
	override fire(event: T): void {
		if (!this._size) {
			return;
		}

		if (this._isPaused !== 0) {
			this._eventQueue.push(event);
			return;
		}

		super.fire(event);
	}
}
1404
1405
/**
 * Pauseable emitter that pauses itself on the first `fire()` and resumes after
 * `delay` milliseconds (default `100`), so that events fired within that
 * window are delivered together through the `merge` function.
 */
export class DebounceEmitter<T> extends PauseableEmitter<T> {

	private readonly _delay: number;
	/** Pending timer; only defined while a debounce window is open. */
	private _handle: Timeout | undefined;

	constructor(options: EmitterOptions & { merge: (input: T[]) => T; delay?: number }) {
		super(options);
		this._delay = options.delay ?? 100;
	}

	override fire(event: T): void {
		if (!this._handle) {
			this._handle = this._openDebounceWindow();
		}
		super.fire(event);
	}

	/** Pauses the emitter and schedules the matching `resume()`. */
	private _openDebounceWindow(): Timeout {
		this.pause();
		return setTimeout(() => {
			this._handle = undefined;
			this.resume();
		}, this._delay);
	}
}
1426
1427
/**
 * An emitter which queues all events and then delivers them in a microtask,
 * i.e. after the currently running synchronous code has finished.
 *
 * When a `merge` function is given, all events queued up to that point are
 * delivered as one merged event; otherwise they are delivered one by one in
 * the order they were fired. Events fired while there is no listener are dropped.
 */
export class MicrotaskEmitter<T> extends Emitter<T> {
	/** Events waiting for the next flush. */
	private _queuedEvents: T[] = [];
	private _mergeFn?: (input: T[]) => T;

	constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
		super(options);
		this._mergeFn = options?.merge;
	}

	override fire(event: T): void {

		if (!this.hasListeners()) {
			return;
		}

		this._queuedEvents.push(event);
		// The first queued event schedules the flush; later ones ride along.
		if (this._queuedEvents.length === 1) {
			queueMicrotask(() => {
				// Detach the batch *before* delivering it. A listener that
				// fires this emitter during delivery then starts a fresh
				// batch (and schedules its own flush) instead of appending to
				// an array that is about to be thrown away. It also keeps the
				// emitter usable when `merge` throws.
				const batch = this._queuedEvents;
				this._queuedEvents = [];

				if (this._mergeFn) {
					super.fire(this._mergeFn(batch));
				} else {
					batch.forEach(e => super.fire(e));
				}
			});
		}
	}
}
1458
1459
/**
 * An event emitter that multiplexes many events into a single event.
 *
 * Source events are only subscribed to while the multiplexed event itself has
 * at least one listener.
 *
 * @example Listen to the `onData` event of all `Thing`s, dynamically adding and removing `Thing`s
 * to the multiplexer as needed.
 *
 * ```typescript
 * const anythingDataMultiplexer = new EventMultiplexer<{ data: string }>();
 *
 * const thingListeners = new DisposableMap<Thing, IDisposable>();
 *
 * thingService.onDidAddThing(thing => {
 *   thingListeners.set(thing, anythingDataMultiplexer.add(thing.onData));
 * });
 * thingService.onDidRemoveThing(thing => {
 *   thingListeners.deleteAndDispose(thing);
 * });
 *
 * anythingDataMultiplexer.event(e => {
 *   console.log('Something fired data ' + e.data)
 * });
 * ```
 */
export class EventMultiplexer<T> implements IDisposable {

	private readonly emitter: Emitter<T>;
	/** Whether the multiplexed event currently has listeners, i.e. whether sources are hooked. */
	private hasListeners = false;
	/** Registered source events with their subscription (`null` while not hooked). */
	private events: { event: Event<T>; listener: IDisposable | null }[] = [];

	constructor() {
		this.emitter = new Emitter<T>({
			onWillAddFirstListener: () => this.onFirstListenerAdd(),
			onDidRemoveLastListener: () => this.onLastListenerRemove()
		});
	}

	/** The multiplexed event. */
	get event(): Event<T> {
		return this.emitter.event;
	}

	/**
	 * Adds a source event.
	 *
	 * @returns A disposable that removes the source again. Disposing it more
	 * than once, or after the multiplexer itself was disposed, has no effect.
	 */
	add(event: Event<T>): IDisposable {
		const e = { event: event, listener: null };
		this.events.push(e);

		if (this.hasListeners) {
			this.hook(e);
		}

		const dispose = () => {
			if (this.hasListeners) {
				this.unhook(e);
			}

			// The entry may already be gone (dispose() empties the list). An
			// unguarded `splice(-1, 1)` would remove whatever entry happens to
			// be last, i.e. an unrelated source.
			const idx = this.events.indexOf(e);
			if (idx !== -1) {
				this.events.splice(idx, 1);
			}
		};

		return toDisposable(createSingleCallFunction(dispose));
	}

	private onFirstListenerAdd(): void {
		this.hasListeners = true;
		this.events.forEach(e => this.hook(e));
	}

	private onLastListenerRemove(): void {
		this.hasListeners = false;
		this.events.forEach(e => this.unhook(e));
	}

	/** Subscribes to a source and forwards everything it fires. */
	private hook(e: { event: Event<T>; listener: IDisposable | null }): void {
		e.listener = e.event(r => this.emitter.fire(r));
	}

	/** Drops the subscription of a source, if there is one. */
	private unhook(e: { event: Event<T>; listener: IDisposable | null }): void {
		e.listener?.dispose();
		e.listener = null;
	}

	dispose(): void {
		this.emitter.dispose();

		for (const e of this.events) {
			e.listener?.dispose();
		}
		this.events = [];
	}
}
1547
1548
/** A disposable that exposes one event multiplexed from a changing list of items. */
export interface IDynamicListEventMultiplexer<TEventType> extends IDisposable {
	readonly event: Event<TEventType>;
}

/**
 * Multiplexes the event obtained via `getEvent` from every item of a list
 * that changes over time.
 */
export class DynamicListEventMultiplexer<TItem, TEventType> implements IDynamicListEventMultiplexer<TEventType> {
	private readonly _store = new DisposableStore();

	readonly event: Event<TEventType>;

	/**
	 * @param items Items that exist at construction time.
	 * @param onAddItem Fires for every item that should start being listened to.
	 * @param onRemoveItem Fires for every item that should stop being listened to.
	 * @param getEvent Selects the event of an item.
	 */
	constructor(
		items: TItem[],
		onAddItem: Event<TItem>,
		onRemoveItem: Event<TItem>,
		getEvent: (item: TItem) => Event<TEventType>
	) {
		const multiplexer = this._store.add(new EventMultiplexer<TEventType>());
		const itemListeners = this._store.add(new DisposableMap<TItem, IDisposable>());

		const track = (item: TItem): void => {
			itemListeners.set(item, multiplexer.add(getEvent(item)));
		};

		// Existing items
		items.forEach(item => track(item));

		// Added items
		this._store.add(onAddItem(item => track(item)));

		// Removed items
		this._store.add(onRemoveItem(item => {
			itemListeners.deleteAndDispose(item);
		}));

		this.event = multiplexer.event;
	}

	dispose() {
		this._store.dispose();
	}
}
1591
1592
/**
 * The EventBufferer is useful in situations in which you want
 * to delay firing your events during some code.
 * You can wrap that code and be sure that the event will not
 * be fired during that wrap.
 *
 * ```
 * const emitter: Emitter;
 * const bufferer = new EventBufferer();
 * const bufferedEvent = bufferer.wrapEvent(emitter.event);
 *
 * bufferedEvent(console.log);
 *
 * bufferer.bufferEvents(() => {
 *   emitter.fire(); // event will not be fired yet
 * });
 *
 * // event will only be fired at this point
 * ```
 */
export class EventBufferer {

	/** Stack of active `bufferEvents` frames; the innermost one is last. */
	private data: { buffers: Function[] }[] = [];

	/**
	 * Wraps `event` so that its listeners are not called while `bufferEvents`
	 * is running, but afterwards.
	 *
	 * Without `reduce`, every buffered event is replayed. With `reduce`, the
	 * events buffered during one `bufferEvents` call are folded into a single
	 * value (starting from `initial` when given) which is delivered once.
	 */
	wrapEvent<T>(event: Event<T>): Event<T>;
	wrapEvent<T>(event: Event<T>, reduce: (last: T | undefined, event: T) => T): Event<T>;
	wrapEvent<T, O>(event: Event<T>, reduce: (last: O | undefined, event: T) => O, initial: O): Event<O>;
	wrapEvent<T, O>(event: Event<T>, reduce?: (last: T | O | undefined, event: T) => T | O, initial?: O): Event<O | T> {
		return (listener, thisArgs?, disposables?) => {
			return event(i => {
				// innermost active frame, `undefined` when not buffering
				const data = this.data[this.data.length - 1];

				// Non-reduce scenario
				if (!reduce) {
					// Buffering case
					if (data) {
						data.buffers.push(() => listener.call(thisArgs, i));
					} else {
						// Not buffering case
						listener.call(thisArgs, i);
					}
					return;
				}

				// Reduce scenario
				const reduceData = data as typeof data & {
					/**
					 * The accumulated items that will be reduced.
					 */
					items?: T[];
					/**
					 * The reduced result cached to be shared with other listeners.
					 */
					reducedResult?: T | O;
				};

				// Not buffering case
				if (!reduceData) {
					// TODO: Is there a way to cache this reduce call for all listeners?
					listener.call(thisArgs, reduce(initial, i));
					return;
				}

				// Buffering case
				reduceData.items ??= [];
				reduceData.items.push(i);
				if (reduceData.buffers.length === 0) {
					// Include a single buffered function that will reduce all events when we're done buffering events
					data.buffers.push(() => {
						// cache the reduced result so that the value can be shared across all listeners
						// NOTE: test for `undefined`, not truthiness - `0`, `''` and `false` are valid initial values.
						reduceData.reducedResult ??= initial !== undefined
							? reduceData.items!.reduce(reduce as (last: O | undefined, event: T) => O, initial)
							: reduceData.items!.reduce(reduce as (last: T | undefined, event: T) => T);
						listener.call(thisArgs, reduceData.reducedResult);
					});
				}
			}, undefined, disposables);
		};
	}

	/**
	 * Runs `fn` while buffering the wrapped events, then delivers what was
	 * buffered.
	 *
	 * The buffering frame is always removed and the buffered events are always
	 * delivered, also when `fn` throws; the error of `fn` is rethrown afterwards.
	 *
	 * @returns Whatever `fn` returns.
	 */
	bufferEvents<R = void>(fn: () => R): R {
		const data = { buffers: new Array<Function>() };
		this.data.push(data);
		try {
			return fn();
		} finally {
			// Without this, a throwing `fn` would leave its frame on the stack
			// and every later event would be buffered forever.
			this.data.pop();
			data.buffers.forEach(flush => flush());
		}
	}
}
1681
1682
/**
 * A Relay is an event forwarder which functions as a repluggable event pipe.
 * Once created, you can connect an input event to it and it will simply forward
 * events from that input event through its own `event` property. The `input`
 * can be changed at any point in time.
 *
 * The input event is only subscribed to while `event` has listeners.
 */
export class Relay<T> implements IDisposable {

	/** Whether `event` currently has listeners. */
	private listening = false;
	private inputEvent: Event<T> = Event.None;
	private inputEventListener: IDisposable = Disposable.None;

	private readonly emitter = new Emitter<T>({
		onDidAddFirstListener: () => {
			this.listening = true;
			this.connect(this.inputEvent);
		},
		onDidRemoveLastListener: () => {
			this.listening = false;
			this.inputEventListener.dispose();
		}
	});

	readonly event: Event<T> = this.emitter.event;

	/** Switches the input; when listeners exist, the subscription moves to the new input right away. */
	set input(event: Event<T>) {
		this.inputEvent = event;

		if (!this.listening) {
			return;
		}

		this.inputEventListener.dispose();
		this.connect(event);
	}

	dispose() {
		this.inputEventListener.dispose();
		this.emitter.dispose();
	}

	/** Subscribes the emitter's `fire` to `source` and remembers the subscription. */
	private connect(source: Event<T>): void {
		this.inputEventListener = source(this.emitter.fire, this.emitter);
	}
}
1721
1722
/** A readable value together with an event that signals changes of it. */
export interface IValueWithChangeEvent<T> {
	readonly onDidChange: Event<void>;
	get value(): T;
}

/** Mutable value that fires `onDidChange` whenever a different value is assigned. */
export class ValueWithChangeEvent<T> implements IValueWithChangeEvent<T> {
	/** Creates a value holder for `value` whose change event is `Event.None`. */
	public static const<T>(value: T): IValueWithChangeEvent<T> {
		return new ConstValueWithChangeEvent(value);
	}

	private readonly _onDidChange = new Emitter<void>();
	readonly onDidChange: Event<void> = this._onDidChange.event;

	constructor(private _value: T) { }

	get value(): T {
		return this._value;
	}

	/** Stores `value` and fires `onDidChange`, unless it is strictly equal (`===`) to the current value. */
	set value(value: T) {
		if (value === this._value) {
			return;
		}

		this._value = value;
		this._onDidChange.fire(undefined);
	}
}
1748
1749
/** Value holder with a fixed value; its change event is `Event.None`. */
class ConstValueWithChangeEvent<T> implements IValueWithChangeEvent<T> {
	public readonly onDidChange: Event<void>;

	constructor(readonly value: T) {
		this.onDidChange = Event.None;
	}
}
1754
1755
/**
 * Keeps one disposable per item of a set alive for as long as the item is in the set.
 *
 * @param getData Returns the current set; it is read once up front and again on every change event.
 * @param onDidChangeData Signals that the set may have changed.
 * @param handleItem Is called for each item in the set (but only the first time the item is seen in the set).
 * The returned disposable is disposed if the item is no longer in the set.
 * @returns A disposable that stops the tracking and disposes the disposables of all remaining items.
 */
export function trackSetChanges<T>(getData: () => ReadonlySet<T>, onDidChangeData: Event<unknown>, handleItem: (d: T) => IDisposable): IDisposable {
	const map = new DisposableMap<T, IDisposable>();

	// Snapshot of the set as of the last time it was processed.
	let oldData = new Set(getData());
	oldData.forEach(item => map.set(item, handleItem(item)));

	const syncWithCurrentData = (): void => {
		const newData = getData();
		const { removed, added } = diffSets(oldData, newData);
		for (const gone of removed) {
			map.deleteAndDispose(gone);
		}
		for (const fresh of added) {
			map.set(fresh, handleItem(fresh));
		}
		oldData = new Set(newData);
	};

	const store = new DisposableStore();
	store.add(onDidChangeData(() => syncWithCurrentData()));
	store.add(map);
	return store;
}
1781
1782