Path: blob/main/extensions/copilot/src/platform/networking/common/fetcherService.ts
13401 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { createServiceIdentifier } from '../../../util/common/services';
import { Event } from '../../../util/vs/base/common/event';

export const IFetcherService = createServiceIdentifier<IFetcherService>('IFetcherService');

/** Use as the callSite value to suppress fetch telemetry for a request (e.g. from the telemetry service itself). */
export const NO_FETCH_TELEMETRY = 'NO_FETCH_TELEMETRY';

/**
 * Service contract for issuing HTTP requests and WebSocket connections, and for
 * classifying the errors those operations produce.
 */
export interface IFetcherService {
	readonly _serviceBrand: undefined;
	/** Fires with a {@link FetchEvent} payload. */
	readonly onDidFetch: Event<FetchEvent>;
	/** Fires with a {@link FetchTelemetryEvent} payload. */
	readonly onDidCompleteFetch: Event<FetchTelemetryEvent>;
	getUserAgentLibrary(): string;
	fetch(url: string, options: FetchOptions): Promise<Response>;
	createWebSocket(url: string, options?: WebSocketConnectOptions): WebSocketConnection;
	disconnectAll(): Promise<unknown>;
	makeAbortController(): IAbortController;
	isAbortError(e: any): boolean;
	isInternetDisconnectedError(e: any): boolean;
	isFetcherError(e: any): boolean;
	isNetworkProcessCrashedError(e: any): boolean;
	getUserMessageForFetcherError(err: any): string;
	fetchWithPagination<T>(baseUrl: string, options: PaginationOptions<T>): Promise<T[]>;
}

/**
 * Progress report for a single fetch, discriminated by `outcome` and `phase`:
 * - `requestResponse` + `success` carries the `statusCode`;
 * - `responseStreaming` variants carry `bytesReceived`;
 * - `error` / `cancel` variants carry the `reason`.
 */
export type FetchEvent = {
	internalId: string;
	timestamp: number;
	outcome: 'success';
	phase: 'requestResponse';
	fetcher: FetcherId;
	hostname: string;
	statusCode: number;
} | {
	internalId: string;
	timestamp: number;
	outcome: 'success';
	phase: 'responseStreaming';
	fetcher: FetcherId;
	hostname: string;
	bytesReceived: number;
} | {
	internalId: string;
	timestamp: number;
	outcome: 'error' | 'cancel';
	phase: 'requestResponse';
	fetcher: FetcherId;
	hostname: string;
	reason: any;
} | {
	internalId: string;
	timestamp: number;
	outcome: 'error' | 'cancel';
	phase: 'responseStreaming';
	fetcher: FetcherId;
	hostname: string;
	reason: any;
	bytesReceived: number;
};

/** Callback through which a {@link Response} reports its {@link FetchEvent}s. */
export type ReportFetchEvent = (outcome: FetchEvent) => void;

export interface FetchTelemetryEvent {
	callSite: string;
	hostname: string;
	latencyMs: number;
	statusCode: number | undefined;
	success: boolean;
}

/** A basic version of https://developer.mozilla.org/en-US/docs/Web/API/Response */
export class Response {
	/** `true` when `status` is in the range 200-299 (computed once, at construction). */
	ok: boolean;
	/** The response body, routed through a byte-counting transform. */
	readonly body: DestroyableStream<Uint8Array>;
	private _bytesReceived = 0;

	/** Number of body bytes that have passed through the counting transform so far. */
	get bytesReceived(): number {
		return this._bytesReceived;
	}

	/**
	 * @param status HTTP status code.
	 * @param statusText HTTP status text.
	 * @param headers Response headers.
	 * @param body Raw body stream; `null` is replaced by an immediately-closed stream.
	 * @param fetcher Identifier of the fetcher that produced this response.
	 * @param _reportEvent Receives the `responseStreaming` events emitted by the body stream.
	 * @param _internalId Id copied into every reported event.
	 * @param _hostname Hostname copied into every reported event.
	 */
	constructor(
		readonly status: number,
		readonly statusText: string,
		readonly headers: IHeaders,
		body: ReadableStream<Uint8Array> | null,
		readonly fetcher: FetcherId,
		private readonly _reportEvent: ReportFetchEvent,
		private readonly _internalId: string,
		private readonly _hostname: string,
	) {
		// Assigned here rather than in a field initializer so the value does not depend
		// on whether parameter properties are set before field initializers run.
		this.ok = status >= 200 && status < 300;

		const transformer = {
			// Count every chunk and pass it through unchanged.
			transform: (chunk: Uint8Array, controller: TransformStreamDefaultController<Uint8Array>) => {
				this._bytesReceived += chunk.length;
				controller.enqueue(chunk);
			},
			// Report success with the final byte count.
			flush: () => {
				this._reportEvent({ internalId: this._internalId, timestamp: Date.now(), outcome: 'success', phase: 'responseStreaming', fetcher: this.fetcher, hostname: this._hostname, bytesReceived: this._bytesReceived });
			},
			// A reason that is not an abort error is reported as 'error'; anything else
			// (including no reason at all) is reported as 'cancel'.
			cancel: (reason: any) => {
				const outcome = reason && !isAbortError(reason) ? 'error' as const : 'cancel' as const;
				this._reportEvent({ internalId: this._internalId, timestamp: Date.now(), outcome, phase: 'responseStreaming', fetcher: this.fetcher, hostname: this._hostname, reason, bytesReceived: this._bytesReceived });
			}
		};
		const countingStream = new TransformStream<Uint8Array, Uint8Array>(transformer);
		const inputStream = body ?? new ReadableStream({ start(c) { c.close(); } });
		this.body = new DestroyableStream(inputStream.pipeThrough(countingStream));
	}

	/**
	 * Builds a response whose body is the UTF-8 encoding of `body`. Events are
	 * discarded and both the internal id and hostname are set to `'in-memory'`.
	 */
	static fromText(status: number, statusText: string, headers: IHeaders, body: string, fetcher: FetcherId): Response {
		return new Response(
			status,
			statusText,
			headers,
			new ReadableStream({
				start(controller) {
					controller.enqueue(new TextEncoder().encode(body));
					controller.close();
				}
			}),
			fetcher,
			() => { },
			'in-memory',
			'in-memory',
		);
	}

	/** Reads the whole body and decodes it with a default `TextDecoder`. */
	async text(): Promise<string> {
		const chunks: Uint8Array[] = [];
		for await (const chunk of this.body) {
			chunks.push(chunk);
		}
		// Concatenate before decoding so multi-byte sequences split across chunks decode correctly.
		const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
		const result = new Uint8Array(totalLength);
		let offset = 0;
		for (const chunk of chunks) {
			result.set(chunk, offset);
			offset += chunk.length;
		}
		return new TextDecoder().decode(result);
	}

	/** Reads the whole body as text and parses it with `JSON.parse`. */
	async json(): Promise<any> {
		return JSON.parse(await this.text());
	}
}

export type FetcherId = 'electron-fetch' | 'node-fetch' | 'node-http' | 'test-stub' | 'helix-fetch';

/** These are the options we currently use, for ease of reference. */
export interface FetchOptions {
	/** Identifies the call site for telemetry tracking. Use {@link NO_FETCH_TELEMETRY} to suppress. */
	callSite: string;
	headers?: { [name: string]: string };
	body?: string;
	timeout?: number;
	/**
	 * If `json` is provided, it will be stringified using `JSON.stringify` and sent as the body with
	 * the `Content-Type` header set to `application/json`.
	 */
	json?: unknown;
	method?: 'GET' | 'POST' | 'PUT';
	signal?: IAbortSignal;
	retryFallbacks?: boolean;
	expectJSON?: boolean;
	useFetcher?: FetcherId;
	suppressIntegrationId?: boolean;
}

export interface PaginationOptions<T> extends FetchOptions {
	pageSize?: number;
	startPage?: number;
	/** Extracts the items of one page from the parsed response data. */
	getItemsFromResponse: (data: any) => T[];
	/** Builds the URL for a given page from the base URL, page size and page number. */
	buildUrl: (baseUrl: string, pageSize: number, page: number) => string;
}

export interface WebSocketConnectOptions {
	headers?: { [name: string]: string };
}

export interface WebSocketConnection {
	readonly webSocket: WebSocket;
	readonly responseHeaders: IHeaders;
	readonly responseStatusCode: number | undefined;
	readonly responseStatusText: string | undefined;
	readonly networkError: Error | undefined;
}

export interface IAbortSignal {
	readonly aborted: boolean;
	addEventListener(type: 'abort', listener: (this: AbortSignal) => void): void;
	removeEventListener(type: 'abort', listener: (this: AbortSignal) => void): void;
}

export interface IAbortController {
	readonly signal: IAbortSignal;
	abort(): void;
}

export interface IHeaders extends Iterable<[string, string]> {
	get(name: string): string | null;
}

/** {@link IHeaders} implementation backed by a plain record of header values. */
export class HeadersImpl implements IHeaders {
	constructor(private readonly _record: Readonly<Record<string, string | string[] | undefined>>) { }

	/** Creates a `HeadersImpl` from the entries of `map`. */
	static fromMap(map: ReadonlyMap<string, string>): HeadersImpl {
		return new HeadersImpl(Object.fromEntries(map));
	}

	/**
	 * Returns the value stored under `name`, or `null` when there is none.
	 * For array values the first element is returned. The lookup uses `name`
	 * exactly as given (no case normalization is performed here).
	 */
	get(name: string): string | null {
		// Only consult own properties so names such as 'constructor' or 'toString'
		// do not resolve to members inherited from Object.prototype.
		if (!Object.prototype.hasOwnProperty.call(this._record, name)) {
			return null;
		}
		const result = this._record[name];
		// `?? null` covers both an undefined entry and an empty array.
		return (Array.isArray(result) ? result[0] : result) ?? null;
	}

	/** Iterates `[name, value]` pairs, skipping entries for which {@link get} returns `null`. */
	[Symbol.iterator](): Iterator<[string, string]> {
		const keys = Object.keys(this._record);
		let index = 0;
		return {
			next: (): IteratorResult<[string, string]> => {
				while (index < keys.length) {
					const key = keys[index++];
					const value = this.get(key);
					if (value !== null) {
						return { done: false, value: [key, value] };
					}
				}
				return { done: true, value: undefined };
			}
		};
	}
}

/**
 * Wraps a ReadableStream to allow cancellation even while a `for await` loop
 * holds the stream locked. Use `destroy()` to safely cancel from an external
 * callback (e.g., `onReturn`) - it cancels through the reader if locked.
 *
 * When `pipeThrough()` is called, destroy() will forward to the piped stream.
 */
export class DestroyableStream<T> implements AsyncIterable<T> {
	/** Reader held by the active `for await` iteration, if any. */
	private reader: ReadableStreamDefaultReader<T> | undefined;
	/** Stream returned by the most recent `pipeThrough()` call, if any. */
	private pipedHead: DestroyableStream<unknown> | undefined;

	constructor(private readonly stream: ReadableStream<T>) { }

	/**
	 * Returns the underlying ReadableStream for APIs that require it
	 * (e.g., Readable.fromWeb). Use with caution as operations on the
	 * returned stream bypass the DestroyableStream's reader tracking.
	 */
	toReadableStream(): ReadableStream<T> {
		return this.stream;
	}

	/**
	 * Pipes this stream through a transform stream.
	 * Returns a new DestroyableStream wrapping the transformed stream.
	 * Calling destroy() on this stream will forward to the piped stream.
	 */
	pipeThrough<U>(transform: { readable: ReadableStream<U>; writable: WritableStream<T> }): DestroyableStream<U> {
		const piped = new DestroyableStream(this.stream.pipeThrough(transform));
		this.pipedHead = piped;
		return piped;
	}

	/**
	 * Yields chunks until the stream is done. The reader is published on `this.reader`
	 * for the duration of the iteration so `destroy()` can cancel through it, and the
	 * lock is released when the iteration ends for any reason.
	 */
	async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
		const reader = this.stream.getReader();
		this.reader = reader;
		try {
			while (true) {
				const { done, value } = await reader.read();
				if (done) {
					break;
				}
				yield value;
			}
		} finally {
			reader.releaseLock();
			this.reader = undefined;
		}
	}

	/** Cancels the stream, going through the piped stream or the active reader when present. */
	destroy(): Promise<void> {
		// Forward to piped stream if pipeThrough was called
		if (this.pipedHead) {
			return this.pipedHead.destroy();
		}
		if (this.reader) {
			// Cancels the underlying stream and releases the lock
			return this.reader.cancel();
		} else {
			// If stream was consumed and unlocked, cancel() is a no-op
			return this.stream.cancel();
		}
	}
}

/**
 * Parses the response body as JSON. On a parse failure the thrown error's message
 * is extended with the response text; bodies longer than 50 lines are shortened to
 * their first and last 25 lines around a `[...]` marker.
 *
 * @throws The error raised by `JSON.parse`, with the augmented message.
 */
export async function jsonVerboseError(resp: Response): Promise<any> {
	const text = await resp.text();
	try {
		return JSON.parse(text);
	} catch (err) {
		// Narrow before touching `message`; the catch variable is not guaranteed to be an Error.
		if (err instanceof Error) {
			const lines = text.split('\n');
			const errText = lines.length > 50 ? [...lines.slice(0, 25), '[...]', ...lines.slice(lines.length - 25)].join('\n') : text;
			err.message = `${err.message}. Response: ${errText}`;
		}
		throw err;
	}
}

/**
 * Returns `true` when `e` is a truthy value whose `name` is `'AbortError'`.
 * Always returns a real boolean, including for `null`, `undefined` and other falsy inputs.
 */
export function isAbortError(e: any): boolean {
	// see https://github.com/nodejs/node/issues/38361#issuecomment-1683839467
	return !!e && e.name === 'AbortError';
}

/** Returns the hostname of `url`, or `'unknown'` when `url` cannot be parsed by `URL`. */
export function safeGetHostname(url: string): string {
	try {
		return new URL(url).hostname;
	} catch {
		return 'unknown';
	}
}