// Path: extensions/copilot/src/util/vs/base/common/stream.ts
//!!! DO NOT modify, this file was COPIED from 'microsoft/vscode'12/*---------------------------------------------------------------------------------------------3* Copyright (c) Microsoft Corporation. All rights reserved.4* Licensed under the MIT License. See License.txt in the project root for license information.5*--------------------------------------------------------------------------------------------*/67import { CancellationToken } from './cancellation';8import { onUnexpectedError } from './errors';9import { DisposableStore, toDisposable } from './lifecycle';1011/**12* The payload that flows in readable stream events.13*/14export type ReadableStreamEventPayload<T> = T | Error | 'end';1516export interface ReadableStreamEvents<T> {1718/**19* The 'data' event is emitted whenever the stream is20* relinquishing ownership of a chunk of data to a consumer.21*22* NOTE: PLEASE UNDERSTAND THAT ADDING A DATA LISTENER CAN23* TURN THE STREAM INTO FLOWING MODE. IT IS THEREFOR THE24* LAST LISTENER THAT SHOULD BE ADDED AND NOT THE FIRST25*26* Use `listenStream` as a helper method to listen to27* stream events in the right order.28*/29on(event: 'data', callback: (data: T) => void): void;3031/**32* Emitted when any error occurs.33*/34on(event: 'error', callback: (err: Error) => void): void;3536/**37* The 'end' event is emitted when there is no more data38* to be consumed from the stream. 
The 'end' event will39* not be emitted unless the data is completely consumed.40*/41on(event: 'end', callback: () => void): void;42}4344/**45* A interface that emulates the API shape of a node.js readable46* stream for use in native and web environments.47*/48export interface ReadableStream<T> extends ReadableStreamEvents<T> {4950/**51* Stops emitting any events until resume() is called.52*/53pause(): void;5455/**56* Starts emitting events again after pause() was called.57*/58resume(): void;5960/**61* Destroys the stream and stops emitting any event.62*/63destroy(): void;6465/**66* Allows to remove a listener that was previously added.67*/68removeListener(event: string, callback: Function): void;69}7071/**72* A interface that emulates the API shape of a node.js readable73* for use in native and web environments.74*/75export interface Readable<T> {7677/**78* Read data from the underlying source. Will return79* null to indicate that no more data can be read.80*/81read(): T | null;82}8384export function isReadable<T>(obj: unknown): obj is Readable<T> {85const candidate = obj as Readable<T> | undefined;86if (!candidate) {87return false;88}8990return typeof candidate.read === 'function';91}9293/**94* A interface that emulates the API shape of a node.js writeable95* stream for use in native and web environments.96*/97export interface WriteableStream<T> extends ReadableStream<T> {9899/**100* Writing data to the stream will trigger the on('data')101* event listener if the stream is flowing and buffer the102* data otherwise until the stream is flowing.103*104* If a `highWaterMark` is configured and writing to the105* stream reaches this mark, a promise will be returned106* that should be awaited on before writing more data.107* Otherwise there is a risk of buffering a large number108* of data chunks without consumer.109*/110write(data: T): void | Promise<void>;111112/**113* Signals an error to the consumer of the stream via the114* on('error') handler if the stream is 
flowing.115*116* NOTE: call `end` to signal that the stream has ended,117* this DOES NOT happen automatically from `error`.118*/119error(error: Error): void;120121/**122* Signals the end of the stream to the consumer. If the123* result is provided, will trigger the on('data') event124* listener if the stream is flowing and buffer the data125* otherwise until the stream is flowing.126*/127end(result?: T): void;128}129130/**131* A stream that has a buffer already read. Returns the original stream132* that was read as well as the chunks that got read.133*134* The `ended` flag indicates if the stream has been fully consumed.135*/136export interface ReadableBufferedStream<T> {137138/**139* The original stream that is being read.140*/141stream: ReadableStream<T>;142143/**144* An array of chunks already read from this stream.145*/146buffer: T[];147148/**149* Signals if the stream has ended or not. If not, consumers150* should continue to read from the stream until consumed.151*/152ended: boolean;153}154155export function isReadableStream<T>(obj: unknown): obj is ReadableStream<T> {156const candidate = obj as ReadableStream<T> | undefined;157if (!candidate) {158return false;159}160161return [candidate.on, candidate.pause, candidate.resume, candidate.destroy].every(fn => typeof fn === 'function');162}163164export function isReadableBufferedStream<T>(obj: unknown): obj is ReadableBufferedStream<T> {165const candidate = obj as ReadableBufferedStream<T> | undefined;166if (!candidate) {167return false;168}169170return isReadableStream(candidate.stream) && Array.isArray(candidate.buffer) && typeof candidate.ended === 'boolean';171}172173export interface IReducer<T, R = T> {174(data: T[]): R;175}176177export interface IDataTransformer<Original, Transformed> {178(data: Original): Transformed;179}180181export interface IErrorTransformer {182(error: Error): Error;183}184185export interface ITransformer<Original, Transformed> {186data: IDataTransformer<Original, 
Transformed>;187error?: IErrorTransformer;188}189190export function newWriteableStream<T>(reducer: IReducer<T> | null, options?: WriteableStreamOptions): WriteableStream<T> {191return new WriteableStreamImpl<T>(reducer, options);192}193194export interface WriteableStreamOptions {195196/**197* The number of objects to buffer before WriteableStream#write()198* signals back that the buffer is full. Can be used to reduce199* the memory pressure when the stream is not flowing.200*/201highWaterMark?: number;202}203204class WriteableStreamImpl<T> implements WriteableStream<T> {205206private readonly state = {207flowing: false,208ended: false,209destroyed: false210};211212private readonly buffer = {213data: [] as T[],214error: [] as Error[]215};216217private readonly listeners = {218data: [] as { (data: T): void }[],219error: [] as { (error: Error): void }[],220end: [] as { (): void }[]221};222223private readonly pendingWritePromises: Function[] = [];224225/**226* @param reducer a function that reduces the buffered data into a single object;227* because some objects can be complex and non-reducible, we also228* allow passing the explicit `null` value to skip the reduce step229* @param options stream options230*/231constructor(private reducer: IReducer<T> | null, private options?: WriteableStreamOptions) { }232233pause(): void {234if (this.state.destroyed) {235return;236}237238this.state.flowing = false;239}240241resume(): void {242if (this.state.destroyed) {243return;244}245246if (!this.state.flowing) {247this.state.flowing = true;248249// emit buffered events250this.flowData();251this.flowErrors();252this.flowEnd();253}254}255256write(data: T): void | Promise<void> {257if (this.state.destroyed) {258return;259}260261// flowing: directly send the data to listeners262if (this.state.flowing) {263this.emitData(data);264}265266// not yet flowing: buffer data until flowing267else {268this.buffer.data.push(data);269270// highWaterMark: if configured, signal back when buffer 
reached limits271if (typeof this.options?.highWaterMark === 'number' && this.buffer.data.length > this.options.highWaterMark) {272return new Promise(resolve => this.pendingWritePromises.push(resolve));273}274}275}276277error(error: Error): void {278if (this.state.destroyed) {279return;280}281282// flowing: directly send the error to listeners283if (this.state.flowing) {284this.emitError(error);285}286287// not yet flowing: buffer errors until flowing288else {289this.buffer.error.push(error);290}291}292293end(result?: T): void {294if (this.state.destroyed) {295return;296}297298// end with data if provided299if (typeof result !== 'undefined') {300this.write(result);301}302303// flowing: send end event to listeners304if (this.state.flowing) {305this.emitEnd();306307this.destroy();308}309310// not yet flowing: remember state311else {312this.state.ended = true;313}314}315316private emitData(data: T): void {317this.listeners.data.slice(0).forEach(listener => listener(data)); // slice to avoid listener mutation from delivering event318}319320private emitError(error: Error): void {321if (this.listeners.error.length === 0) {322onUnexpectedError(error); // nobody listened to this error so we log it as unexpected323} else {324this.listeners.error.slice(0).forEach(listener => listener(error)); // slice to avoid listener mutation from delivering event325}326}327328private emitEnd(): void {329this.listeners.end.slice(0).forEach(listener => listener()); // slice to avoid listener mutation from delivering event330}331332on(event: 'data', callback: (data: T) => void): void;333on(event: 'error', callback: (err: Error) => void): void;334on(event: 'end', callback: () => void): void;335on(event: 'data' | 'error' | 'end', callback: ((data: T) => void) | ((err: Error) => void) | (() => void)): void {336if (this.state.destroyed) {337return;338}339340switch (event) {341case 'data':342this.listeners.data.push(callback as (data: T) => void);343344// switch into flowing mode as soon as the 
first 'data'345// listener is added and we are not yet in flowing mode346this.resume();347348break;349350case 'end':351this.listeners.end.push(callback as () => void);352353// emit 'end' event directly if we are flowing354// and the end has already been reached355//356// finish() when it went through357if (this.state.flowing && this.flowEnd()) {358this.destroy();359}360361break;362363case 'error':364this.listeners.error.push(callback as (err: Error) => void);365366// emit buffered 'error' events unless done already367// now that we know that we have at least one listener368if (this.state.flowing) {369this.flowErrors();370}371372break;373}374}375376removeListener(event: string, callback: Function): void {377if (this.state.destroyed) {378return;379}380381let listeners: unknown[] | undefined = undefined;382383switch (event) {384case 'data':385listeners = this.listeners.data;386break;387388case 'end':389listeners = this.listeners.end;390break;391392case 'error':393listeners = this.listeners.error;394break;395}396397if (listeners) {398const index = listeners.indexOf(callback);399if (index >= 0) {400listeners.splice(index, 1);401}402}403}404405private flowData(): void {406// if buffer is empty, nothing to do407if (this.buffer.data.length === 0) {408return;409}410411// if buffer data can be reduced into a single object,412// emit the reduced data413if (typeof this.reducer === 'function') {414const fullDataBuffer = this.reducer(this.buffer.data);415416this.emitData(fullDataBuffer);417} else {418// otherwise emit each buffered data instance individually419for (const data of this.buffer.data) {420this.emitData(data);421}422}423424this.buffer.data.length = 0;425426// when the buffer is empty, resolve all pending writers427const pendingWritePromises = [...this.pendingWritePromises];428this.pendingWritePromises.length = 0;429pendingWritePromises.forEach(pendingWritePromise => pendingWritePromise());430}431432private flowErrors(): void {433if (this.listeners.error.length > 0) 
{434for (const error of this.buffer.error) {435this.emitError(error);436}437438this.buffer.error.length = 0;439}440}441442private flowEnd(): boolean {443if (this.state.ended) {444this.emitEnd();445446return this.listeners.end.length > 0;447}448449return false;450}451452destroy(): void {453if (!this.state.destroyed) {454this.state.destroyed = true;455this.state.ended = true;456457this.buffer.data.length = 0;458this.buffer.error.length = 0;459460this.listeners.data.length = 0;461this.listeners.error.length = 0;462this.listeners.end.length = 0;463464this.pendingWritePromises.length = 0;465}466}467}468469/**470* Helper to fully read a T readable into a T.471*/472export function consumeReadable<T>(readable: Readable<T>, reducer: IReducer<T>): T {473const chunks: T[] = [];474475let chunk: T | null;476while ((chunk = readable.read()) !== null) {477chunks.push(chunk);478}479480return reducer(chunks);481}482483/**484* Helper to read a T readable up to a maximum of chunks. If the limit is485* reached, will return a readable instead to ensure all data can still486* be read.487*/488export function peekReadable<T>(readable: Readable<T>, reducer: IReducer<T>, maxChunks: number): T | Readable<T> {489const chunks: T[] = [];490491let chunk: T | null | undefined = undefined;492while ((chunk = readable.read()) !== null && chunks.length < maxChunks) {493chunks.push(chunk);494}495496// If the last chunk is null, it means we reached the end of497// the readable and return all the data at once498if (chunk === null && chunks.length > 0) {499return reducer(chunks);500}501502// Otherwise, we still have a chunk, it means we reached the maxChunks503// value and as such we return a new Readable that first returns504// the existing read chunks and then continues with reading from505// the underlying readable.506return {507read: () => {508509// First consume chunks from our array510if (chunks.length > 0) {511return chunks.shift()!;512}513514// Then ensure to return our last read chunk515if 
(typeof chunk !== 'undefined') {516const lastReadChunk = chunk;517518// explicitly use undefined here to indicate that we consumed519// the chunk, which could have either been null or valued.520chunk = undefined;521522return lastReadChunk;523}524525// Finally delegate back to the Readable526return readable.read();527}528};529}530531/**532* Helper to fully read a T stream into a T or consuming533* a stream fully, awaiting all the events without caring534* about the data.535*/536export function consumeStream<T, R = T>(stream: ReadableStreamEvents<T>, reducer: IReducer<T, R>): Promise<R>;537export function consumeStream(stream: ReadableStreamEvents<unknown>): Promise<undefined>;538export function consumeStream<T, R = T>(stream: ReadableStreamEvents<T>, reducer?: IReducer<T, R>): Promise<R | undefined> {539return new Promise((resolve, reject) => {540const chunks: T[] = [];541542listenStream(stream, {543onData: chunk => {544if (reducer) {545chunks.push(chunk);546}547},548onError: error => {549if (reducer) {550reject(error);551} else {552resolve(undefined);553}554},555onEnd: () => {556if (reducer) {557resolve(reducer(chunks));558} else {559resolve(undefined);560}561}562});563});564}565566export interface IStreamListener<T> {567568/**569* The 'data' event is emitted whenever the stream is570* relinquishing ownership of a chunk of data to a consumer.571*/572onData(data: T): void;573574/**575* Emitted when any error occurs.576*/577onError(err: Error): void;578579/**580* The 'end' event is emitted when there is no more data581* to be consumed from the stream. 
The 'end' event will582* not be emitted unless the data is completely consumed.583*/584onEnd(): void;585}586587/**588* Helper to listen to all events of a T stream in proper order.589*/590export function listenStream<T>(stream: ReadableStreamEvents<T>, listener: IStreamListener<T>, token?: CancellationToken): void {591592stream.on('error', error => {593if (!token?.isCancellationRequested) {594listener.onError(error);595}596});597598stream.on('end', () => {599if (!token?.isCancellationRequested) {600listener.onEnd();601}602});603604// Adding the `data` listener will turn the stream605// into flowing mode. As such it is important to606// add this listener last (DO NOT CHANGE!)607stream.on('data', data => {608if (!token?.isCancellationRequested) {609listener.onData(data);610}611});612}613614/**615* Helper to peek up to `maxChunks` into a stream. The return type signals if616* the stream has ended or not. If not, caller needs to add a `data` listener617* to continue reading.618*/619export function peekStream<T>(stream: ReadableStream<T>, maxChunks: number): Promise<ReadableBufferedStream<T>> {620return new Promise((resolve, reject) => {621const streamListeners = new DisposableStore();622const buffer: T[] = [];623624// Data Listener625const dataListener = (chunk: T) => {626627// Add to buffer628buffer.push(chunk);629630// We reached maxChunks and thus need to return631if (buffer.length > maxChunks) {632633// Dispose any listeners and ensure to pause the634// stream so that it can be consumed again by caller635streamListeners.dispose();636stream.pause();637638return resolve({ stream, buffer, ended: false });639}640};641642// Error Listener643const errorListener = (error: Error) => {644streamListeners.dispose();645646return reject(error);647};648649// End Listener650const endListener = () => {651streamListeners.dispose();652653return resolve({ stream, buffer, ended: true });654};655656streamListeners.add(toDisposable(() => stream.removeListener('error', 
errorListener)));657stream.on('error', errorListener);658659streamListeners.add(toDisposable(() => stream.removeListener('end', endListener)));660stream.on('end', endListener);661662// Important: leave the `data` listener last because663// this can turn the stream into flowing mode and we664// want `error` events to be received as well.665streamListeners.add(toDisposable(() => stream.removeListener('data', dataListener)));666stream.on('data', dataListener);667});668}669670/**671* Helper to create a readable stream from an existing T.672*/673export function toStream<T>(t: T, reducer: IReducer<T>): ReadableStream<T> {674const stream = newWriteableStream<T>(reducer);675676stream.end(t);677678return stream;679}680681/**682* Helper to create an empty stream683*/684export function emptyStream(): ReadableStream<never> {685const stream = newWriteableStream<never>(() => { throw new Error('not supported'); });686stream.end();687688return stream;689}690691/**692* Helper to convert a T into a Readable<T>.693*/694export function toReadable<T>(t: T): Readable<T> {695let consumed = false;696697return {698read: () => {699if (consumed) {700return null;701}702703consumed = true;704705return t;706}707};708}709710/**711* Helper to transform a readable stream into another stream.712*/713export function transform<Original, Transformed>(stream: ReadableStreamEvents<Original>, transformer: ITransformer<Original, Transformed>, reducer: IReducer<Transformed>): ReadableStream<Transformed> {714const target = newWriteableStream<Transformed>(reducer);715716listenStream(stream, {717onData: data => target.write(transformer.data(data)),718onError: error => target.error(transformer.error ? 
transformer.error(error) : error),719onEnd: () => target.end()720});721722return target;723}724725/**726* Helper to take an existing readable that will727* have a prefix injected to the beginning.728*/729export function prefixedReadable<T>(prefix: T, readable: Readable<T>, reducer: IReducer<T>): Readable<T> {730let prefixHandled = false;731732return {733read: () => {734const chunk = readable.read();735736// Handle prefix only once737if (!prefixHandled) {738prefixHandled = true;739740// If we have also a read-result, make741// sure to reduce it to a single result742if (chunk !== null) {743return reducer([prefix, chunk]);744}745746// Otherwise, just return prefix directly747return prefix;748}749750return chunk;751}752};753}754755/**756* Helper to take an existing stream that will757* have a prefix injected to the beginning.758*/759export function prefixedStream<T>(prefix: T, stream: ReadableStream<T>, reducer: IReducer<T>): ReadableStream<T> {760let prefixHandled = false;761762const target = newWriteableStream<T>(reducer);763764listenStream(stream, {765onData: data => {766767// Handle prefix only once768if (!prefixHandled) {769prefixHandled = true;770771return target.write(reducer([prefix, data]));772}773774return target.write(data);775},776onError: error => target.error(error),777onEnd: () => {778779// Handle prefix only once780if (!prefixHandled) {781prefixHandled = true;782783target.write(prefix);784}785786target.end();787}788});789790return target;791}792793794