Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/util/vs/base/common/stream.ts
13405 views
1
//!!! DO NOT modify, this file was COPIED from 'microsoft/vscode'
2
3
/*---------------------------------------------------------------------------------------------
4
* Copyright (c) Microsoft Corporation. All rights reserved.
5
* Licensed under the MIT License. See License.txt in the project root for license information.
6
*--------------------------------------------------------------------------------------------*/
7
8
import { CancellationToken } from './cancellation';
9
import { onUnexpectedError } from './errors';
10
import { DisposableStore, toDisposable } from './lifecycle';
11
12
/**
 * The payload that flows in readable stream events: either a
 * data chunk of type `T`, an `Error`, or the literal `'end'`
 * marker that signals completion.
 */
export type ReadableStreamEventPayload<T> = T | Error | 'end';
16
17
/**
 * Event-subscription surface of a readable stream, emulating the
 * node.js readable stream events for native and web environments.
 */
export interface ReadableStreamEvents<T> {

    /**
     * The 'data' event is emitted whenever the stream is
     * relinquishing ownership of a chunk of data to a consumer.
     *
     * NOTE: PLEASE UNDERSTAND THAT ADDING A DATA LISTENER CAN
     * TURN THE STREAM INTO FLOWING MODE. IT IS THEREFORE THE
     * LAST LISTENER THAT SHOULD BE ADDED AND NOT THE FIRST
     *
     * Use `listenStream` as a helper method to listen to
     * stream events in the right order.
     */
    on(event: 'data', callback: (data: T) => void): void;

    /**
     * Emitted when any error occurs.
     */
    on(event: 'error', callback: (err: Error) => void): void;

    /**
     * The 'end' event is emitted when there is no more data
     * to be consumed from the stream. The 'end' event will
     * not be emitted unless the data is completely consumed.
     */
    on(event: 'end', callback: () => void): void;
}
44
45
/**
 * An interface that emulates the API shape of a node.js readable
 * stream for use in native and web environments.
 */
export interface ReadableStream<T> extends ReadableStreamEvents<T> {

    /**
     * Stops emitting any events until resume() is called.
     */
    pause(): void;

    /**
     * Starts emitting events again after pause() was called.
     */
    resume(): void;

    /**
     * Destroys the stream and stops emitting any event.
     */
    destroy(): void;

    /**
     * Allows to remove a listener that was previously added.
     */
    removeListener(event: string, callback: Function): void;
}
71
72
/**
 * An interface that emulates the API shape of a node.js readable
 * for use in native and web environments.
 */
export interface Readable<T> {

    /**
     * Read data from the underlying source. Will return
     * null to indicate that no more data can be read.
     */
    read(): T | null;
}
84
85
/**
 * Type guard that checks whether the given object looks like a
 * `Readable<T>` by probing for a callable `read` method.
 */
export function isReadable<T>(obj: unknown): obj is Readable<T> {
    const candidate = obj as Readable<T> | undefined;

    return !!candidate && typeof candidate.read === 'function';
}
93
94
/**
 * An interface that emulates the API shape of a node.js writeable
 * stream for use in native and web environments.
 */
export interface WriteableStream<T> extends ReadableStream<T> {

    /**
     * Writing data to the stream will trigger the on('data')
     * event listener if the stream is flowing and buffer the
     * data otherwise until the stream is flowing.
     *
     * If a `highWaterMark` is configured and writing to the
     * stream reaches this mark, a promise will be returned
     * that should be awaited on before writing more data.
     * Otherwise there is a risk of buffering a large number
     * of data chunks without consumer.
     */
    write(data: T): void | Promise<void>;

    /**
     * Signals an error to the consumer of the stream via the
     * on('error') handler if the stream is flowing.
     *
     * NOTE: call `end` to signal that the stream has ended,
     * this DOES NOT happen automatically from `error`.
     */
    error(error: Error): void;

    /**
     * Signals the end of the stream to the consumer. If the
     * result is provided, will trigger the on('data') event
     * listener if the stream is flowing and buffer the data
     * otherwise until the stream is flowing.
     */
    end(result?: T): void;
}
130
131
/**
 * A stream that has a buffer already read. Returns the original stream
 * that was read as well as the chunks that got read.
 *
 * The `ended` flag indicates if the stream has been fully consumed.
 */
export interface ReadableBufferedStream<T> {

    /**
     * The original stream that is being read.
     */
    stream: ReadableStream<T>;

    /**
     * An array of chunks already read from this stream.
     */
    buffer: T[];

    /**
     * Signals if the stream has ended or not. If not, consumers
     * should continue to read from the stream until consumed.
     */
    ended: boolean;
}
155
156
/**
 * Type guard that checks whether the given object looks like a
 * `ReadableStream<T>` by probing for the four methods of the
 * stream interface.
 */
export function isReadableStream<T>(obj: unknown): obj is ReadableStream<T> {
    const candidate = obj as ReadableStream<T> | undefined;
    if (!candidate) {
        return false;
    }

    return typeof candidate.on === 'function'
        && typeof candidate.pause === 'function'
        && typeof candidate.resume === 'function'
        && typeof candidate.destroy === 'function';
}
164
165
/**
 * Type guard that checks whether the given object looks like a
 * `ReadableBufferedStream<T>`: a valid readable stream plus the
 * `buffer` array and `ended` flag.
 */
export function isReadableBufferedStream<T>(obj: unknown): obj is ReadableBufferedStream<T> {
    const candidate = obj as ReadableBufferedStream<T> | undefined;
    if (!candidate) {
        return false;
    }

    const hasBuffer = Array.isArray(candidate.buffer);
    const hasEndedFlag = typeof candidate.ended === 'boolean';

    return hasBuffer && hasEndedFlag && isReadableStream(candidate.stream);
}
173
174
/**
 * Reduces an array of chunks of type `T` into a single result of
 * type `R` (by default the same type as the chunks).
 */
export interface IReducer<T, R = T> {
    (data: T[]): R;
}
177
178
/**
 * Maps a single data chunk from its original type to a
 * transformed type (see `transform`).
 */
export interface IDataTransformer<Original, Transformed> {
    (data: Original): Transformed;
}
181
182
/**
 * Maps an error emitted by a source stream to a possibly
 * different error (see `transform`).
 */
export interface IErrorTransformer {
    (error: Error): Error;
}
185
186
/**
 * Bundles the data transformer and the optional error transformer
 * used by `transform` to convert one stream type into another.
 */
export interface ITransformer<Original, Transformed> {
    data: IDataTransformer<Original, Transformed>;
    error?: IErrorTransformer;
}
190
191
/**
 * Creates a new `WriteableStream`.
 *
 * @param reducer reduces the buffered chunks into a single object,
 *                or `null` to emit buffered chunks individually
 * @param options optional stream settings such as `highWaterMark`
 */
export function newWriteableStream<T>(reducer: IReducer<T> | null, options?: WriteableStreamOptions): WriteableStream<T> {
    const stream = new WriteableStreamImpl<T>(reducer, options);

    return stream;
}
194
195
/**
 * Options to configure the behaviour of a `WriteableStream`.
 */
export interface WriteableStreamOptions {

    /**
     * The number of objects to buffer before WriteableStream#write()
     * signals back that the buffer is full. Can be used to reduce
     * the memory pressure when the stream is not flowing.
     */
    highWaterMark?: number;
}
204
205
/**
 * Default implementation of `WriteableStream`: buffers data and
 * errors while no consumer is attached (not flowing) and delivers
 * them once the stream switches into flowing mode.
 */
class WriteableStreamImpl<T> implements WriteableStream<T> {

    // Lifecycle flags: `flowing` controls whether events are
    // delivered immediately or buffered; `ended` and `destroyed`
    // are one-way switches.
    private readonly state = {
        flowing: false,
        ended: false,
        destroyed: false
    };

    // Events that arrived while not flowing, kept until resume()
    private readonly buffer = {
        data: [] as T[],
        error: [] as Error[]
    };

    private readonly listeners = {
        data: [] as { (data: T): void }[],
        error: [] as { (error: Error): void }[],
        end: [] as { (): void }[]
    };

    // Resolve callbacks of writers that are awaiting the data
    // buffer to drain after hitting the configured highWaterMark
    private readonly pendingWritePromises: Function[] = [];

    /**
     * @param reducer a function that reduces the buffered data into a single object;
     *                because some objects can be complex and non-reducible, we also
     *                allow passing the explicit `null` value to skip the reduce step
     * @param options stream options
     */
    constructor(private reducer: IReducer<T> | null, private options?: WriteableStreamOptions) { }

    pause(): void {
        if (this.state.destroyed) {
            return;
        }

        this.state.flowing = false;
    }

    resume(): void {
        if (this.state.destroyed) {
            return;
        }

        if (!this.state.flowing) {
            this.state.flowing = true;

            // emit buffered events
            this.flowData();
            this.flowErrors();
            this.flowEnd();
        }
    }

    write(data: T): void | Promise<void> {
        if (this.state.destroyed) {
            return;
        }

        // flowing: directly send the data to listeners
        if (this.state.flowing) {
            this.emitData(data);
        }

        // not yet flowing: buffer data until flowing
        else {
            this.buffer.data.push(data);

            // highWaterMark: if configured, signal back when buffer reached limits
            if (typeof this.options?.highWaterMark === 'number' && this.buffer.data.length > this.options.highWaterMark) {
                return new Promise(resolve => this.pendingWritePromises.push(resolve));
            }
        }
    }

    error(error: Error): void {
        if (this.state.destroyed) {
            return;
        }

        // flowing: directly send the error to listeners
        if (this.state.flowing) {
            this.emitError(error);
        }

        // not yet flowing: buffer errors until flowing
        else {
            this.buffer.error.push(error);
        }
    }

    end(result?: T): void {
        if (this.state.destroyed) {
            return;
        }

        // end with data if provided
        if (typeof result !== 'undefined') {
            this.write(result);
        }

        // flowing: send end event to listeners
        if (this.state.flowing) {
            this.emitEnd();

            this.destroy();
        }

        // not yet flowing: remember state
        else {
            this.state.ended = true;
        }
    }

    private emitData(data: T): void {
        this.listeners.data.slice(0).forEach(listener => listener(data)); // slice to avoid listener mutation from delivering event
    }

    private emitError(error: Error): void {
        if (this.listeners.error.length === 0) {
            onUnexpectedError(error); // nobody listened to this error so we log it as unexpected
        } else {
            this.listeners.error.slice(0).forEach(listener => listener(error)); // slice to avoid listener mutation from delivering event
        }
    }

    private emitEnd(): void {
        this.listeners.end.slice(0).forEach(listener => listener()); // slice to avoid listener mutation from delivering event
    }

    on(event: 'data', callback: (data: T) => void): void;
    on(event: 'error', callback: (err: Error) => void): void;
    on(event: 'end', callback: () => void): void;
    on(event: 'data' | 'error' | 'end', callback: ((data: T) => void) | ((err: Error) => void) | (() => void)): void {
        if (this.state.destroyed) {
            return;
        }

        switch (event) {
            case 'data':
                this.listeners.data.push(callback as (data: T) => void);

                // switch into flowing mode as soon as the first 'data'
                // listener is added and we are not yet in flowing mode
                this.resume();

                break;

            case 'end':
                this.listeners.end.push(callback as () => void);

                // emit 'end' event directly if we are flowing
                // and the end has already been reached
                //
                // finish() when it went through
                if (this.state.flowing && this.flowEnd()) {
                    this.destroy();
                }

                break;

            case 'error':
                this.listeners.error.push(callback as (err: Error) => void);

                // emit buffered 'error' events unless done already
                // now that we know that we have at least one listener
                if (this.state.flowing) {
                    this.flowErrors();
                }

                break;
        }
    }

    removeListener(event: string, callback: Function): void {
        if (this.state.destroyed) {
            return;
        }

        let listeners: unknown[] | undefined = undefined;

        switch (event) {
            case 'data':
                listeners = this.listeners.data;
                break;

            case 'end':
                listeners = this.listeners.end;
                break;

            case 'error':
                listeners = this.listeners.error;
                break;
        }

        if (listeners) {
            const index = listeners.indexOf(callback);
            if (index >= 0) {
                listeners.splice(index, 1);
            }
        }
    }

    // Delivers (possibly reduced) buffered data to listeners and
    // releases writers blocked on the highWaterMark.
    private flowData(): void {
        // if buffer is empty, nothing to do
        if (this.buffer.data.length === 0) {
            return;
        }

        // if buffer data can be reduced into a single object,
        // emit the reduced data
        if (typeof this.reducer === 'function') {
            const fullDataBuffer = this.reducer(this.buffer.data);

            this.emitData(fullDataBuffer);
        } else {
            // otherwise emit each buffered data instance individually
            for (const data of this.buffer.data) {
                this.emitData(data);
            }
        }

        this.buffer.data.length = 0;

        // when the buffer is empty, resolve all pending writers
        const pendingWritePromises = [...this.pendingWritePromises];
        this.pendingWritePromises.length = 0;
        pendingWritePromises.forEach(pendingWritePromise => pendingWritePromise());
    }

    // Delivers buffered errors, but only once an error listener
    // exists (otherwise errors stay buffered).
    private flowErrors(): void {
        if (this.listeners.error.length > 0) {
            for (const error of this.buffer.error) {
                this.emitError(error);
            }

            this.buffer.error.length = 0;
        }
    }

    // Emits 'end' if the stream has ended; returns whether any
    // end listener actually received the event.
    private flowEnd(): boolean {
        if (this.state.ended) {
            this.emitEnd();

            return this.listeners.end.length > 0;
        }

        return false;
    }

    destroy(): void {
        if (!this.state.destroyed) {
            this.state.destroyed = true;
            this.state.ended = true;

            this.buffer.data.length = 0;
            this.buffer.error.length = 0;

            this.listeners.data.length = 0;
            this.listeners.error.length = 0;
            this.listeners.end.length = 0;

            this.pendingWritePromises.length = 0;
        }
    }
}
469
470
/**
 * Helper to fully read a T readable into a T.
 *
 * Drains the readable until it reports `null` and hands every
 * chunk that was read to the reducer.
 */
export function consumeReadable<T>(readable: Readable<T>, reducer: IReducer<T>): T {
    const allChunks: T[] = [];

    for (let chunk = readable.read(); chunk !== null; chunk = readable.read()) {
        allChunks.push(chunk);
    }

    return reducer(allChunks);
}
483
484
/**
485
* Helper to read a T readable up to a maximum of chunks. If the limit is
486
* reached, will return a readable instead to ensure all data can still
487
* be read.
488
*/
489
export function peekReadable<T>(readable: Readable<T>, reducer: IReducer<T>, maxChunks: number): T | Readable<T> {
490
const chunks: T[] = [];
491
492
let chunk: T | null | undefined = undefined;
493
while ((chunk = readable.read()) !== null && chunks.length < maxChunks) {
494
chunks.push(chunk);
495
}
496
497
// If the last chunk is null, it means we reached the end of
498
// the readable and return all the data at once
499
if (chunk === null && chunks.length > 0) {
500
return reducer(chunks);
501
}
502
503
// Otherwise, we still have a chunk, it means we reached the maxChunks
504
// value and as such we return a new Readable that first returns
505
// the existing read chunks and then continues with reading from
506
// the underlying readable.
507
return {
508
read: () => {
509
510
// First consume chunks from our array
511
if (chunks.length > 0) {
512
return chunks.shift()!;
513
}
514
515
// Then ensure to return our last read chunk
516
if (typeof chunk !== 'undefined') {
517
const lastReadChunk = chunk;
518
519
// explicitly use undefined here to indicate that we consumed
520
// the chunk, which could have either been null or valued.
521
chunk = undefined;
522
523
return lastReadChunk;
524
}
525
526
// Finally delegate back to the Readable
527
return readable.read();
528
}
529
};
530
}
531
532
/**
533
* Helper to fully read a T stream into a T or consuming
534
* a stream fully, awaiting all the events without caring
535
* about the data.
536
*/
537
export function consumeStream<T, R = T>(stream: ReadableStreamEvents<T>, reducer: IReducer<T, R>): Promise<R>;
538
export function consumeStream(stream: ReadableStreamEvents<unknown>): Promise<undefined>;
539
export function consumeStream<T, R = T>(stream: ReadableStreamEvents<T>, reducer?: IReducer<T, R>): Promise<R | undefined> {
540
return new Promise((resolve, reject) => {
541
const chunks: T[] = [];
542
543
listenStream(stream, {
544
onData: chunk => {
545
if (reducer) {
546
chunks.push(chunk);
547
}
548
},
549
onError: error => {
550
if (reducer) {
551
reject(error);
552
} else {
553
resolve(undefined);
554
}
555
},
556
onEnd: () => {
557
if (reducer) {
558
resolve(reducer(chunks));
559
} else {
560
resolve(undefined);
561
}
562
}
563
});
564
});
565
}
566
567
/**
 * Callbacks for all three stream events, used together with
 * `listenStream` to subscribe in the correct order.
 */
export interface IStreamListener<T> {

    /**
     * The 'data' event is emitted whenever the stream is
     * relinquishing ownership of a chunk of data to a consumer.
     */
    onData(data: T): void;

    /**
     * Emitted when any error occurs.
     */
    onError(err: Error): void;

    /**
     * The 'end' event is emitted when there is no more data
     * to be consumed from the stream. The 'end' event will
     * not be emitted unless the data is completely consumed.
     */
    onEnd(): void;
}
587
588
/**
 * Helper to listen to all events of a T stream in proper order.
 *
 * When a cancellation token is passed and cancelled, events are
 * silently dropped instead of being forwarded to the listener.
 */
export function listenStream<T>(stream: ReadableStreamEvents<T>, listener: IStreamListener<T>, token?: CancellationToken): void {

    stream.on('error', error => {
        if (token?.isCancellationRequested) {
            return;
        }

        listener.onError(error);
    });

    stream.on('end', () => {
        if (token?.isCancellationRequested) {
            return;
        }

        listener.onEnd();
    });

    // Adding the `data` listener will turn the stream
    // into flowing mode. As such it is important to
    // add this listener last (DO NOT CHANGE!)
    stream.on('data', data => {
        if (token?.isCancellationRequested) {
            return;
        }

        listener.onData(data);
    });
}
614
615
/**
 * Helper to peek up to `maxChunks` into a stream. The return type signals if
 * the stream has ended or not. If not, caller needs to add a `data` listener
 * to continue reading.
 */
export function peekStream<T>(stream: ReadableStream<T>, maxChunks: number): Promise<ReadableBufferedStream<T>> {
    return new Promise((resolve, reject) => {
        const streamListeners = new DisposableStore();
        const buffer: T[] = [];

        // Data Listener
        const dataListener = (chunk: T) => {

            // Add to buffer
            buffer.push(chunk);

            // We reached maxChunks and thus need to return
            // (note: resolves once MORE than `maxChunks` chunks were
            // received, so the buffer holds up to maxChunks + 1 items)
            if (buffer.length > maxChunks) {

                // Dispose any listeners and ensure to pause the
                // stream so that it can be consumed again by caller
                streamListeners.dispose();
                stream.pause();

                return resolve({ stream, buffer, ended: false });
            }
        };

        // Error Listener
        const errorListener = (error: Error) => {
            streamListeners.dispose();

            return reject(error);
        };

        // End Listener
        const endListener = () => {
            streamListeners.dispose();

            return resolve({ stream, buffer, ended: true });
        };

        streamListeners.add(toDisposable(() => stream.removeListener('error', errorListener)));
        stream.on('error', errorListener);

        streamListeners.add(toDisposable(() => stream.removeListener('end', endListener)));
        stream.on('end', endListener);

        // Important: leave the `data` listener last because
        // this can turn the stream into flowing mode and we
        // want `error` events to be received as well.
        streamListeners.add(toDisposable(() => stream.removeListener('data', dataListener)));
        stream.on('data', dataListener);
    });
}
670
671
/**
 * Helper to create a readable stream from an existing T.
 *
 * The resulting stream is already ended and will deliver `t`
 * as soon as a consumer attaches.
 */
export function toStream<T>(t: T, reducer: IReducer<T>): ReadableStream<T> {
    const result = newWriteableStream<T>(reducer);
    result.end(t);

    return result;
}
681
682
/**
 * Helper to create an empty stream that ends immediately
 * without ever delivering data.
 */
export function emptyStream(): ReadableStream<never> {
    // The reducer can never be invoked because no data is written
    const result = newWriteableStream<never>(() => { throw new Error('not supported'); });
    result.end();

    return result;
}
691
692
/**
693
* Helper to convert a T into a Readable<T>.
694
*/
695
export function toReadable<T>(t: T): Readable<T> {
696
let consumed = false;
697
698
return {
699
read: () => {
700
if (consumed) {
701
return null;
702
}
703
704
consumed = true;
705
706
return t;
707
}
708
};
709
}
710
711
/**
712
* Helper to transform a readable stream into another stream.
713
*/
714
export function transform<Original, Transformed>(stream: ReadableStreamEvents<Original>, transformer: ITransformer<Original, Transformed>, reducer: IReducer<Transformed>): ReadableStream<Transformed> {
715
const target = newWriteableStream<Transformed>(reducer);
716
717
listenStream(stream, {
718
onData: data => target.write(transformer.data(data)),
719
onError: error => target.error(transformer.error ? transformer.error(error) : error),
720
onEnd: () => target.end()
721
});
722
723
return target;
724
}
725
726
/**
727
* Helper to take an existing readable that will
728
* have a prefix injected to the beginning.
729
*/
730
export function prefixedReadable<T>(prefix: T, readable: Readable<T>, reducer: IReducer<T>): Readable<T> {
731
let prefixHandled = false;
732
733
return {
734
read: () => {
735
const chunk = readable.read();
736
737
// Handle prefix only once
738
if (!prefixHandled) {
739
prefixHandled = true;
740
741
// If we have also a read-result, make
742
// sure to reduce it to a single result
743
if (chunk !== null) {
744
return reducer([prefix, chunk]);
745
}
746
747
// Otherwise, just return prefix directly
748
return prefix;
749
}
750
751
return chunk;
752
}
753
};
754
}
755
756
/**
757
* Helper to take an existing stream that will
758
* have a prefix injected to the beginning.
759
*/
760
export function prefixedStream<T>(prefix: T, stream: ReadableStream<T>, reducer: IReducer<T>): ReadableStream<T> {
761
let prefixHandled = false;
762
763
const target = newWriteableStream<T>(reducer);
764
765
listenStream(stream, {
766
onData: data => {
767
768
// Handle prefix only once
769
if (!prefixHandled) {
770
prefixHandled = true;
771
772
return target.write(reducer([prefix, data]));
773
}
774
775
return target.write(data);
776
},
777
onError: error => target.error(error),
778
onEnd: () => {
779
780
// Handle prefix only once
781
if (!prefixHandled) {
782
prefixHandled = true;
783
784
target.write(prefix);
785
}
786
787
target.end();
788
}
789
});
790
791
return target;
792
}
793
794