Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/platform/networking/common/fetcherService.ts
13401 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { createServiceIdentifier } from '../../../util/common/services';
7
import { Event } from '../../../util/vs/base/common/event';
8
9
/** Service identifier for {@link IFetcherService}, created with `createServiceIdentifier`; the constant deliberately shares its name with the interface. */
export const IFetcherService = createServiceIdentifier<IFetcherService>('IFetcherService');
10
11
/**
 * Sentinel `callSite` value: pass it as the call site of a request to suppress
 * fetch telemetry for that request (e.g. for requests issued by the telemetry
 * service itself).
 */
export const NO_FETCH_TELEMETRY = 'NO_FETCH_TELEMETRY';
13
14
/**
 * Contract of the fetcher service: HTTP-style requests, WebSocket connections,
 * abort controllers and helpers for classifying errors.
 *
 * Only signatures are declared here. NOTE(review): the per-member remarks below
 * restate what the names and types suggest; confirm against an implementation.
 */
export interface IFetcherService {
	readonly _serviceBrand: undefined;

	// --- events ---------------------------------------------------------------

	/** Event whose payload is a {@link FetchEvent}. */
	readonly onDidFetch: Event<FetchEvent>;
	/** Event whose payload is a {@link FetchTelemetryEvent}. */
	readonly onDidCompleteFetch: Event<FetchTelemetryEvent>;

	// --- requests and connections ----------------------------------------------

	getUserAgentLibrary(): string;
	/** Takes a URL plus {@link FetchOptions} and resolves to a {@link Response}. */
	fetch(url: string, options: FetchOptions): Promise<Response>;
	/** Returns a {@link WebSocketConnection} for `url`; `options` may be omitted. */
	createWebSocket(url: string, options?: WebSocketConnectOptions): WebSocketConnection;
	disconnectAll(): Promise<unknown>;
	makeAbortController(): IAbortController;
	/**
	 * Takes {@link PaginationOptions} (fetch options plus paging callbacks) and
	 * resolves to an array of `T`.
	 */
	fetchWithPagination<T>(baseUrl: string, options: PaginationOptions<T>): Promise<T[]>;

	// --- error classification ----------------------------------------------------

	isAbortError(e: any): boolean;
	isInternetDisconnectedError(e: any): boolean;
	isFetcherError(e: any): boolean;
	isNetworkProcessCrashedError(e: any): boolean;
	getUserMessageForFetcherError(err: any): string;
}
30
31
/**
 * Event describing how one phase of a fetch ended.
 *
 * It is a union discriminated by `phase` (`'requestResponse'` or
 * `'responseStreaming'`) and `outcome` (`'success'`, or `'error' | 'cancel'`).
 * Every variant carries `internalId`, `timestamp`, `fetcher` and `hostname`.
 */
export type FetchEvent =
	// A response was received: carries the status code.
	| {
		internalId: string;
		timestamp: number;
		outcome: 'success';
		phase: 'requestResponse';
		fetcher: FetcherId;
		hostname: string;
		statusCode: number;
	}
	// The response body was streamed to the end: carries the byte count.
	| {
		internalId: string;
		timestamp: number;
		outcome: 'success';
		phase: 'responseStreaming';
		fetcher: FetcherId;
		hostname: string;
		bytesReceived: number;
	}
	// The request/response phase failed or was cancelled: carries the reason.
	| {
		internalId: string;
		timestamp: number;
		outcome: 'error' | 'cancel';
		phase: 'requestResponse';
		fetcher: FetcherId;
		hostname: string;
		reason: any;
	}
	// Streaming failed or was cancelled: carries the reason and the bytes seen so far.
	| {
		internalId: string;
		timestamp: number;
		outcome: 'error' | 'cancel';
		phase: 'responseStreaming';
		fetcher: FetcherId;
		hostname: string;
		reason: any;
		bytesReceived: number;
	};
65
66
/** Callback that receives a {@link FetchEvent}; note that the parameter is named `outcome` but carries the whole event. */
export type ReportFetchEvent = (outcome: FetchEvent) => void;
67
68
/** Payload of `IFetcherService.onDidCompleteFetch`. */
export interface FetchTelemetryEvent {
	/** Call-site identifier (see `FetchOptions.callSite`). */
	callSite: string;
	hostname: string;
	/** Latency, in milliseconds as the name states. */
	latencyMs: number;
	/** Status code, or `undefined` when there is none to report. */
	statusCode: number | undefined;
	success: boolean;
}
75
76
/**
 * A basic version of https://developer.mozilla.org/en-US/docs/Web/API/Response
 *
 * The body is exposed as a {@link DestroyableStream} that counts the bytes
 * flowing through it and reports how the `responseStreaming` phase ended via
 * the `_reportEvent` callback passed to the constructor.
 */
export class Response {
	/**
	 * `true` when `status` is in the range 200-299.
	 *
	 * Assigned in the constructor body rather than through a field initializer:
	 * an initializer reading `this.status` is only correct while parameter
	 * properties are assigned before field initializers run, which does not hold
	 * under ES2022 class-field semantics (`useDefineForClassFields`).
	 */
	ok: boolean;
	/** Response body, piped through the byte-counting transform. */
	readonly body: DestroyableStream<Uint8Array>;
	private _bytesReceived = 0;

	/** Number of body bytes that have passed through the counting transform so far. */
	get bytesReceived(): number {
		return this._bytesReceived;
	}

	/**
	 * @param status HTTP status code; also determines `ok`.
	 * @param statusText Status text accompanying `status`.
	 * @param headers Response headers.
	 * @param body Raw body stream; `null` is treated as an empty, already closed stream.
	 * @param fetcher Identifier of the fetcher that produced this response.
	 * @param _reportEvent Callback invoked once when streaming ends (success, error or cancel).
	 * @param _internalId Identifier copied into the reported event.
	 * @param _hostname Hostname copied into the reported event.
	 */
	constructor(
		readonly status: number,
		readonly statusText: string,
		readonly headers: IHeaders,
		body: ReadableStream<Uint8Array> | null,
		readonly fetcher: FetcherId,
		private readonly _reportEvent: ReportFetchEvent,
		private readonly _internalId: string,
		private readonly _hostname: string,
	) {
		this.ok = status >= 200 && status < 300;

		// Kept as a standalone object instead of an inline literal.
		// NOTE(review): presumably so that the `cancel` hook is accepted even when the
		// TransformStream typings in use do not declare it - confirm before inlining.
		const transformer = {
			// Count every chunk and pass it through unchanged.
			transform: (chunk: Uint8Array, controller: TransformStreamDefaultController<Uint8Array>) => {
				this._bytesReceived += chunk.length;
				controller.enqueue(chunk);
			},
			// The body was read to the end.
			flush: () => {
				this._reportEvent({ internalId: this._internalId, timestamp: Date.now(), outcome: 'success', phase: 'responseStreaming', fetcher: this.fetcher, hostname: this._hostname, bytesReceived: this._bytesReceived });
			},
			// The body was cancelled: a missing reason or an abort error counts as
			// 'cancel', any other reason as 'error'.
			cancel: (reason: any) => {
				const outcome = reason && !isAbortError(reason) ? 'error' as const : 'cancel' as const;
				this._reportEvent({ internalId: this._internalId, timestamp: Date.now(), outcome, phase: 'responseStreaming', fetcher: this.fetcher, hostname: this._hostname, reason, bytesReceived: this._bytesReceived });
			}
		};
		const countingStream = new TransformStream<Uint8Array, Uint8Array>(transformer);
		const inputStream = body ?? new ReadableStream({ start(c) { c.close(); } });
		this.body = new DestroyableStream(inputStream.pipeThrough(countingStream));
	}

	/**
	 * Builds a response whose body is the UTF-8 encoding of `body`.
	 * Streaming events of such a response are discarded (no-op reporter) and its
	 * internal id and hostname are both `'in-memory'`.
	 */
	static fromText(status: number, statusText: string, headers: IHeaders, body: string, fetcher: FetcherId): Response {
		return new Response(
			status,
			statusText,
			headers,
			new ReadableStream({
				start(controller) {
					controller.enqueue(new TextEncoder().encode(body));
					controller.close();
				}
			}),
			fetcher,
			() => { },
			'in-memory',
			'in-memory',
		);
	}

	/**
	 * Reads the whole body and decodes it as UTF-8.
	 * Chunks are decoded incrementally (`stream: true`), so multi-byte sequences
	 * split across chunk boundaries decode exactly as they would from a single
	 * concatenated buffer, without building that buffer first.
	 */
	async text(): Promise<string> {
		const decoder = new TextDecoder();
		let decoded = '';
		for await (const chunk of this.body) {
			decoded += decoder.decode(chunk, { stream: true });
		}
		// Flush whatever the decoder is still holding back (incomplete trailing sequence).
		return decoded + decoder.decode();
	}

	/** Reads the whole body via {@link text} and parses it with `JSON.parse`. */
	async json(): Promise<any> {
		return JSON.parse(await this.text());
	}
}
151
152
/** Identifiers of the known fetchers; recorded as `fetcher` on events and responses and accepted by `FetchOptions.useFetcher`. */
export type FetcherId = 'electron-fetch' | 'node-fetch' | 'node-http' | 'test-stub' | 'helix-fetch';
153
154
/** The options currently in use for `fetch`, listed here for ease of reference. */
export interface FetchOptions {
	/** Identifies the call site for telemetry tracking. Use {@link NO_FETCH_TELEMETRY} to suppress. */
	callSite: string;
	headers?: Record<string, string>;
	body?: string;
	/** NOTE(review): the unit is not visible in this file (presumably milliseconds) - confirm. */
	timeout?: number;
	/**
	 * When `json` is provided it is stringified using `JSON.stringify` and sent as the body, with
	 * the `Content-Type` header set to `application/json`.
	 */
	json?: unknown;
	method?: 'GET' | 'POST' | 'PUT';
	signal?: IAbortSignal;
	retryFallbacks?: boolean;
	expectJSON?: boolean;
	/** Selects a fetcher by its {@link FetcherId}. */
	useFetcher?: FetcherId;
	suppressIntegrationId?: boolean;
}
173
174
/**
 * {@link FetchOptions} extended with what `fetchWithPagination` needs: optional
 * paging parameters plus two callbacks.
 */
export interface PaginationOptions<T> extends FetchOptions {
	pageSize?: number;
	startPage?: number;
	/** Maps response data (typed `any`) to the items it contains. */
	getItemsFromResponse: (data: any) => T[];
	/** Produces a URL string from the base URL, the page size and a page number. */
	buildUrl: (baseUrl: string, pageSize: number, page: number) => string;
}
180
181
/** Options accepted by `createWebSocket`. */
export interface WebSocketConnectOptions {
	/** Optional header name/value pairs. */
	headers?: Record<string, string>;
}
184
185
/**
 * Result of `createWebSocket`: the socket itself plus read-only response
 * details. Status code, status text and network error may each be `undefined`.
 */
export interface WebSocketConnection {
	readonly webSocket: WebSocket;
	readonly responseHeaders: IHeaders;
	readonly responseStatusCode: number | undefined;
	readonly responseStatusText: string | undefined;
	readonly networkError: Error | undefined;
}
192
193
/**
 * The subset of an abort signal used by this module: the `aborted` flag and
 * adding/removing `'abort'` listeners.
 */
export interface IAbortSignal {
	readonly aborted: boolean;
	addEventListener(type: 'abort', listener: (this: AbortSignal) => void): void;
	removeEventListener(type: 'abort', listener: (this: AbortSignal) => void): void;
}
198
199
/** Pairs an {@link IAbortSignal} with the `abort()` method (see `IFetcherService.makeAbortController`). */
export interface IAbortController {
	readonly signal: IAbortSignal;
	abort(): void;
}
203
204
/** Read-only view of headers: iterable as `[name, value]` pairs, with lookup by name. */
export interface IHeaders extends Iterable<[string, string]> {
	/** Returns the value for `name`, or `null`. */
	get(name: string): string | null;
}
207
208
/**
 * {@link IHeaders} implementation backed by a plain record whose values may be
 * a string, an array of strings, or `undefined`.
 *
 * Lookup is by exact key. NOTE(review): names are not case-normalised here -
 * confirm that callers and producers agree on the casing of header names.
 */
export class HeadersImpl implements IHeaders {
	constructor(private readonly _record: Readonly<Record<string, string | string[] | undefined>>) { }

	/** Creates an instance from a map of header names to single values. */
	static fromMap(map: ReadonlyMap<string, string>): HeadersImpl {
		return new HeadersImpl(Object.fromEntries(map));
	}

	/**
	 * Returns the value stored for `name`; for array values, the first element.
	 * Returns `null` when there is no usable value: the key is absent, its value
	 * is `undefined`, or its value is an empty array.
	 */
	get(name: string): string | null {
		// Only own keys count: without this check names such as 'constructor' or
		// 'toString' would resolve to members inherited from Object.prototype.
		if (!Object.prototype.hasOwnProperty.call(this._record, name)) {
			return null;
		}
		const stored = this._record[name];
		const first = Array.isArray(stored) ? stored[0] : stored;
		return first ?? null;
	}

	/**
	 * Iterates `[name, value]` pairs in `Object.keys` order, using the same value
	 * `get(name)` returns. Keys without a usable value are skipped so that the
	 * iterator never yields a `null` where a string is promised.
	 */
	*[Symbol.iterator](): Iterator<[string, string]> {
		for (const name of Object.keys(this._record)) {
			const value = this.get(name);
			if (value !== null) {
				yield [name, value];
			}
		}
	}
}
234
235
/**
 * Wrapper around a ReadableStream that can still be cancelled while a
 * `for await` loop holds the stream locked. Call `destroy()` from an external
 * callback (e.g., `onReturn`): when the stream is locked by an iteration, the
 * cancellation goes through that iteration's reader.
 *
 * After `pipeThrough()` has been called, `destroy()` is forwarded to the piped
 * stream instead.
 */
export class DestroyableStream<T> implements AsyncIterable<T> {
	/** Reader of the iteration currently in progress, `undefined` otherwise. */
	private reader: ReadableStreamDefaultReader<T> | undefined;
	/** Wrapper returned by the most recent `pipeThrough()` call, if any. */
	private pipedHead: DestroyableStream<unknown> | undefined;

	constructor(private readonly stream: ReadableStream<T>) { }

	/**
	 * Exposes the wrapped ReadableStream for APIs that need the real thing
	 * (e.g., Readable.fromWeb). Use with caution: anything done directly on the
	 * returned stream is invisible to this wrapper's reader tracking.
	 */
	toReadableStream(): ReadableStream<T> {
		return this.stream;
	}

	/**
	 * Pipes the wrapped stream through `transform` and returns a new
	 * DestroyableStream around the result. From then on, `destroy()` on this
	 * instance is forwarded to the returned one.
	 */
	pipeThrough<U>(transform: { readable: ReadableStream<U>; writable: WritableStream<T> }): DestroyableStream<U> {
		const downstream = new DestroyableStream(this.stream.pipeThrough(transform));
		this.pipedHead = downstream;
		return downstream;
	}

	/**
	 * Reads the stream chunk by chunk. The reader is published on `this.reader`
	 * for the duration of the iteration so that `destroy()` can cancel through
	 * it; the lock is released when the iteration ends, however it ends.
	 */
	async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
		const activeReader = this.stream.getReader();
		this.reader = activeReader;
		try {
			for (; ;) {
				const step = await activeReader.read();
				if (step.done) {
					return;
				}
				yield step.value;
			}
		} finally {
			activeReader.releaseLock();
			this.reader = undefined;
		}
	}

	/**
	 * Cancels the stream, picking the route that works in the current state.
	 * @returns the promise of the underlying `cancel()` call.
	 */
	destroy(): Promise<void> {
		// pipeThrough() was called: the piped wrapper is now the one to cancel.
		const downstream = this.pipedHead;
		if (downstream) {
			return downstream.destroy();
		}

		// An iteration holds the lock, so cancel through its reader; the lock itself
		// is released by the iterator's `finally` block.
		const activeReader = this.reader;
		if (activeReader) {
			return activeReader.cancel();
		}

		// Not locked by an iteration. If the stream was already consumed, cancel() is a no-op.
		return this.stream.cancel();
	}
}
298
299
/**
 * Parses the body of `resp` as JSON. When parsing fails, the response text is
 * appended to the error message before the error is rethrown, so the failure
 * shows what was actually received.
 *
 * Bodies of more than 50 lines are abbreviated to their first 25 and last 25
 * lines, separated by a `[...]` marker line.
 *
 * @param resp Response whose body text is read in full.
 * @returns The parsed JSON value.
 * @throws The error raised by `JSON.parse`, with the response text appended to its message.
 */
export async function jsonVerboseError(resp: Response): Promise<any> {
	const text = await resp.text();
	try {
		return JSON.parse(text);
	} catch (err: unknown) {
		// Narrow before touching `message`: catch variables are `unknown` under strict settings.
		if (err instanceof Error) {
			const lines = text.split('\n');
			const errText = lines.length > 50 ? [...lines.slice(0, 25), '[...]', ...lines.slice(lines.length - 25)].join('\n') : text;
			err.message = `${err.message}. Response: ${errText}`;
		}
		throw err;
	}
}
310
311
/**
 * Tells whether `e` is an abort error, identified by `name === 'AbortError'`.
 *
 * Always returns a real boolean: `null`, `undefined` and other falsy inputs
 * yield `false` rather than being passed through as the result.
 *
 * @param e Any thrown or rejected value.
 */
export function isAbortError(e: any): boolean {
	// see https://github.com/nodejs/node/issues/38361#issuecomment-1683839467
	return e?.name === 'AbortError';
}
315
316
/**
 * Extracts the hostname of `url` without ever throwing.
 *
 * @param url The string to parse with the `URL` constructor.
 * @returns The parsed hostname, or `'unknown'` when `url` cannot be parsed.
 */
export function safeGetHostname(url: string): string {
	let hostname = 'unknown';
	try {
		hostname = new URL(url).hostname;
	} catch {
		// Unparseable input: keep the 'unknown' placeholder.
	}
	return hostname;
}
323
324