// Source: microsoft/vscode — extensions/copilot/test/pipeline/pipeline.ts
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { fork } from 'child_process';
7
import * as fs from 'fs';
8
import * as os from 'os';
9
import * as path from 'path';
10
import { createExtensionUnitTestingServices } from '../../src/extension/test/node/services';
11
import { ConfigKey, IConfigurationService } from '../../src/platform/configuration/common/configurationService';
12
import { ResponseFormat } from '../../src/platform/inlineEdits/common/dataTypes/xtabPromptOptions';
13
import { Limiter } from '../../src/util/vs/base/common/async';
14
import { applyConfigFile, loadConfigFile } from '../base/simulationContext';
15
import { NesDatagen, SimulationOptions } from '../base/simulationOptions';
16
import { assembleSample, ISample, resolveOutputPath, writeSamples } from './output';
17
import { loadAndParseInput } from './parseInput';
18
import { generatePromptFromRecording, IGeneratedPrompt } from './promptStep';
19
import { parseSuggestedEdit, processAllRows } from './replayRecording';
20
import { generateAllResponses, generateResponse, IResponseGenerationInput } from './responseStep';
21
22
/**
 * Prints one indented line per error, but only when verbose mode is enabled.
 *
 * @param errors List of error records; nothing is printed when empty.
 * @param verbose When falsy, all errors are suppressed.
 * @param log Logger to write through (injected so callers can capture output).
 */
function logErrors(errors: readonly { error: string }[], verbose: boolean, log: (...ps: any[]) => void): void {
	if (errors.length > 0 && verbose) {
		for (const err of errors) {
			// Use the injected logger — bypassing it with console.log would make
			// the `log` parameter meaningless for callers that redirect output.
			log(` ${err.error}`);
		}
	}
}
29
30
/**
 * Renders the wall-clock time elapsed since `startTime` (a `Date.now()`
 * millisecond timestamp) as seconds with one decimal place, e.g. `"3.2s"`.
 */
function formatElapsed(startTime: number): string {
	const seconds = (Date.now() - startTime) / 1000;
	return `${seconds.toFixed(1)}s`;
}
34
35
/**
 * Options for a single-process run of the NES data-generation pipeline.
 */
export type RunPipelineOptions = {
	// nes-datagen sub-command options (input path, output path, rowOffset);
	// runInputPipeline dereferences this with `!`, so it must be set there.
	readonly nesDatagen: NesDatagen | undefined;
	/**
	 * path to config file
	 */
	readonly configFile: string | undefined;
	// Verbosity flag or level; any truthy value enables per-error and progress logging.
	readonly verbose: number | boolean | undefined;
	// Maximum number of concurrent prompt generations (size of the Limiter).
	readonly parallelism: number;
};
44
45
export async function runInputPipeline(opts: RunPipelineOptions, log = console.log.bind(console)): Promise<void> {
46
const nesDatagenOpts = opts.nesDatagen!;
47
const inputPath = nesDatagenOpts.input;
48
if (!opts.configFile) {
49
throw new Error('nes-datagen requires --config-file');
50
}
51
const configs = loadConfigFile(opts.configFile);
52
const verbose = !!opts.verbose;
53
const concurrency = opts.parallelism;
54
const rowOffset = nesDatagenOpts.rowOffset;
55
56
log(`\n=== Pipeline ===`);
57
log(` Input: ${inputPath}`);
58
log(` Concurrency: ${concurrency}`);
59
60
// Step 1: Parse input
61
const { rows, errors } = await loadAndParseInput(inputPath, verbose);
62
log(` [1/5] Input parsed: ${rows.length} rows, ${errors.length} errors`);
63
logErrors(errors, verbose, log);
64
65
// Step 2: Replay recordings
66
const { processed, errors: replayErrors } = processAllRows(rows);
67
log(` [2/5] Recordings replayed: ${processed.length} ok, ${replayErrors.length} errors`);
68
logErrors(replayErrors.map(e => ({
69
error: `[sample ${e.rowIndex + rowOffset}, ${rows[e.rowIndex]?.activeDocumentLanguageId ?? '?'}] ${e.error}`,
70
})), verbose, log);
71
72
// Step 3: Generate prompts
73
const serviceCollection = createExtensionUnitTestingServices();
74
const testAccessor = serviceCollection.createTestingAccessor();
75
76
try {
77
const configService = testAccessor.get(IConfigurationService);
78
79
await applyConfigFile(configService, configs);
80
81
// Disable interactive debounce for batch mode
82
await configService.setConfig(ConfigKey.TeamInternal.InlineEditsDebounce, 0);
83
await configService.setConfig(ConfigKey.TeamInternal.InlineEditsCacheDelay, 0);
84
await configService.setConfig(ConfigKey.TeamInternal.InlineEditsExtraDebounceEndOfLine, 0);
85
await configService.setConfig(ConfigKey.TeamInternal.InlineEditsExtraDebounceInlineSuggestion, 0);
86
87
const modelConfig = configService.getConfig(ConfigKey.Advanced.InlineEditsXtabProviderModelConfiguration);
88
const responseFormat = ResponseFormat.fromPromptingStrategy(modelConfig?.promptingStrategy);
89
90
log(` Local model configuration: ${JSON.stringify(modelConfig)}`);
91
92
const prompts: { originalRowIndex: number; prompt: IGeneratedPrompt }[] = [];
93
const promptErrors: { originalRowIndex: number; error: string }[] = [];
94
let promptsCompleted = 0;
95
const promptStartTime = Date.now();
96
97
const limiter = new Limiter<void>(concurrency);
98
await Promise.all(processed.map(p =>
99
limiter.queue(async () => {
100
const globalIdx = p.originalRowIndex + rowOffset;
101
const result = await generatePromptFromRecording(testAccessor, p.recordingInfo);
102
if ('error' in result) {
103
promptErrors.push({ originalRowIndex: p.originalRowIndex, error: `[sample ${globalIdx}, ${p.row.activeDocumentLanguageId}, ${p.activeFilePath}] ${result.error}` });
104
} else {
105
prompts.push({ originalRowIndex: p.originalRowIndex, prompt: result });
106
}
107
promptsCompleted++;
108
if (verbose && (promptsCompleted % 50 === 0 || promptsCompleted === processed.length)) {
109
console.log(` Progress: ${promptsCompleted}/${processed.length} (${formatElapsed(promptStartTime)})`);
110
}
111
})
112
));
113
114
log(` [3/5] Prompts generated: ${prompts.length} ok, ${promptErrors.length} errors (${formatElapsed(promptStartTime)})`);
115
logErrors(promptErrors, verbose, log);
116
117
// Step 4: Generate responses
118
const processedByOriginalIndex = new Map(processed.map(p => [p.originalRowIndex, p]));
119
const responseInputs: IResponseGenerationInput[] = [];
120
121
for (const { originalRowIndex, prompt } of prompts) {
122
const p = processedByOriginalIndex.get(originalRowIndex);
123
if (!p) {
124
continue;
125
}
126
responseInputs.push({
127
index: originalRowIndex,
128
oracleEdits: p.nextUserEdit?.edit,
129
docContent: p.activeDocument.value.get().value,
130
filePath: p.activeFilePath,
131
userPrompt: prompt.user,
132
});
133
}
134
135
const { responses, errors: responseErrors } = generateAllResponses(responseFormat, responseInputs);
136
log(` [4/5] Responses generated: ${responses.length} ok, ${responseErrors.length} errors`);
137
logErrors(responseErrors.map(e => {
138
const p = processedByOriginalIndex.get(e.index);
139
return { error: `[sample ${e.index + rowOffset}, ${p?.row.activeDocumentLanguageId ?? '?'}] ${e.error}` };
140
}), verbose, log);
141
142
// Step 5: Write output
143
const responseByIndex = new Map(responses.map(r => [r.index, r.response]));
144
const outputPath = resolveOutputPath(inputPath, nesDatagenOpts.output);
145
const samples: ISample[] = [];
146
147
for (const { originalRowIndex: index, prompt } of prompts) {
148
const response = responseByIndex.get(index);
149
if (!response) {
150
continue;
151
}
152
const p = processedByOriginalIndex.get(index);
153
if (!p) {
154
continue;
155
}
156
const suggestedEdit = parseSuggestedEdit(p.row.postProcessingOutcome.suggestedEdit);
157
const modelEdits = suggestedEdit ? [suggestedEdit] as const : undefined;
158
const modelResult = generateResponse(responseFormat, modelEdits, p.activeDocument.value.get().value, p.activeFilePath, prompt.user);
159
const formattedModelResponse = 'error' in modelResult ? '' : modelResult.assistant;
160
samples.push(assembleSample(index + rowOffset, prompt, response, p, responseFormat, formattedModelResponse));
161
}
162
163
const writeResult = await writeSamples(outputPath, samples);
164
log(` [5/5] Output written: ${writeResult.written} samples → ${writeResult.outputPath}`);
165
if (writeResult.skipped > 0) {
166
log(` Structural validation dropped ${writeResult.skipped} samples`);
167
if (verbose) {
168
const grouped = new Map<string, number>();
169
for (const s of writeResult.skipReasons) {
170
grouped.set(s.reason, (grouped.get(s.reason) ?? 0) + 1);
171
}
172
for (const [reason, count] of grouped) {
173
log(` ${reason} (×${count})`);
174
}
175
}
176
}
177
178
// Summary
179
log(`\n Pipeline: Input(${rows.length}) → Replay(${processed.length}) → Prompt(${prompts.length}) → Response(${responses.length}) → Output(${writeResult.written})`);
180
} finally {
181
for (const p of processed) {
182
p.replayer.dispose();
183
}
184
testAccessor.dispose();
185
}
186
}
187
188
/**
189
* Run the pipeline in parallel by splitting input across N child processes.
190
* Each child runs the single-process pipeline on its chunk independently.
191
*/
192
export async function runInputPipelineParallel(opts: SimulationOptions): Promise<void> {
193
const nesDatagenOpts = opts.nesDatagen!;
194
const inputPath = nesDatagenOpts.input;
195
const verbose = !!opts.verbose;
196
197
const contents = await fs.promises.readFile(inputPath, 'utf8');
198
const records = JSON.parse(contents) as unknown[];
199
const numWorkers = Math.max(1, Math.min(os.cpus().length, opts.parallelism, Math.ceil(records.length / 25)));
200
201
console.log(`\n=== Pipeline (parallel: ${numWorkers} workers) ===`);
202
console.log(` Input: ${inputPath} (${records.length} rows)\n`);
203
204
if (records.length === 0) {
205
console.log(` No records to process.`);
206
return;
207
}
208
209
const chunkSize = Math.ceil(records.length / numWorkers);
210
const tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'nes-pipeline-'));
211
212
try {
213
const workerPromises: Promise<void>[] = [];
214
const resultPaths: string[] = [];
215
216
for (let w = 0; w < numWorkers; w++) {
217
const start = w * chunkSize;
218
const chunk = records.slice(start, start + chunkSize);
219
if (chunk.length === 0) {
220
continue;
221
}
222
223
const chunkPath = path.join(tmpDir, `chunk_${w}.json`);
224
const resultPath = path.join(tmpDir, `result_${w}.json`);
225
resultPaths.push(resultPath);
226
227
await fs.promises.writeFile(chunkPath, JSON.stringify(chunk));
228
229
const args = [
230
'nes-datagen',
231
'--input', chunkPath,
232
'--config-file', opts.configFile!,
233
'--out', resultPath,
234
'--row-offset', String(start),
235
'--parallelism', String(opts.parallelism),
236
'--worker',
237
];
238
if (verbose) {
239
args.push('--verbose');
240
}
241
242
const workerIdx = w;
243
workerPromises.push(new Promise<void>((resolve, reject) => {
244
const child = fork(process.argv[1], args, { stdio: 'pipe' });
245
246
// Always drain child output to prevent pipe buffer deadlocks
247
child.stdout?.on('data', verbose ? (data: Buffer) => {
248
const lines = data.toString().split('\n').filter(l => l.trim());
249
for (const line of lines) {
250
console.log(` [W${workerIdx}] ${line}`);
251
}
252
} : () => { });
253
child.stderr?.on('data', verbose ? (data: Buffer) => {
254
const lines = data.toString().split('\n').filter(l => l.trim());
255
for (const line of lines) {
256
console.error(` [W${workerIdx}] ${line}`);
257
}
258
} : () => { });
259
260
child.on('exit', (code) => {
261
if (code === 0) {
262
console.log(` Worker ${workerIdx + 1}/${numWorkers} completed (${chunk.length} rows)`);
263
resolve();
264
} else {
265
reject(new Error(`Worker ${workerIdx} exited with code ${code}`));
266
}
267
});
268
child.on('error', reject);
269
}));
270
}
271
272
const startTime = Date.now();
273
await Promise.all(workerPromises);
274
const elapsed = formatElapsed(startTime);
275
console.log(`\n All ${numWorkers} workers completed in ${elapsed}`);
276
277
// Merge results
278
const allSamples: ISample[] = [];
279
for (const resultPath of resultPaths) {
280
try {
281
const content = await fs.promises.readFile(resultPath, 'utf8');
282
const samples = JSON.parse(content) as ISample[];
283
allSamples.push(...samples);
284
} catch {
285
console.error(` Warning: could not read result file ${resultPath}`);
286
}
287
}
288
289
const outputPath = resolveOutputPath(inputPath, nesDatagenOpts.output);
290
const writeResult = await writeSamples(outputPath, allSamples);
291
console.log(` Output: ${writeResult.written} samples → ${writeResult.outputPath} (${elapsed})`);
292
} finally {
293
await fs.promises.rm(tmpDir, { recursive: true, force: true });
294
}
295
}
296
297