Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/extension/chronicle/vscode-node/sessionStoreTracker.ts
13399 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import * as vscode from 'vscode';
7
import { IChatSessionService } from '../../../platform/chat/common/chatSessionService';
8
import { ConfigKey, IConfigurationService } from '../../../platform/configuration/common/configurationService';
9
import { type FileRow, type RefRow, type SessionRow, type TurnRow, ISessionStore } from '../../../platform/chronicle/common/sessionStore';
10
import { IExperimentationService } from '../../../platform/telemetry/common/nullExperimentationService';
11
import { CopilotChatAttr, GenAiAttr, GenAiOperationName } from '../../../platform/otel/common/genAiAttributes';
12
import { type ICompletedSpanData, IOTelService } from '../../../platform/otel/common/otelService';
13
import { Disposable, DisposableStore } from '../../../util/vs/base/common/lifecycle';
14
import { autorun } from '../../../util/vs/base/common/observableInternal';
15
import { ITelemetryService } from '../../../platform/telemetry/common/telemetry';
16
import { IExtensionContribution } from '../../common/contributions';
17
import {
18
MAX_ASSISTANT_RESPONSE_LENGTH,
19
MAX_SUMMARY_LENGTH,
20
extractAssistantResponse,
21
extractFilePath,
22
extractRefsFromMcpTool,
23
extractRefsFromTerminal,
24
extractRepoFromMcpTool,
25
extractToolArgs,
26
isGitHubMcpTool,
27
isTerminalTool,
28
truncateForStore,
29
} from '../common/sessionStoreTracking';
30
31
/** How often to flush buffered writes to SQLite (ms). */
32
const FLUSH_INTERVAL_MS = 3_000;
33
34
/** Minimum interval between session upserts for the same session (ms). */
35
const SESSION_UPSERT_COOLDOWN_MS = 30_000;
36
37
/**
38
* Buffered write operations waiting to be flushed to SQLite.
39
*/
40
interface WriteBuffer {
41
/** Session upserts keyed by session ID — later writes merge into earlier ones. */
42
sessions: Map<string, SessionRow>;
43
files: FileRow[];
44
refs: RefRow[];
45
turns: TurnRow[];
46
}
47
48
/**
49
* Populates the Chronicle session store from VS Code session lifecycle events.
50
*
51
* Optimizations:
52
* 1. **Write batching**: All writes are buffered and flushed every 3s in a single transaction.
53
* 2. **Deferred processing**: Span handling is deferred via queueMicrotask to avoid blocking.
54
* 3. **Duplicate suppression**: Session upserts with no new data are skipped via cooldown cache.
55
*/
56
export class SessionStoreTracker extends Disposable implements IExtensionContribution {
57
58
/** Track which sessions have been initialized in the store. */
59
private readonly _initializedSessions = new Set<string>();
60
61
/** Pending writes waiting for the next flush. */
62
private readonly _buffer: WriteBuffer = {
63
sessions: new Map(),
64
files: [],
65
refs: [],
66
turns: [],
67
};
68
69
/** Flush timer handle. */
70
private _flushTimer: ReturnType<typeof setInterval> | undefined;
71
72
/** Last time each session had a timestamp-only upsert flushed (ms since epoch). */
73
private readonly _lastSessionTimestamp = new Map<string, number>();
74
75
/** Per-session turn counter to avoid collisions between buffered writes and DB state. */
76
private readonly _turnCounters = new Map<string, number>();
77
78
/** Tool spans received before session was initialized, keyed by session ID. */
79
private readonly _pendingToolSpans = new Map<string, ICompletedSpanData[]>();
80
81
constructor(
82
@ISessionStore private readonly _sessionStore: ISessionStore,
83
@IOTelService private readonly _otelService: IOTelService,
84
@IChatSessionService private readonly _chatSessionService: IChatSessionService,
85
@IConfigurationService private readonly _configService: IConfigurationService,
86
@IExperimentationService private readonly _expService: IExperimentationService,
87
@ITelemetryService private readonly _telemetryService: ITelemetryService,
88
) {
89
super();
90
91
// Only set up span listener and flush timer when the feature is enabled.
92
// Uses autorun to react if the setting changes at runtime.
93
const featureEnabled = this._configService.getExperimentBasedConfigObservable(ConfigKey.LocalIndexEnabled, this._expService);
94
const spanListenerStore = this._register(new DisposableStore());
95
this._register(autorun(reader => {
96
spanListenerStore.clear();
97
if (!featureEnabled.read(reader)) {
98
return;
99
}
100
101
// Warm up the DB eagerly so schema issues surface early
102
try {
103
this._sessionStore.getStats();
104
} catch (err) {
105
/* __GDPR__
106
"chronicle.localStore" : {
107
"owner": "vijayu",
108
"comment": "Tracks local session store operations (init, write, flush errors)",
109
"operation": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The operation performed." },
110
"sessionSource": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The agent name/source for the session, or unknown if unavailable." },
111
"success": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "Whether the operation succeeded." },
112
"error": { "classification": "CallstackOrException", "purpose": "PerformanceAndHealth", "comment": "Truncated error message if failed." },
113
"opsCount": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of buffered operations in a failed flush." },
114
"filesCount": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of files tracked in first write." },
115
"refsCount": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of refs tracked in first write." },
116
"pendingSpansProcessed": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of pending tool spans processed on session init." }
117
}
118
*/
119
this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.localStore', {
120
operation: 'dbInit',
121
success: 'false',
122
error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
123
}, {});
124
}
125
126
// Start periodic flush
127
this._flushTimer = setInterval(() => this._flush(), FLUSH_INTERVAL_MS);
128
spanListenerStore.add({ dispose: () => { if (this._flushTimer) { clearInterval(this._flushTimer); this._flushTimer = undefined; } } });
129
130
// Listen to completed OTel spans for tool calls and session activity
131
spanListenerStore.add(this._otelService.onDidCompleteSpan(span => {
132
queueMicrotask(() => this._handleSpan(span));
133
}));
134
135
// Flush and clean up on session disposal
136
spanListenerStore.add(this._chatSessionService.onDidDisposeChatSession(sessionId => {
137
this._initializedSessions.delete(sessionId);
138
this._lastSessionTimestamp.delete(sessionId);
139
this._turnCounters.delete(sessionId);
140
this._pendingToolSpans.delete(sessionId);
141
}));
142
}));
143
}
144
145
override dispose(): void {
146
// Flush any remaining buffered writes before shutdown
147
if (this._flushTimer !== undefined) {
148
clearInterval(this._flushTimer);
149
this._flushTimer = undefined;
150
}
151
this._flush();
152
super.dispose();
153
}
154
155
// ── Span handling (produces buffered writes, no direct DB calls) ─────
156
157
private _handleSpan(span: ICompletedSpanData): void {
158
try {
159
const sessionId = this._getSessionId(span);
160
const operationName = span.attributes[GenAiAttr.OPERATION_NAME] as string | undefined;
161
if (!sessionId) {
162
return;
163
}
164
165
// Only track sessions that have an invoke_agent span (real user interactions).
166
// Skip internal LLM calls (title generation, progress messages, etc.)
167
if (!this._initializedSessions.has(sessionId)) {
168
// Queue tool spans to process after session initialization
169
// (tool spans complete before their parent invoke_agent span)
170
if (operationName === GenAiOperationName.EXECUTE_TOOL) {
171
let pending = this._pendingToolSpans.get(sessionId);
172
if (!pending) {
173
pending = [];
174
this._pendingToolSpans.set(sessionId, pending);
175
}
176
pending.push(span);
177
return;
178
}
179
if (operationName !== GenAiOperationName.INVOKE_AGENT) {
180
return;
181
}
182
this._initSession(sessionId, span);
183
}
184
185
// Extract metadata from any span that carries workspace/user info
186
this._backfillFromSpanAttributes(sessionId, span);
187
188
// Track turns from invoke_agent spans
189
if (operationName === GenAiOperationName.INVOKE_AGENT) {
190
this._handleAgentSpan(sessionId, span);
191
}
192
193
// Track tool executions
194
if (operationName === GenAiOperationName.EXECUTE_TOOL) {
195
this._handleToolSpan(sessionId, span);
196
}
197
198
// Lightweight timestamp bump — throttled by cooldown
199
this._bufferSessionTimestamp(sessionId);
200
} catch {
201
// Non-fatal — individual span processing failure
202
}
203
}
204
205
private _getSessionId(span: ICompletedSpanData): string | undefined {
206
return (span.attributes[CopilotChatAttr.CHAT_SESSION_ID] as string | undefined)
207
?? (span.attributes[GenAiAttr.CONVERSATION_ID] as string | undefined)
208
?? (span.attributes[CopilotChatAttr.SESSION_ID] as string | undefined);
209
}
210
211
private _initSession(sessionId: string, span: ICompletedSpanData): void {
212
this._initializedSessions.add(sessionId);
213
214
const sessionSource = (span.attributes[GenAiAttr.AGENT_NAME] as string | undefined) ?? 'unknown';
215
const cwd = vscode.workspace.workspaceFolders?.[0]?.uri.fsPath;
216
this._bufferSessionUpsert({ id: sessionId, host_type: 'vscode', agent_name: sessionSource, ...(cwd ? { cwd } : {}) });
217
218
// Track the source of the very first session for firstWrite telemetry
219
if (!this._firstWriteSessionSource) {
220
this._firstWriteSessionSource = sessionSource;
221
}
222
223
// Process any tool spans that arrived before session was initialized
224
const pendingSpans = this._pendingToolSpans.get(sessionId);
225
const pendingCount = pendingSpans?.length ?? 0;
226
if (pendingSpans) {
227
this._pendingToolSpans.delete(sessionId);
228
for (const toolSpan of pendingSpans) {
229
this._handleToolSpan(sessionId, toolSpan);
230
}
231
}
232
233
this._telemetryService.sendMSFTTelemetryEvent('chronicle.localStore', {
234
operation: 'sessionInit',
235
sessionSource,
236
}, {
237
pendingSpansProcessed: pendingCount,
238
});
239
}
240
241
private _backfillFromSpanAttributes(sessionId: string, span: ICompletedSpanData): void {
242
const branch = span.attributes[CopilotChatAttr.REPO_HEAD_BRANCH_NAME] as string | undefined;
243
const remoteUrl = span.attributes[CopilotChatAttr.REPO_REMOTE_URL] as string | undefined;
244
const userRequest = span.attributes[CopilotChatAttr.USER_REQUEST] as string | undefined;
245
246
if (branch || remoteUrl || userRequest) {
247
const summary = truncateForStore(userRequest, MAX_SUMMARY_LENGTH);
248
249
this._bufferSessionUpsert({
250
id: sessionId,
251
...(branch ? { branch } : {}),
252
...(remoteUrl ? { repository: remoteUrl } : {}),
253
...(summary ? { summary } : {}),
254
});
255
}
256
}
257
258
private _handleToolSpan(sessionId: string, span: ICompletedSpanData): void {
259
const toolName = span.attributes[GenAiAttr.TOOL_NAME] as string | undefined;
260
if (!toolName) {
261
return;
262
}
263
264
const turnIndex = span.attributes[CopilotChatAttr.TURN_INDEX] as number | undefined;
265
const toolArgs = extractToolArgs(span);
266
267
// Extract file path
268
const filePath = extractFilePath(toolName, toolArgs);
269
if (filePath) {
270
this._buffer.files.push({
271
session_id: sessionId,
272
file_path: filePath,
273
tool_name: toolName,
274
turn_index: turnIndex,
275
});
276
}
277
278
// Track refs from GitHub MCP server tools
279
if (isGitHubMcpTool(toolName)) {
280
const refs = extractRefsFromMcpTool(toolName, toolArgs);
281
for (const ref of refs) {
282
this._buffer.refs.push({ session_id: sessionId, ...ref, turn_index: turnIndex });
283
}
284
285
const repo = extractRepoFromMcpTool(toolArgs);
286
if (repo) {
287
this._bufferSessionUpsert({ id: sessionId, repository: repo });
288
}
289
}
290
291
// Track refs from terminal/shell tool
292
if (isTerminalTool(toolName)) {
293
const resultText = span.attributes['gen_ai.tool.result'] as string | undefined;
294
const refs = extractRefsFromTerminal(toolArgs, resultText);
295
for (const ref of refs) {
296
this._buffer.refs.push({ session_id: sessionId, ...ref, turn_index: turnIndex });
297
}
298
}
299
}
300
301
private _handleAgentSpan(sessionId: string, span: ICompletedSpanData): void {
302
const userRequest = span.attributes[CopilotChatAttr.USER_REQUEST] as string | undefined;
303
304
// Extract user messages from span events
305
const userMessages: { turnIndex: number; content: string }[] = [];
306
let turnCounter = 0;
307
308
for (const event of span.events) {
309
if (event.name === 'user_message') {
310
const content = event.attributes?.['content'] as string | undefined;
311
if (content) {
312
userMessages.push({ turnIndex: turnCounter, content });
313
}
314
turnCounter++;
315
}
316
}
317
318
if (userMessages.length === 0 && userRequest) {
319
userMessages.push({ turnIndex: 0, content: userRequest });
320
}
321
322
// Use the first user message as the session summary if one hasn't been set yet
323
const existingSession = this._buffer.sessions.get(sessionId);
324
if (!existingSession?.summary) {
325
const firstMessage = userMessages[0]?.content ?? userRequest;
326
const summary = truncateForStore(firstMessage, MAX_SUMMARY_LENGTH);
327
if (summary) {
328
this._bufferSessionUpsert({ id: sessionId, summary });
329
}
330
}
331
332
// Extract assistant response from OUTPUT_MESSAGES attribute, truncated for storage
333
const fullResponse = extractAssistantResponse(span.attributes[GenAiAttr.OUTPUT_MESSAGES] as string | undefined);
334
const assistantResponse = truncateForStore(fullResponse, MAX_ASSISTANT_RESPONSE_LENGTH);
335
336
// Use in-memory turn counter to avoid collisions with buffered-but-unflushed turns.
337
// Initialize from DB on first use, then increment in memory.
338
if (!this._turnCounters.has(sessionId)) {
339
this._turnCounters.set(sessionId, this._sessionStore.getMaxTurnIndex(sessionId) + 1);
340
}
341
for (let i = 0; i < userMessages.length; i++) {
342
const msg = userMessages[i];
343
const absoluteTurnIndex = this._turnCounters.get(sessionId)!;
344
this._turnCounters.set(sessionId, absoluteTurnIndex + 1);
345
this._buffer.turns.push({
346
session_id: sessionId,
347
turn_index: absoluteTurnIndex,
348
user_message: msg.content,
349
// Attach assistant response to the last turn (the final model output)
350
...(i === userMessages.length - 1 && assistantResponse
351
? { assistant_response: assistantResponse }
352
: {}),
353
});
354
}
355
}
356
357
// ── Buffering helpers ────────────────────────────────────────────────
358
359
/**
360
* Merge a session upsert into the buffer. Later writes overwrite earlier
361
* ones for the same field, but null/undefined fields don't overwrite.
362
*/
363
private _bufferSessionUpsert(session: SessionRow): void {
364
const existing = this._buffer.sessions.get(session.id);
365
if (existing) {
366
// Merge: keep existing values unless new value is non-null
367
this._buffer.sessions.set(session.id, {
368
...existing,
369
...(session.cwd ? { cwd: session.cwd } : {}),
370
...(session.repository ? { repository: session.repository } : {}),
371
...(session.host_type ? { host_type: session.host_type } : {}),
372
...(session.branch ? { branch: session.branch } : {}),
373
...(session.summary ? { summary: session.summary } : {}),
374
});
375
} else {
376
this._buffer.sessions.set(session.id, { ...session });
377
}
378
}
379
380
/**
381
* Buffer a timestamp-only upsert, but skip if we recently flushed one
382
* for this session (cooldown-based dedup).
383
*/
384
private _bufferSessionTimestamp(sessionId: string): void {
385
const now = Date.now();
386
const last = this._lastSessionTimestamp.get(sessionId) ?? 0;
387
if (now - last < SESSION_UPSERT_COOLDOWN_MS) {
388
return; // Skip — too recent
389
}
390
this._lastSessionTimestamp.set(sessionId, now);
391
this._bufferSessionUpsert({ id: sessionId, host_type: 'vscode' });
392
}
393
394
/** Whether we've already sent a successful-write telemetry event. */
395
private _firstWriteLogged = false;
396
397
/** The session source of the first initialized session (for firstWrite telemetry). */
398
private _firstWriteSessionSource: string | undefined;
399
400
// ── Flush: batch all buffered writes into one transaction ────────────
401
402
private _flush(): void {
403
const { sessions, files, refs, turns } = this._buffer;
404
const totalOps = sessions.size + files.length + refs.length + turns.length;
405
if (totalOps === 0) {
406
return;
407
}
408
409
// Swap out the buffer contents so new writes during flush go to fresh arrays
410
const sessionsToFlush = [...sessions.values()];
411
const filesToFlush = [...files];
412
const refsToFlush = [...refs];
413
const turnsToFlush = [...turns];
414
sessions.clear();
415
files.length = 0;
416
refs.length = 0;
417
turns.length = 0;
418
419
try {
420
this._sessionStore.runInTransaction(() => {
421
for (const session of sessionsToFlush) {
422
this._sessionStore.upsertSession(session);
423
}
424
for (const file of filesToFlush) {
425
this._sessionStore.insertFile(file);
426
}
427
for (const ref of refsToFlush) {
428
this._sessionStore.insertRef(ref);
429
}
430
for (const turn of turnsToFlush) {
431
this._sessionStore.insertTurn(turn);
432
}
433
});
434
435
if (!this._firstWriteLogged) {
436
this._firstWriteLogged = true;
437
438
this._telemetryService.sendMSFTTelemetryEvent('chronicle.localStore', {
439
operation: 'firstWrite',
440
sessionSource: this._firstWriteSessionSource ?? 'unknown',
441
}, {
442
filesCount: filesToFlush.length,
443
refsCount: refsToFlush.length,
444
});
445
}
446
} catch (err) {
447
448
this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.localStore', {
449
operation: 'flush',
450
success: 'false',
451
error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
452
}, { opsCount: totalOps });
453
}
454
}
455
}
456
457