Path: blob/main/extensions/copilot/src/util/node/worker.ts
13397 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { Worker, WorkerOptions } from 'worker_threads';

/** A call sent across the thread boundary: invoke `fn` with `args`; `id` correlates the reply. */
export type RpcRequest = { id: number; fn: string; args: any[] };

/** The reply to an {@link RpcRequest} with the same `id`: either a result (`res`) or a failure (`err`). */
export type RpcResponse = { id: number; res?: any; err?: Error };

/**
 * Holds promises for RPC requests and resolves them when the call completes.
 */
export class RcpResponseHandler {
	/** Id handed to the next handler; starts at 1 and increases by one per handler. */
	private nextId = 1;

	/** Settle callbacks of every request that has not been answered yet, keyed by request id. */
	private readonly handlers = new Map<number, { resolve: (res: any) => void; reject: (err: Error) => void }>();

	/**
	 * Registers a new pending request.
	 *
	 * @returns the id to send with the request and a promise that settles when
	 * {@link handleResponse} or {@link handleError} is called for it.
	 */
	public createHandler<T>(): { id: number; result: Promise<T> } {
		const id = this.nextId++;
		let resolve: (res: any) => void;
		let reject: (err: Error) => void;
		const result = new Promise<any>((res, rej) => {
			resolve = res;
			reject = rej;
		});
		// The promise executor runs synchronously, so both callbacks are assigned by now.
		this.handlers.set(id, { resolve: resolve!, reject: reject! });
		return { id, result };
	}

	/**
	 * Settles the pending request matching `response.id`. Responses with an
	 * unknown id are ignored.
	 */
	public handleResponse(response: RpcResponse) {
		const handler = this.handlers.get(response.id);
		if (!handler) {
			return;
		}

		this.handlers.delete(response.id);
		if (response.err) {
			handler.reject(response.err);
		} else {
			handler.resolve(response.res);
		}
	}

	/**
	 * Handle an unexpected error by rejecting all pending handlers with it.
	 */
	public handleError(err: Error) {
		for (const handler of this.handlers.values()) {
			handler.reject(err);
		}
		this.handlers.clear();
	}

	/**
	 * Forgets all pending handlers without settling them; their promises stay pending.
	 */
	public clear() {
		this.handlers.clear();
	}
}

/**
 * Maps every function member of `ProxyType` to an async version returning a
 * promise of its awaited result. Non-function members map to `never`.
 */
export type RpcProxy<ProxyType> = {
	[K in keyof ProxyType]: ProxyType[K] extends ((...args: infer Args) => infer R) ? (...args: Args) => Promise<Awaited<R>> : never;
};

/**
 * Creates an object on which reading any string-named property yields a function
 * that forwards to `remoteCall(name, args)`. Forwarders are created lazily on
 * first access and cached, so repeated reads return the same function.
 *
 * NOTE(review): every string property is treated as a remote function, including
 * `then`, so the returned proxy itself should not be awaited or returned from an
 * async function.
 *
 * @param remoteCall performs the actual call and returns a promise for its result.
 */
export function createRpcProxy<ProxyType>(remoteCall: (name: string, args: any[]) => Promise<any>): RpcProxy<ProxyType> {
	const handler = {
		get: (target: any, name: PropertyKey) => {
			if (typeof name === 'string' && !target[name]) {
				target[name] = (...myArgs: any[]) => {
					return remoteCall(name, myArgs);
				};
			}
			return target[name];
		}
	};
	// Prototype-less target so names such as `toString` are not shadowed by inherited members.
	return new Proxy(Object.create(null), handler);
}

/**
 * Owns a worker thread and exposes its functions through {@link proxy}. The
 * worker can in turn call functions on the optional `host` object by posting
 * {@link RpcRequest} messages.
 */
export class WorkerWithRpcProxy<WorkerProxyType, HostProxyType = {}> {
	private readonly worker: Worker;
	private readonly responseHandler = new RcpResponseHandler();

	/**
	 * Set once the worker can no longer answer requests, either because
	 * {@link terminate} was called or because the thread exited.
	 */
	private terminated = false;

	/** Calling a member posts a request to the worker and returns a promise for the reply. */
	public readonly proxy: RpcProxy<WorkerProxyType>;

	/**
	 * @param workerPath passed to the `Worker` constructor.
	 * @param workerOptions passed to the `Worker` constructor.
	 * @param host object whose methods the worker may invoke by name.
	 */
	constructor(workerPath: string, workerOptions?: WorkerOptions, host?: HostProxyType) {
		this.worker = new Worker(workerPath, workerOptions);
		this.worker.on('message', async (msg: RpcRequest | RpcResponse) => {
			if ('fn' in msg) {
				// The worker is calling into the host: run the call and post the outcome back.
				try {
					const response = await (host as any)?.[msg.fn].apply(host, msg.args);
					this.worker.postMessage({ id: msg.id, res: response } satisfies RpcResponse);
				} catch (err) {
					this.worker.postMessage({ id: msg.id, err } satisfies RpcResponse);
				}
			} else {
				// The worker is answering one of our requests.
				this.responseHandler.handleResponse(msg);
			}
		});
		this.worker.on('error', (err) => this.handleError(err));

		this.worker.on('exit', code => {
			// The thread is gone; later proxy calls must fail instead of waiting forever.
			this.terminated = true;
			if (code !== 0) {
				this.handleError(new Error(`Worker thread exited with code ${code}.`));
			}
		});

		this.proxy = createRpcProxy<WorkerProxyType>((fn: string, args: any[]): Promise<any> => {
			// `worker` is never unset, so liveness is tracked with an explicit flag.
			if (this.terminated) {
				throw new Error(`Worker was terminated!`);
			}

			const { id, result } = this.responseHandler.createHandler<any>();
			this.worker.postMessage({ id, fn, args } satisfies RpcRequest);
			return result;
		});
	}

	/**
	 * Stops the worker thread and drops all listeners. Requests still pending are
	 * forgotten without being settled; proxy calls made afterwards throw.
	 */
	terminate() {
		this.terminated = true;
		this.worker.removeAllListeners();
		void this.worker.terminate();
		this.responseHandler.clear();
	}

	/**
	 * Handle an unexpected error by rejecting all pending requests with it.
	 */
	private handleError(err: Error) {
		this.responseHandler.handleError(err);
	}
}