Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/extension/chronicle/vscode-node/remoteSessionExporter.ts
13399 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { IAuthenticationService } from '../../../platform/authentication/common/authentication';
7
import { ICopilotTokenManager } from '../../../platform/authentication/common/copilotTokenManager';
8
import { IChatSessionService } from '../../../platform/chat/common/chatSessionService';
9
import { ConfigKey, IConfigurationService } from '../../../platform/configuration/common/configurationService';
10
import { CopilotChatAttr, GenAiAttr, GenAiOperationName } from '../../../platform/otel/common/genAiAttributes';
11
import { IExperimentationService } from '../../../platform/telemetry/common/nullExperimentationService';
12
import { type ICompletedSpanData, IOTelService } from '../../../platform/otel/common/otelService';
13
import { getGitHubRepoInfoFromContext, IGitService } from '../../../platform/git/common/gitService';
14
import { IGithubRepositoryService } from '../../../platform/github/common/githubService';
15
import { Disposable, DisposableStore } from '../../../util/vs/base/common/lifecycle';
16
import { autorun } from '../../../util/vs/base/common/observableInternal';
17
import { IExtensionContribution } from '../../common/contributions';
18
import { CircuitBreaker } from '../common/circuitBreaker';
19
import {
20
createSessionTranslationState,
21
makeShutdownEvent,
22
translateSpan,
23
type SessionTranslationState,
24
} from '../common/eventTranslator';
25
import type { GitHubRepository, CloudSessionIds, SessionEvent, WorkingDirectoryContext } from '../common/cloudSessionTypes';
26
import { filterSecretsFromObj, addSecretValues } from '../common/secretFilter';
27
import { SessionIndexingPreference, type SessionIndexingLevel } from '../common/sessionIndexingPreference';
28
import { IFetcherService } from '../../../platform/networking/common/fetcherService';
29
import { ITelemetryService } from '../../../platform/telemetry/common/telemetry';
30
import { CloudSessionApiClient } from '../node/cloudSessionApiClient';
31
32
// ── Configuration ───────────────────────────────────────────────────────────────
33
34
/** Regular cadence (ms) at which buffered events are flushed to the cloud. */
const BATCH_INTERVAL_MS = 500;

/** Accelerated flush cadence (ms) used while the buffer is above the soft cap. */
const FAST_BATCH_INTERVAL_MS = 200;

/** Upper bound on the number of events taken from the buffer per flush. */
const MAX_EVENTS_PER_FLUSH = 500;

/** Hard limit on buffered events; the oldest entries are discarded beyond it. */
const MAX_BUFFER_SIZE = 1_000;

/** Soft limit on buffered events; above it the exporter switches to the faster drain. */
const SOFT_BUFFER_CAP = 500;
48
49
/**
50
* Exports VS Code chat session events to the cloud in real-time.
51
*
52
* - Listens to OTel spans, translates to cloud SessionEvent format
53
* - Buffers events and flushes in batches every 500ms
54
* - Circuit breaker prevents cascading failures when the cloud is unavailable
55
* - Lazy initialization: no work until the first real chat interaction
56
*
57
* All cloud operations are fire-and-forget — never blocks or slows the chat session.
58
*/
59
export class RemoteSessionExporter extends Disposable implements IExtensionContribution {
60
61
// ── Per-session state ────────────────────────────────────────────────────────
62
63
/** Per-session cloud IDs (created lazily on first interaction). */
64
private readonly _cloudSessions = new Map<string, CloudSessionIds>();
65
66
/** Per-session translation state (parentId chaining, session.start tracking). */
67
private readonly _translationStates = new Map<string, SessionTranslationState>();
68
69
/** Sessions that failed cloud initialization — don't retry. */
70
private readonly _disabledSessions = new Set<string>();
71
72
/** Sessions currently initializing (prevent concurrent init). */
73
private readonly _initializingSessions = new Set<string>();
74
75
// ── Shared state ─────────────────────────────────────────────────────────────
76
77
/** Buffered events tagged with their chat session ID for correct routing. */
78
private readonly _eventBuffer: Array<{ chatSessionId: string; event: SessionEvent }> = [];
79
private readonly _cloudClient: CloudSessionApiClient;
80
private readonly _circuitBreaker: CircuitBreaker;
81
82
private _flushTimer: ReturnType<typeof setInterval> | undefined;
83
private _isFlushing = false;
84
private _firstCloudWriteLogged = false;
85
86
/** The session source of the first initialized session (for firstWrite telemetry). */
87
private _firstCloudWriteSessionSource: string | undefined;
88
89
/** Resolved lazily on first use. */
90
private _repository: GitHubRepository | undefined;
91
private _repositoryResolved = false;
92
93
/** User's session indexing preference (resolved once per repo). */
94
private readonly _indexingPreference: SessionIndexingPreference;
95
96
/**
 * Wires up the exporter: builds the indexing preference, cloud client and
 * circuit breaker, registers auth tokens for secret filtering, and installs
 * the span / session-disposal listeners whenever both the local index and
 * cloud sync settings are enabled.
 */
constructor(
	@IOTelService private readonly _otelService: IOTelService,
	@IChatSessionService private readonly _chatSessionService: IChatSessionService,
	@ICopilotTokenManager private readonly _tokenManager: ICopilotTokenManager,
	@IAuthenticationService private readonly _authService: IAuthenticationService,
	@IGitService private readonly _gitService: IGitService,
	@IGithubRepositoryService private readonly _githubRepoService: IGithubRepositoryService,
	@IConfigurationService private readonly _configService: IConfigurationService,
	@IExperimentationService private readonly _expService: IExperimentationService,
	@ITelemetryService private readonly _telemetryService: ITelemetryService,
	@IFetcherService private readonly _fetcherService: IFetcherService,
) {
	super();

	this._indexingPreference = new SessionIndexingPreference(this._configService);
	this._cloudClient = new CloudSessionApiClient(this._tokenManager, this._authService, this._fetcherService);
	this._circuitBreaker = new CircuitBreaker({
		failureThreshold: 5,
		resetTimeoutMs: 1_000,
		maxResetTimeoutMs: 30_000,
	});

	// Make sure known auth tokens are treated as secrets before any event is exported.
	this._registerAuthSecrets();

	// The listeners live in their own store so they can be torn down and
	// re-created each time one of the two settings changes at runtime.
	const localIndexSetting = this._configService.getExperimentBasedConfigObservable(ConfigKey.LocalIndexEnabled, this._expService);
	const cloudSyncSetting = this._configService.getConfigObservable(ConfigKey.TeamInternal.SessionSearchCloudSyncEnabled);
	const listeners = this._register(new DisposableStore());

	this._register(autorun(reader => {
		listeners.clear();

		// Both settings must be on; the second one is only read when the first is truthy.
		const exportEnabled = localIndexSetting.read(reader) && cloudSyncSetting.read(reader);
		if (!exportEnabled) {
			return;
		}

		// Completed OTel spans are processed in a microtask, off the emitter callback.
		listeners.add(this._otelService.onDidCompleteSpan(span => {
			queueMicrotask(() => this._handleSpan(span));
		}));

		// Release per-session state when a chat session is disposed.
		listeners.add(this._chatSessionService.onDidDisposeChatSession(sessionId => {
			this._handleSessionDispose(sessionId);
		}));
	}));
}
143
144
/**
 * Stops the flush timer, kicks off one last best-effort flush of whatever is
 * still buffered, and clears all per-session state before disposing the base class.
 */
override dispose(): void {
	this._stopFlushTimer();

	// Fire-and-forget: dispose cannot wait for the network, so the final
	// flush is started but its outcome is ignored.
	if (this._eventBuffer.length > 0) {
		this._flushBatch().catch(() => { /* best effort */ });
	}

	this._cloudSessions.clear();
	this._translationStates.clear();
	this._disabledSessions.clear();
	this._initializingSessions.clear();

	super.dispose();
}
164
165
// ── Span handling ────────────────────────────────────────────────────────────
166
167
/**
 * Processes one completed span: starts lazy session initialization on the
 * first `invoke_agent` span of an unknown session, translates the span into
 * cloud events, and buffers them for the next flush. Any failure is swallowed
 * so a bad span never affects the chat session.
 */
private _handleSpan(span: ICompletedSpanData): void {
	try {
		const sessionId = this._getSessionId(span);
		if (!sessionId || this._disabledSessions.has(sessionId)) {
			return;
		}

		const alreadyTracked = this._cloudSessions.has(sessionId) || this._initializingSessions.has(sessionId);
		if (!alreadyTracked) {
			// Tracking only begins on invoke_agent (a real user interaction).
			const operationName = span.attributes[GenAiAttr.OPERATION_NAME] as string | undefined;
			if (operationName !== GenAiOperationName.INVOKE_AGENT) {
				return;
			}
			// Not awaited on purpose: events are buffered while initialization runs.
			this._initializeSession(sessionId, span);
		}

		const translated = translateSpan(
			span,
			this._getOrCreateTranslationState(sessionId),
			this._extractContext(span),
		);
		if (translated.length === 0) {
			return;
		}

		this._bufferEvents(sessionId, translated);
		this._ensureFlushTimer();
	} catch {
		// Non-fatal — individual span processing failure
	}
}
197
198
/**
 * Reads the session identifier from a span, preferring the chat session id,
 * then the conversation id, then the generic session id attribute.
 */
private _getSessionId(span: ICompletedSpanData): string | undefined {
	const attrs = span.attributes;

	const chatSessionId = attrs[CopilotChatAttr.CHAT_SESSION_ID] as string | undefined;
	if (chatSessionId !== undefined && chatSessionId !== null) {
		return chatSessionId;
	}

	const conversationId = attrs[GenAiAttr.CONVERSATION_ID] as string | undefined;
	if (conversationId !== undefined && conversationId !== null) {
		return conversationId;
	}

	return attrs[CopilotChatAttr.SESSION_ID] as string | undefined;
}
203
204
/** Returns the translation state for a session, creating and caching it on first use. */
private _getOrCreateTranslationState(sessionId: string): SessionTranslationState {
	const existing = this._translationStates.get(sessionId);
	if (existing) {
		return existing;
	}

	const created = createSessionTranslationState();
	this._translationStates.set(sessionId, created);
	return created;
}
212
213
/**
 * Builds the working-directory context from the span's repository attributes.
 * Returns `undefined` when the span carries neither a branch nor a remote URL.
 */
private _extractContext(span: ICompletedSpanData): WorkingDirectoryContext | undefined {
	const { attributes } = span;
	const remoteUrl = attributes[CopilotChatAttr.REPO_REMOTE_URL] as string | undefined;
	const branch = attributes[CopilotChatAttr.REPO_HEAD_BRANCH_NAME] as string | undefined;

	if (branch || remoteUrl) {
		return {
			repository: remoteUrl,
			branch,
			headCommit: attributes[CopilotChatAttr.REPO_HEAD_COMMIT_HASH] as string | undefined,
		};
	}
	return undefined;
}
226
227
// ── Secret registration ─────────────────────────────────────────────────────
228
229
/**
 * Adds the authentication tokens known at this point to the dynamic secret
 * list, so they get redacted from event payloads before upload.
 */
private _registerAuthSecrets(): void {
	// GitHub OAuth token: available synchronously when a session exists.
	const oauthToken = this._authService.anyGitHubSession?.accessToken;
	if (oauthToken) {
		addSecretValues(oauthToken);
	}

	// Copilot proxy token: fetched asynchronously and registered once it arrives.
	// A failure to obtain it is ignored.
	this._tokenManager.getCopilotToken()
		.then(copilotToken => {
			const proxyToken = copilotToken.token;
			if (proxyToken) {
				addSecretValues(proxyToken);
			}
		})
		.catch(() => { /* non-fatal */ });
}
247
248
// ── Lazy session initialization ──────────────────────────────────────────────
249
250
/**
 * Lazily sets up cloud export for a chat session.
 *
 * Marks the session as initializing, resolves the GitHub repository, checks
 * the user's cloud consent for that repository and creates the cloud session.
 * The session is added to `_disabledSessions` when no repository can be
 * resolved, when consent is missing, or when anything throws. The
 * initializing marker is always removed at the end.
 *
 * @param sessionId Chat session to initialize.
 * @param triggerSpan The span that triggered initialization; its agent name
 *   is reported as the session source in telemetry.
 */
private async _initializeSession(sessionId: string, triggerSpan: ICompletedSpanData): Promise<void> {
	this._initializingSessions.add(sessionId);

	try {
		const sessionSource = (triggerSpan.attributes[GenAiAttr.AGENT_NAME] as string | undefined) ?? 'unknown';

		// Track the source of the very first session for firstWrite telemetry
		if (!this._firstCloudWriteSessionSource) {
			this._firstCloudWriteSessionSource = sessionSource;
		}

		const repo = await this._resolveRepository();
		if (!repo) {
			this._disabledSessions.add(sessionId);
			return;
		}

		// Only export remotely if the user has cloud consent for this repo
		const repoNwo = `${repo.owner}/${repo.repo}`;
		if (!this._indexingPreference.hasCloudConsent(repoNwo)) {
			this._disabledSessions.add(sessionId);
			return;
		}

		await this._createCloudSession(sessionId, repo, this._indexingPreference.getStorageLevel(repoNwo));

		// _createCloudSession does not throw when the service rejects the
		// request: it reports its own failure telemetry and disables the
		// session. Only report a successful init when the cloud session was
		// actually registered.
		if (!this._cloudSessions.has(sessionId)) {
			return;
		}

		/* __GDPR__
			"chronicle.cloudSync" : {
				"owner": "vijayu",
				"comment": "Tracks cloud sync operations (session init, creation, flush, errors)",
				"operation": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The operation performed." },
				"sessionSource": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The agent name/source for the session, or unknown if unavailable." },
				"success": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "Whether the operation succeeded." },
				"error": { "classification": "CallstackOrException", "purpose": "PerformanceAndHealth", "comment": "Truncated error message if failed." },
				"indexingLevel": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The indexing level for the session." },
				"droppedEvents": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of events in a failed batch." }
			}
		*/
		this._telemetryService.sendMSFTTelemetryEvent('chronicle.cloudSync', {
			operation: 'sessionInit',
			success: 'true',
			sessionSource,
		});
	} catch (err) {
		this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
			operation: 'sessionInit',
			success: 'false',
			error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
		}, {});
		this._disabledSessions.add(sessionId);
	} finally {
		this._initializingSessions.delete(sessionId);
	}
}
303
304
/**
 * Called when the storage level setting changes.
 *
 * - `'local'`: every tracked session that has no cloud session yet is
 *   disabled, so nothing further is exported for it.
 * - any other level: a cloud session is created for each tracked session
 *   that has none, is not disabled, and is not currently being initialized.
 *   Nothing happens if the repository has not been resolved yet.
 *
 * @param level The newly selected indexing level.
 */
async notifyConsent(level: SessionIndexingLevel): Promise<void> {
	if (level === 'local') {
		for (const sessionId of this._translationStates.keys()) {
			if (!this._cloudSessions.has(sessionId)) {
				this._disabledSessions.add(sessionId);
			}
		}
		return;
	}

	const repo = this._repository;
	if (!repo) {
		return;
	}

	for (const sessionId of this._translationStates.keys()) {
		if (this._cloudSessions.has(sessionId) || this._disabledSessions.has(sessionId)) {
			continue;
		}
		// A session that is mid-initialization will create its own cloud
		// session; creating another one here would register it twice.
		if (this._initializingSessions.has(sessionId)) {
			continue;
		}
		await this._createCloudSession(sessionId, repo, level);
	}
}
329
330
/**
 * Asks the cloud service to create a session and records the returned ids.
 * When the request is not ok, or the response lacks a task id, an error
 * telemetry event is sent and the session is disabled instead.
 *
 * @param sessionId Chat session the cloud session belongs to.
 * @param repo Repository whose numeric owner/repo ids are sent to the service.
 * @param indexingLevel Requested level; anything other than `'repo_and_user'` is sent as `'user'`.
 */
private async _createCloudSession(
	sessionId: string,
	repo: GitHubRepository,
	indexingLevel: SessionIndexingLevel,
): Promise<void> {
	// Shared failure path: report the error, then stop exporting this session.
	const disableWithError = (error: string): void => {
		this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
			operation: 'createCloudSession',
			success: 'false',
			error,
		}, {});
		this._disabledSessions.add(sessionId);
	};

	const { ownerId, repoId } = repo.repoIds;
	const requestedLevel = indexingLevel === 'repo_and_user' ? 'repo_and_user' : 'user';
	const result = await this._cloudClient.createSession(ownerId, repoId, sessionId, requestedLevel);

	if (!result.ok) {
		disableWithError(result.reason?.substring(0, 100) ?? 'unknown');
		return;
	}

	const { id: cloudSessionId, task_id: cloudTaskId } = result.response;
	if (!cloudTaskId) {
		disableWithError('missing_task_id');
		return;
	}

	const ids: CloudSessionIds = { cloudSessionId, cloudTaskId };
	this._cloudSessions.set(sessionId, ids);

	this._telemetryService.sendMSFTTelemetryEvent('chronicle.cloudSync', {
		operation: 'createCloudSession',
		success: 'true',
		indexingLevel,
	});
}
376
377
/** In-flight repository lookup, shared by callers that arrive before it settles. */
private _repositoryResolution: Promise<GitHubRepository | undefined> | undefined;

/**
 * Resolve the GitHub repository context (cached after first resolution).
 *
 * The outcome, including `undefined`, is cached once the lookup has
 * finished. Callers that arrive while the lookup is still running share the
 * same promise, so they do not observe a not-yet-populated cache and
 * mistake it for "no repository".
 *
 * @returns The repository with its numeric ids, or `undefined` when there is
 *   no active repository, it is not a GitHub repository, or the lookup failed.
 */
private async _resolveRepository(): Promise<GitHubRepository | undefined> {
	if (this._repositoryResolved) {
		return this._repository;
	}

	if (!this._repositoryResolution) {
		this._repositoryResolution = this._lookupRepository().finally(() => {
			this._repositoryResolved = true;
			this._repositoryResolution = undefined;
		});
	}
	return this._repositoryResolution;
}

/**
 * Reads owner/repo names from the active git repository and resolves their
 * numeric ids through the GitHub repository service. Stores the result in
 * `_repository`. Errors are reported via telemetry and yield `undefined`.
 */
private async _lookupRepository(): Promise<GitHubRepository | undefined> {
	try {
		const repoContext = this._gitService.activeRepository?.get();
		if (!repoContext) {
			return undefined;
		}

		const repoInfo = getGitHubRepoInfoFromContext(repoContext);
		if (!repoInfo) {
			return undefined;
		}

		const { id: repoId } = repoInfo;
		const apiResponse = await this._githubRepoService.getRepositoryInfo(repoId.org, repoId.repo);

		this._repository = {
			owner: repoId.org,
			repo: repoId.repo,
			repoIds: {
				ownerId: apiResponse.owner.id,
				repoId: apiResponse.id,
			},
		};
		return this._repository;
	} catch (err) {
		this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
			operation: 'resolveRepository',
			success: 'false',
			error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
		}, {});
		return undefined;
	}
}
421
422
// ── Session disposal ─────────────────────────────────────────────────────────
423
424
/**
 * Cleans up after a disposed chat session.
 *
 * If the session has a cloud session, its still-buffered events plus a final
 * shutdown event are submitted directly, because the regular flush routes
 * events through `_cloudSessions` and could no longer deliver them once the
 * mapping is removed below. All per-session state is then dropped.
 *
 * @param sessionId The chat session that was disposed.
 */
private _handleSessionDispose(sessionId: string): void {
	const state = this._translationStates.get(sessionId);
	const cloudIds = this._cloudSessions.get(sessionId);

	if (state && cloudIds) {
		// Pull this session's undelivered events out of the shared buffer,
		// keeping their order, and leave everyone else's events in place.
		const finalEvents: SessionEvent[] = [];
		const remaining: Array<{ chatSessionId: string; event: SessionEvent }> = [];
		for (const entry of this._eventBuffer) {
			if (entry.chatSessionId === sessionId) {
				finalEvents.push(entry.event);
			} else {
				remaining.push(entry);
			}
		}
		this._eventBuffer.length = 0;
		this._eventBuffer.push(...remaining);

		finalEvents.push(makeShutdownEvent(state));
		this._submitFinalEvents(cloudIds.cloudSessionId, finalEvents);
	}

	this._cloudSessions.delete(sessionId);
	this._translationStates.delete(sessionId);
	this._disabledSessions.delete(sessionId);
	this._initializingSessions.delete(sessionId);
}

/**
 * Fire-and-forget submission of a disposed session's last events, sent in
 * chunks of at most `MAX_EVENTS_PER_FLUSH`. Respects the circuit breaker:
 * nothing is sent while it refuses requests, and the outcome is recorded on it.
 */
private _submitFinalEvents(cloudSessionId: string, events: SessionEvent[]): void {
	if (!this._circuitBreaker.canRequest()) {
		return;
	}

	const send = async (): Promise<void> => {
		for (let start = 0; start < events.length; start += MAX_EVENTS_PER_FLUSH) {
			const chunk = events
				.slice(start, start + MAX_EVENTS_PER_FLUSH)
				.map(e => filterSecretsFromObj(e));
			const success = await this._cloudClient.submitSessionEvents(cloudSessionId, chunk);
			if (!success) {
				this._circuitBreaker.recordFailure();
				return;
			}
		}
		this._circuitBreaker.recordSuccess();
	};

	// Best effort — disposal must never fail because of the network.
	send().catch(() => this._circuitBreaker.recordFailure());
}
436
437
// ── Buffering ────────────────────────────────────────────────────────────────
438
439
/**
 * Appends events to the shared buffer, tagged with their chat session id,
 * then trims the oldest entries if the buffer exceeds `MAX_BUFFER_SIZE`.
 */
private _bufferEvents(chatSessionId: string, events: SessionEvent[]): void {
	events.forEach(event => {
		this._eventBuffer.push({ chatSessionId, event });
	});

	// Hard cap: whatever exceeds the limit is removed from the front (oldest first).
	const overflow = this._eventBuffer.length - MAX_BUFFER_SIZE;
	if (overflow > 0) {
		this._eventBuffer.splice(0, overflow);
	}
}
450
451
// ── Flush timer ──────────────────────────────────────────────────────────────
452
453
/**
 * Starts the periodic flush timer if it is not already running. The interval
 * is chosen once, at start: the fast interval when the buffer is above the
 * soft cap, the regular one otherwise.
 */
private _ensureFlushTimer(): void {
	if (this._flushTimer !== undefined) {
		return;
	}

	let intervalMs = BATCH_INTERVAL_MS;
	if (this._eventBuffer.length > SOFT_BUFFER_CAP) {
		intervalMs = FAST_BATCH_INTERVAL_MS;
	}

	// A rejected flush is reported through telemetry and otherwise ignored.
	const reportFlushError = (err: unknown): void => {
		this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
			operation: 'flush',
			success: 'false',
			error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
		}, {});
	};

	this._flushTimer = setInterval(() => {
		this._flushBatch().catch(reportFlushError);
	}, intervalMs);
}
473
474
/** Cancels the periodic flush timer, if one is running. */
private _stopFlushTimer(): void {
	if (this._flushTimer === undefined) {
		return;
	}

	clearInterval(this._flushTimer);
	this._flushTimer = undefined;
}
480
481
// ── Batch flush ──────────────────────────────────────────────────────────────
482
483
/**
 * Sends one batch of buffered events to the cloud.
 *
 * - Does nothing while another flush is running.
 * - With an empty buffer, stops the timer once no cloud sessions remain.
 * - While the circuit breaker refuses requests, only enforces the hard buffer cap.
 * - Otherwise takes up to `MAX_EVENTS_PER_FLUSH` entries, groups them by
 *   cloud session and submits each group after secret filtering.
 *
 * Entries whose session is still initializing (and not disabled) are put
 * back at the front of the buffer; entries for sessions with no cloud
 * session that are not initializing are dropped. A submission that returns
 * `false` is recorded on the circuit breaker and not retried. If something
 * throws, only the entries that were not yet submitted are put back, so
 * already-delivered or already-re-queued events are never duplicated.
 */
private async _flushBatch(): Promise<void> {
	if (this._isFlushing) {
		return;
	}

	if (this._eventBuffer.length === 0) {
		if (this._cloudSessions.size === 0) {
			this._stopFlushTimer();
		}
		return;
	}

	if (!this._circuitBreaker.canRequest()) {
		if (this._eventBuffer.length > MAX_BUFFER_SIZE) {
			const dropped = this._eventBuffer.length - MAX_BUFFER_SIZE;
			this._eventBuffer.splice(0, dropped);
		}
		return;
	}

	this._isFlushing = true;
	const batch = this._eventBuffer.splice(0, MAX_EVENTS_PER_FLUSH);
	type BufferedEntry = (typeof batch)[number];

	// Entries of this batch that still belong in the buffer when the flush
	// ends: not-yet-routable ones, plus any that were never submitted
	// because of an unexpected error.
	const owed = new Set<BufferedEntry>();

	try {
		// Group events by cloud session ID for correct routing
		const entriesBySession = new Map<string, BufferedEntry[]>();

		for (const entry of batch) {
			const cloudIds = this._cloudSessions.get(entry.chatSessionId);
			if (cloudIds) {
				let group = entriesBySession.get(cloudIds.cloudSessionId);
				if (!group) {
					group = [];
					entriesBySession.set(cloudIds.cloudSessionId, group);
				}
				group.push(entry);
				owed.add(entry);
			} else if (
				!this._disabledSessions.has(entry.chatSessionId)
				&& this._initializingSessions.has(entry.chatSessionId)
			) {
				// Session not initialized yet: keep the event for a later flush.
				owed.add(entry);
			}
			// Otherwise the session is disabled or unknown: drop the event.
		}

		// Submit each session's events to the correct cloud session
		let allSuccess = true;
		for (const [cloudSessionId, entries] of entriesBySession) {
			const filteredEvents = entries.map(e => filterSecretsFromObj(e.event));
			const success = await this._cloudClient.submitSessionEvents(cloudSessionId, filteredEvents);

			// The submission was attempted; these entries are no longer owed,
			// whether or not the service accepted them.
			for (const entry of entries) {
				owed.delete(entry);
			}
			if (!success) {
				allSuccess = false;
			}
		}

		if (allSuccess && entriesBySession.size > 0) {
			this._circuitBreaker.recordSuccess();

			if (!this._firstCloudWriteLogged) {
				this._firstCloudWriteLogged = true;

				this._telemetryService.sendMSFTTelemetryEvent('chronicle.cloudSync', {
					operation: 'firstWrite',
					sessionSource: this._firstCloudWriteSessionSource ?? 'unknown',
				}, {});
			}
		} else if (!allSuccess) {
			this._circuitBreaker.recordFailure();
		}
	} catch (err) {
		this._circuitBreaker.recordFailure();

		this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
			operation: 'flushBatch',
			success: 'false',
			error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
		}, { droppedEvents: batch.length });
	} finally {
		this._isFlushing = false;

		// Put back what is still owed, in original order, ahead of anything
		// buffered while this flush was running.
		const putBack = batch.filter(entry => owed.has(entry));
		if (putBack.length > 0) {
			this._eventBuffer.unshift(...putBack);
		}
	}

	// Above the soft cap: restart the timer so it picks up the faster interval.
	if (this._eventBuffer.length > SOFT_BUFFER_CAP && this._flushTimer !== undefined) {
		this._stopFlushTimer();
		this._ensureFlushTimer();
	}
}
577
578
}
579
580