Path: blob/main/extensions/copilot/src/extension/prompts/node/inline/utils/streaming.ts
13406 views
/*---------------------------------------------------------------------------------------------1* Copyright (c) Microsoft Corporation. All rights reserved.2* Licensed under the MIT License. See License.txt in the project root for license information.3*--------------------------------------------------------------------------------------------*/45import { AsyncIterableObject } from '../../../../../util/vs/base/common/async';67export async function* replaceStringInStream(stream: AsyncIterable<string>, searchValue: string, replaceValue: string): AsyncIterable<string> {8let buffer = '';910const searchValuePrefixes = getPrefixes(searchValue);11searchValuePrefixes.reverse(); // longest first1213for await (const chunk of stream) {14buffer += chunk;1516let searchIndex: number;17let lastIndex = 0;1819let textToYield = '';2021// Process the buffer in chunks, but retain potential partial matches at the end.22while ((searchIndex = buffer.indexOf(searchValue, lastIndex)) !== -1) {23textToYield += buffer.slice(lastIndex, searchIndex) + replaceValue;24lastIndex = searchIndex + searchValue.length;25}2627// Retain the remaining buffer that could be part of the next `searchValue`.28if (lastIndex !== 0) {29buffer = buffer.slice(lastIndex);30}3132// At this point, buffer does not contain any `searchValue` anymore.33// However, there could be a future chunk c, such that `buffer + c.substring(0, searchValue.length - 1)` contains it,34// so we cannot yield the full buffer.3536// longest prefix first37for (const p of searchValuePrefixes) {38if (buffer.endsWith(p)) {39const idx = buffer.length - p.length;4041textToYield += buffer.slice(0, idx);42buffer = buffer.slice(idx);4344break;45}46}4748if (textToYield.length > 0) {49yield textToYield;50}51}5253// Yield any remaining buffer content that didn't match `searchValue`.54if (buffer.length > 0) {55yield buffer;56}57}5859function getPrefixes(value: string): string[] {60const prefixes: string[] = [];61for (let i = 0; i <= value.length; i++) {62prefixes.push(value.substring(0, i));63}64return prefixes;65}6667export type StreamPipe<T> = (stream: AsyncIterable<T>) => AsyncIterable<T>;6869export namespace StreamPipe {70export function identity<T>(): StreamPipe<T> {71return stream => stream;72}7374export function discard<T>(): StreamPipe<T> {75return _stream => AsyncIterableObject.EMPTY;76}7778export function chain<T>(...pipes: StreamPipe<T>[]): StreamPipe<T> {79return stream => pipes.reduce((s, pipe) => pipe(s), stream);80}81}8283export function forEachStreamed<T>(stream: AsyncIterable<T>, fn: (item: T) => void): Promise<void> {84return (async () => {85for await (const item of stream) {86fn(item);87}88})();89}909192