Path: blob/main/extensions/copilot/test/pipeline/pipeline.ts
13389 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { ChildProcess, fork } from 'child_process';
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import * as readline from 'readline';
import { createExtensionUnitTestingServices } from '../../src/extension/test/node/services';
import { ConfigKey, IConfigurationService } from '../../src/platform/configuration/common/configurationService';
import { ResponseFormat } from '../../src/platform/inlineEdits/common/dataTypes/xtabPromptOptions';
import { Limiter } from '../../src/util/vs/base/common/async';
import { applyConfigFile, loadConfigFile } from '../base/simulationContext';
import { NesDatagen, SimulationOptions } from '../base/simulationOptions';
import { assembleSample, ISample, resolveOutputPath, writeSamples } from './output';
import { loadAndParseInput } from './parseInput';
import { generatePromptFromRecording, IGeneratedPrompt } from './promptStep';
import { parseSuggestedEdit, processAllRows } from './replayRecording';
import { generateAllResponses, generateResponse, IResponseGenerationInput } from './responseStep';

/**
 * Prints each error message through `log`, but only in verbose mode.
 *
 * @param errors Errors to report; each carries a preformatted `error` message.
 * @param verbose When false, nothing is printed.
 * @param log Output sink (the pipeline's logger, so callers can redirect output).
 */
function logErrors(errors: readonly { error: string }[], verbose: boolean, log: (...ps: unknown[]) => void): void {
	if (!verbose) {
		return;
	}
	for (const err of errors) {
		log(` ${err.error}`);
	}
}

/** Formats the wall-clock time since `startTime` (a `Date.now()` value) in seconds, e.g. `"12.3s"`. */
function formatElapsed(startTime: number): string {
	const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
	return `${elapsed}s`;
}

export type RunPipelineOptions = {
	/** nes-datagen options; required at runtime (the pipeline throws when missing). */
	readonly nesDatagen: NesDatagen | undefined;
	/**
	 * path to config file (required at runtime)
	 */
	readonly configFile: string | undefined;
	/** Any truthy value enables verbose logging. */
	readonly verbose: number | boolean | undefined;
	/** Maximum number of prompts generated concurrently (clamped to at least 1). */
	readonly parallelism: number;
};

/**
 * Runs the five-step NES data-generation pipeline in this process:
 * parse input → replay recordings → generate prompts → generate responses → write samples.
 *
 * @param opts Pipeline options; `nesDatagen` and `configFile` are required.
 * @param log Sink for all progress and error output (defaults to `console.log`).
 * @throws Error when `opts.nesDatagen` or `opts.configFile` is missing.
 */
export async function runInputPipeline(opts: RunPipelineOptions, log = console.log.bind(console)): Promise<void> {
	const nesDatagenOpts = opts.nesDatagen;
	if (!nesDatagenOpts) {
		throw new Error('nes-datagen options are required');
	}
	const inputPath = nesDatagenOpts.input;
	if (!opts.configFile) {
		throw new Error('nes-datagen requires --config-file');
	}
	const configs = loadConfigFile(opts.configFile);
	const verbose = !!opts.verbose;
	// NOTE(review): clamped because a Limiter with size < 1 presumably never starts a task (VS Code's
	// Limiter behaves this way), which would hang the run forever.
	const concurrency = Math.max(1, opts.parallelism);
	const rowOffset = nesDatagenOpts.rowOffset;

	log(`\n=== Pipeline ===`);
	log(` Input: ${inputPath}`);
	log(` Concurrency: ${concurrency}`);

	// Step 1: Parse input
	const { rows, errors } = await loadAndParseInput(inputPath, verbose);
	log(` [1/5] Input parsed: ${rows.length} rows, ${errors.length} errors`);
	logErrors(errors, verbose, log);

	// Step 2: Replay recordings
	const { processed, errors: replayErrors } = processAllRows(rows);
	log(` [2/5] Recordings replayed: ${processed.length} ok, ${replayErrors.length} errors`);
	logErrors(replayErrors.map(e => ({
		error: `[sample ${e.rowIndex + rowOffset}, ${rows[e.rowIndex]?.activeDocumentLanguageId ?? '?'}] ${e.error}`,
	})), verbose, log);

	// Step 3: Generate prompts
	const serviceCollection = createExtensionUnitTestingServices();
	const testAccessor = serviceCollection.createTestingAccessor();

	try {
		const configService = testAccessor.get(IConfigurationService);

		await applyConfigFile(configService, configs);

		// Disable interactive debounce for batch mode
		await configService.setConfig(ConfigKey.TeamInternal.InlineEditsDebounce, 0);
		await configService.setConfig(ConfigKey.TeamInternal.InlineEditsCacheDelay, 0);
		await configService.setConfig(ConfigKey.TeamInternal.InlineEditsExtraDebounceEndOfLine, 0);
		await configService.setConfig(ConfigKey.TeamInternal.InlineEditsExtraDebounceInlineSuggestion, 0);

		const modelConfig = configService.getConfig(ConfigKey.Advanced.InlineEditsXtabProviderModelConfiguration);
		const responseFormat = ResponseFormat.fromPromptingStrategy(modelConfig?.promptingStrategy);

		log(` Local model configuration: ${JSON.stringify(modelConfig)}`);

		const prompts: { originalRowIndex: number; prompt: IGeneratedPrompt }[] = [];
		const promptErrors: { originalRowIndex: number; error: string }[] = [];
		let promptsCompleted = 0;
		const promptStartTime = Date.now();

		const limiter = new Limiter<void>(concurrency);
		await Promise.all(processed.map(p =>
			limiter.queue(async () => {
				const globalIdx = p.originalRowIndex + rowOffset;
				const result = await generatePromptFromRecording(testAccessor, p.recordingInfo);
				if ('error' in result) {
					promptErrors.push({ originalRowIndex: p.originalRowIndex, error: `[sample ${globalIdx}, ${p.row.activeDocumentLanguageId}, ${p.activeFilePath}] ${result.error}` });
				} else {
					prompts.push({ originalRowIndex: p.originalRowIndex, prompt: result });
				}
				promptsCompleted++;
				if (verbose && (promptsCompleted % 50 === 0 || promptsCompleted === processed.length)) {
					log(` Progress: ${promptsCompleted}/${processed.length} (${formatElapsed(promptStartTime)})`);
				}
			})
		));

		// Tasks finish in completion order; restore input order so logs and output are deterministic.
		prompts.sort((a, b) => a.originalRowIndex - b.originalRowIndex);
		promptErrors.sort((a, b) => a.originalRowIndex - b.originalRowIndex);

		log(` [3/5] Prompts generated: ${prompts.length} ok, ${promptErrors.length} errors (${formatElapsed(promptStartTime)})`);
		logErrors(promptErrors, verbose, log);

		// Step 4: Generate responses
		const processedByOriginalIndex = new Map(processed.map(p => [p.originalRowIndex, p]));
		const responseInputs: IResponseGenerationInput[] = [];

		for (const { originalRowIndex, prompt } of prompts) {
			const p = processedByOriginalIndex.get(originalRowIndex);
			if (!p) {
				continue;
			}
			responseInputs.push({
				index: originalRowIndex,
				oracleEdits: p.nextUserEdit?.edit,
				docContent: p.activeDocument.value.get().value,
				filePath: p.activeFilePath,
				userPrompt: prompt.user,
			});
		}

		const { responses, errors: responseErrors } = generateAllResponses(responseFormat, responseInputs);
		log(` [4/5] Responses generated: ${responses.length} ok, ${responseErrors.length} errors`);
		logErrors(responseErrors.map(e => {
			const p = processedByOriginalIndex.get(e.index);
			return { error: `[sample ${e.index + rowOffset}, ${p?.row.activeDocumentLanguageId ?? '?'}] ${e.error}` };
		}), verbose, log);

		// Step 5: Write output
		const responseByIndex = new Map(responses.map(r => [r.index, r.response]));
		const outputPath = resolveOutputPath(inputPath, nesDatagenOpts.output);
		const samples: ISample[] = [];

		for (const { originalRowIndex: index, prompt } of prompts) {
			const response = responseByIndex.get(index);
			if (!response) {
				continue;
			}
			const p = processedByOriginalIndex.get(index);
			if (!p) {
				continue;
			}
			// Also render the recorded model suggestion in the same response format, for comparison.
			const suggestedEdit = parseSuggestedEdit(p.row.postProcessingOutcome.suggestedEdit);
			const modelEdits = suggestedEdit ? [suggestedEdit] as const : undefined;
			const modelResult = generateResponse(responseFormat, modelEdits, p.activeDocument.value.get().value, p.activeFilePath, prompt.user);
			const formattedModelResponse = 'error' in modelResult ? '' : modelResult.assistant;
			samples.push(assembleSample(index + rowOffset, prompt, response, p, responseFormat, formattedModelResponse));
		}

		const writeResult = await writeSamples(outputPath, samples);
		log(` [5/5] Output written: ${writeResult.written} samples → ${writeResult.outputPath}`);
		if (writeResult.skipped > 0) {
			log(` Structural validation dropped ${writeResult.skipped} samples`);
			if (verbose) {
				const grouped = new Map<string, number>();
				for (const s of writeResult.skipReasons) {
					grouped.set(s.reason, (grouped.get(s.reason) ?? 0) + 1);
				}
				for (const [reason, count] of grouped) {
					log(` ${reason} (×${count})`);
				}
			}
		}

		// Summary
		log(`\n Pipeline: Input(${rows.length}) → Replay(${processed.length}) → Prompt(${prompts.length}) → Response(${responses.length}) → Output(${writeResult.written})`);
	} finally {
		try {
			for (const p of processed) {
				p.replayer.dispose();
			}
		} finally {
			// Release the service container even if disposing a replayer throws.
			testAccessor.dispose();
		}
	}
}

/** One contiguous chunk of the input assigned to a worker process. */
interface IWorkerJob {
	/** 0-based worker index; used in log prefixes and temp file names. */
	readonly index: number;
	/** Global index of the chunk's first row, forwarded to the worker as `--row-offset`. */
	readonly rowOffset: number;
	/** Number of rows in the chunk. */
	readonly rowCount: number;
	/** Temp file holding the chunk's records (the worker's `--input`). */
	readonly chunkPath: string;
	/** Temp file the worker writes its samples to (the worker's `--out`). */
	readonly resultPath: string;
}

/**
 * Consumes a child's output stream so its pipe buffer never fills up and blocks the child.
 * When `write` is given, each non-blank line is forwarded to it. `readline` reassembles lines
 * (and multi-byte characters) that are split across data chunks.
 */
function drainChildOutput(stream: NodeJS.ReadableStream | null, write: ((line: string) => void) | undefined): void {
	if (!stream) {
		return;
	}
	if (!write) {
		stream.resume();
		return;
	}
	readline.createInterface({ input: stream, crlfDelay: Infinity }).on('line', (line: string) => {
		if (line.trim()) {
			write(line);
		}
	});
}

/**
 * Run the pipeline in parallel by splitting input across N child processes.
 * Each child runs the single-process pipeline on its chunk independently; their results are
 * merged and written to the final output path.
 *
 * @throws Error when `nesDatagen` or `configFile` is missing, when the input is not a JSON array,
 *   or when any worker fails (remaining workers are then terminated).
 */
export async function runInputPipelineParallel(opts: SimulationOptions): Promise<void> {
	const nesDatagenOpts = opts.nesDatagen;
	if (!nesDatagenOpts) {
		throw new Error('nes-datagen options are required');
	}
	const configFile = opts.configFile;
	if (!configFile) {
		// Validate up front: every worker needs it, and each would otherwise fail on its own.
		throw new Error('nes-datagen requires --config-file');
	}
	const inputPath = nesDatagenOpts.input;
	const verbose = !!opts.verbose;

	const contents = await fs.promises.readFile(inputPath, 'utf8');
	const parsed: unknown = JSON.parse(contents);
	if (!Array.isArray(parsed)) {
		throw new Error(`nes-datagen input must be a JSON array of records: ${inputPath}`);
	}
	const records: unknown[] = parsed;
	// At most one worker per CPU, per requested parallelism unit, and per ~25 rows.
	const numWorkers = Math.max(1, Math.min(os.cpus().length, opts.parallelism, Math.ceil(records.length / 25)));

	console.log(`\n=== Pipeline (parallel: ${numWorkers} workers) ===`);
	console.log(` Input: ${inputPath} (${records.length} rows)\n`);

	if (records.length === 0) {
		console.log(` No records to process.`);
		return;
	}

	const chunkSize = Math.ceil(records.length / numWorkers);
	const tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'nes-pipeline-'));

	try {
		// Write every chunk before spawning anything, so a failed write cannot strand running workers.
		const jobs: IWorkerJob[] = [];
		for (let w = 0; w < numWorkers; w++) {
			const start = w * chunkSize;
			const chunk = records.slice(start, start + chunkSize);
			if (chunk.length === 0) {
				continue;
			}
			const job: IWorkerJob = {
				index: w,
				rowOffset: start,
				rowCount: chunk.length,
				chunkPath: path.join(tmpDir, `chunk_${w}.json`),
				resultPath: path.join(tmpDir, `result_${w}.json`),
			};
			await fs.promises.writeFile(job.chunkPath, JSON.stringify(chunk));
			jobs.push(job);
		}

		const startTime = Date.now();
		const children: ChildProcess[] = [];
		const workerPromises = jobs.map(job => new Promise<void>((resolve, reject) => {
			const args = [
				'nes-datagen',
				'--input', job.chunkPath,
				'--config-file', configFile,
				'--out', job.resultPath,
				'--row-offset', String(job.rowOffset),
				'--parallelism', String(opts.parallelism),
				'--worker',
			];
			if (verbose) {
				args.push('--verbose');
			}

			const child = fork(process.argv[1], args, { stdio: 'pipe' });
			children.push(child);

			// Always drain child output to prevent pipe buffer deadlocks
			drainChildOutput(child.stdout, verbose ? line => console.log(` [W${job.index}] ${line}`) : undefined);
			drainChildOutput(child.stderr, verbose ? line => console.error(` [W${job.index}] ${line}`) : undefined);

			// 'close' (unlike 'exit') fires only after the child's stdio streams have closed,
			// so all forwarded output is printed before the worker is reported as done.
			child.on('close', (code, signal) => {
				if (code === 0) {
					console.log(` Worker ${job.index + 1}/${numWorkers} completed (${job.rowCount} rows)`);
					resolve();
				} else {
					reject(new Error(`Worker ${job.index} exited with ${signal ? `signal ${signal}` : `code ${code}`}`));
				}
			});
			child.on('error', reject);
		}));

		try {
			await Promise.all(workerPromises);
		} catch (err) {
			// One worker failed: stop the others and wait for them before the temp dir is removed.
			for (const child of children) {
				if (child.exitCode === null && child.signalCode === null) {
					child.kill();
				}
			}
			await Promise.allSettled(workerPromises);
			throw err;
		}
		const elapsed = formatElapsed(startTime);
		console.log(`\n All ${jobs.length} workers completed in ${elapsed}`);

		// Merge results (best effort: a missing or unreadable result file is reported, not fatal)
		const allSamples: ISample[] = [];
		for (const { resultPath } of jobs) {
			try {
				const content = await fs.promises.readFile(resultPath, 'utf8');
				const samples = JSON.parse(content) as ISample[];
				allSamples.push(...samples);
			} catch {
				console.error(` Warning: could not read result file ${resultPath}`);
			}
		}

		const outputPath = resolveOutputPath(inputPath, nesDatagenOpts.output);
		const writeResult = await writeSamples(outputPath, allSamples);
		console.log(` Output: ${writeResult.written} samples → ${writeResult.outputPath} (${elapsed})`);
	} finally {
		await fs.promises.rm(tmpDir, { recursive: true, force: true });
	}
}