Path: blob/main/extensions/copilot/src/extension/chronicle/vscode-node/sessionStoreTracker.ts
13399 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import * as vscode from 'vscode';
import { IChatSessionService } from '../../../platform/chat/common/chatSessionService';
import { ConfigKey, IConfigurationService } from '../../../platform/configuration/common/configurationService';
import { type FileRow, type RefRow, type SessionRow, type TurnRow, ISessionStore } from '../../../platform/chronicle/common/sessionStore';
import { IExperimentationService } from '../../../platform/telemetry/common/nullExperimentationService';
import { CopilotChatAttr, GenAiAttr, GenAiOperationName } from '../../../platform/otel/common/genAiAttributes';
import { type ICompletedSpanData, IOTelService } from '../../../platform/otel/common/otelService';
import { Disposable, DisposableStore } from '../../../util/vs/base/common/lifecycle';
import { autorun } from '../../../util/vs/base/common/observableInternal';
import { ITelemetryService } from '../../../platform/telemetry/common/telemetry';
import { IExtensionContribution } from '../../common/contributions';
import {
	MAX_ASSISTANT_RESPONSE_LENGTH,
	MAX_SUMMARY_LENGTH,
	extractAssistantResponse,
	extractFilePath,
	extractRefsFromMcpTool,
	extractRefsFromTerminal,
	extractRepoFromMcpTool,
	extractToolArgs,
	isGitHubMcpTool,
	isTerminalTool,
	truncateForStore,
} from '../common/sessionStoreTracking';

/** How often to flush buffered writes to SQLite (ms). */
const FLUSH_INTERVAL_MS = 3_000;

/** Minimum interval between timestamp-only upserts for the same session (ms). */
const SESSION_UPSERT_COOLDOWN_MS = 30_000;

/**
 * Buffered write operations waiting to be flushed to SQLite.
 */
interface WriteBuffer {
	/** Session upserts keyed by session ID — later writes merge into earlier ones. */
	sessions: Map<string, SessionRow>;
	files: FileRow[];
	refs: RefRow[];
	turns: TurnRow[];
}

/**
 * Populates the Chronicle session store from VS Code session lifecycle events.
 *
 * Optimizations:
 * 1. **Write batching**: All writes are buffered and flushed every 3s in a single transaction.
 * 2. **Deferred processing**: Span handling is scheduled via queueMicrotask so it runs after the
 *    span-completion listener returns (microtasks still run before the event loop yields).
 * 3. **Duplicate suppression**: Timestamp-only session upserts are throttled by a per-session cooldown.
 */
export class SessionStoreTracker extends Disposable implements IExtensionContribution {

	/** Track which sessions have been initialized in the store. */
	private readonly _initializedSessions = new Set<string>();

	/** Pending writes waiting for the next flush. */
	private readonly _buffer: WriteBuffer = {
		sessions: new Map(),
		files: [],
		refs: [],
		turns: [],
	};

	/** Flush timer handle. */
	private _flushTimer: ReturnType<typeof setInterval> | undefined;

	/** Last time a timestamp-only upsert was buffered for each session (ms since epoch). */
	private readonly _lastSessionTimestamp = new Map<string, number>();

	/** Per-session next turn index, to avoid collisions between buffered writes and DB state. */
	private readonly _turnCounters = new Map<string, number>();

	/** Tool spans received before session was initialized, keyed by session ID. */
	private readonly _pendingToolSpans = new Map<string, ICompletedSpanData[]>();

	/** Whether we've already sent a successful-write telemetry event. */
	private _firstWriteLogged = false;

	/** The session source of the first initialized session (for firstWrite telemetry). */
	private _firstWriteSessionSource: string | undefined;

	constructor(
		@ISessionStore private readonly _sessionStore: ISessionStore,
		@IOTelService private readonly _otelService: IOTelService,
		@IChatSessionService private readonly _chatSessionService: IChatSessionService,
		@IConfigurationService private readonly _configService: IConfigurationService,
		@IExperimentationService private readonly _expService: IExperimentationService,
		@ITelemetryService private readonly _telemetryService: ITelemetryService,
	) {
		super();

		// Only set up span listener and flush timer when the feature is enabled.
		// Uses autorun to react if the setting changes at runtime.
		const featureEnabled = this._configService.getExperimentBasedConfigObservable(ConfigKey.LocalIndexEnabled, this._expService);
		const spanListenerStore = this._register(new DisposableStore());
		this._register(autorun(reader => {
			spanListenerStore.clear();
			if (!featureEnabled.read(reader)) {
				return;
			}

			// Warm up the DB eagerly so schema issues surface early
			try {
				this._sessionStore.getStats();
			} catch (err) {
				/* __GDPR__
					"chronicle.localStore" : {
						"owner": "vijayu",
						"comment": "Tracks local session store operations (init, write, flush errors)",
						"operation": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The operation performed." },
						"sessionSource": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The agent name/source for the session, or unknown if unavailable." },
						"success": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "Whether the operation succeeded." },
						"error": { "classification": "CallstackOrException", "purpose": "PerformanceAndHealth", "comment": "Truncated error message if failed." },
						"opsCount": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of buffered operations in a failed flush." },
						"filesCount": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of files tracked in first write." },
						"refsCount": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of refs tracked in first write." },
						"pendingSpansProcessed": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of pending tool spans processed on session init." }
					}
				*/
				this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.localStore', {
					operation: 'dbInit',
					success: 'false',
					error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
				}, {});
			}

			// Start periodic flush. When this store is cleared (feature toggled off, or
			// contribution disposed), stop the timer and persist anything still buffered
			// instead of leaving it in memory; an empty buffer makes the flush a no-op.
			this._flushTimer = setInterval(() => this._flush(), FLUSH_INTERVAL_MS);
			spanListenerStore.add({
				dispose: () => {
					if (this._flushTimer !== undefined) {
						clearInterval(this._flushTimer);
						this._flushTimer = undefined;
					}
					this._flush();
				}
			});

			// Listen to completed OTel spans for tool calls and session activity
			spanListenerStore.add(this._otelService.onDidCompleteSpan(span => {
				queueMicrotask(() => this._handleSpan(span));
			}));

			// Flush and clean up on session disposal. Flushing first matters: the turn
			// counter below is re-seeded from the DB if the session is seen again, so any
			// of its turns still sitting in the buffer must be persisted before it is dropped.
			spanListenerStore.add(this._chatSessionService.onDidDisposeChatSession(sessionId => {
				this._flush();
				this._initializedSessions.delete(sessionId);
				this._lastSessionTimestamp.delete(sessionId);
				this._turnCounters.delete(sessionId);
				this._pendingToolSpans.delete(sessionId);
			}));
		}));
	}

	override dispose(): void {
		// Flush any remaining buffered writes before shutdown
		if (this._flushTimer !== undefined) {
			clearInterval(this._flushTimer);
			this._flushTimer = undefined;
		}
		this._flush();
		super.dispose();
	}

	// ── Span handling (produces buffered writes; only the turn-counter seed reads the DB) ─────

	private _handleSpan(span: ICompletedSpanData): void {
		try {
			const sessionId = this._getSessionId(span);
			const operationName = span.attributes[GenAiAttr.OPERATION_NAME] as string | undefined;
			if (!sessionId) {
				return;
			}

			// Only track sessions that have an invoke_agent span (real user interactions).
			// Skip internal LLM calls (title generation, progress messages, etc.)
			if (!this._initializedSessions.has(sessionId)) {
				// Queue tool spans to process after session initialization
				// (tool spans complete before their parent invoke_agent span)
				if (operationName === GenAiOperationName.EXECUTE_TOOL) {
					let pending = this._pendingToolSpans.get(sessionId);
					if (!pending) {
						pending = [];
						this._pendingToolSpans.set(sessionId, pending);
					}
					pending.push(span);
					return;
				}
				if (operationName !== GenAiOperationName.INVOKE_AGENT) {
					return;
				}
				this._initSession(sessionId, span);
			}

			// Extract metadata from any span that carries workspace/user info
			this._backfillFromSpanAttributes(sessionId, span);

			// Track turns from invoke_agent spans
			if (operationName === GenAiOperationName.INVOKE_AGENT) {
				this._handleAgentSpan(sessionId, span);
			}

			// Track tool executions
			if (operationName === GenAiOperationName.EXECUTE_TOOL) {
				this._handleToolSpan(sessionId, span);
			}

			// Lightweight timestamp bump — throttled by cooldown
			this._bufferSessionTimestamp(sessionId);
		} catch {
			// Non-fatal — individual span processing failure
		}
	}

	/** Resolve the chat session ID from the first session-identifying attribute present on the span. */
	private _getSessionId(span: ICompletedSpanData): string | undefined {
		return (span.attributes[CopilotChatAttr.CHAT_SESSION_ID] as string | undefined)
			?? (span.attributes[GenAiAttr.CONVERSATION_ID] as string | undefined)
			?? (span.attributes[CopilotChatAttr.SESSION_ID] as string | undefined);
	}

	/**
	 * Mark a session as tracked, buffer its initial row, and replay any tool spans
	 * that completed before the session's invoke_agent span arrived.
	 */
	private _initSession(sessionId: string, span: ICompletedSpanData): void {
		this._initializedSessions.add(sessionId);

		const sessionSource = (span.attributes[GenAiAttr.AGENT_NAME] as string | undefined) ?? 'unknown';
		const cwd = vscode.workspace.workspaceFolders?.[0]?.uri.fsPath;
		this._bufferSessionUpsert({ id: sessionId, host_type: 'vscode', agent_name: sessionSource, ...(cwd ? { cwd } : {}) });

		// Track the source of the very first session for firstWrite telemetry
		if (!this._firstWriteSessionSource) {
			this._firstWriteSessionSource = sessionSource;
		}

		// Process any tool spans that arrived before session was initialized
		const pendingSpans = this._pendingToolSpans.get(sessionId);
		const pendingCount = pendingSpans?.length ?? 0;
		if (pendingSpans) {
			this._pendingToolSpans.delete(sessionId);
			for (const toolSpan of pendingSpans) {
				this._handleToolSpan(sessionId, toolSpan);
			}
		}

		this._telemetryService.sendMSFTTelemetryEvent('chronicle.localStore', {
			operation: 'sessionInit',
			sessionSource,
		}, {
			pendingSpansProcessed: pendingCount,
		});
	}

	/**
	 * Buffer branch / repository / summary metadata carried on any span.
	 *
	 * NOTE(review): this runs for every span carrying USER_REQUEST and the buffer merge
	 * lets later summaries overwrite earlier ones, so the buffered summary tracks the most
	 * recent request. Because _handleSpan calls this before _handleAgentSpan, it also
	 * pre-empts the "first user message" summary there — confirm this is intended.
	 */
	private _backfillFromSpanAttributes(sessionId: string, span: ICompletedSpanData): void {
		const branch = span.attributes[CopilotChatAttr.REPO_HEAD_BRANCH_NAME] as string | undefined;
		const remoteUrl = span.attributes[CopilotChatAttr.REPO_REMOTE_URL] as string | undefined;
		const userRequest = span.attributes[CopilotChatAttr.USER_REQUEST] as string | undefined;

		if (branch || remoteUrl || userRequest) {
			const summary = truncateForStore(userRequest, MAX_SUMMARY_LENGTH);

			this._bufferSessionUpsert({
				id: sessionId,
				...(branch ? { branch } : {}),
				...(remoteUrl ? { repository: remoteUrl } : {}),
				...(summary ? { summary } : {}),
			});
		}
	}

	/** Buffer file and ref rows (and possibly a repository) derived from a completed tool span. */
	private _handleToolSpan(sessionId: string, span: ICompletedSpanData): void {
		const toolName = span.attributes[GenAiAttr.TOOL_NAME] as string | undefined;
		if (!toolName) {
			return;
		}

		const turnIndex = span.attributes[CopilotChatAttr.TURN_INDEX] as number | undefined;
		const toolArgs = extractToolArgs(span);

		// Extract file path
		const filePath = extractFilePath(toolName, toolArgs);
		if (filePath) {
			this._buffer.files.push({
				session_id: sessionId,
				file_path: filePath,
				tool_name: toolName,
				turn_index: turnIndex,
			});
		}

		// Track refs from GitHub MCP server tools
		if (isGitHubMcpTool(toolName)) {
			const refs = extractRefsFromMcpTool(toolName, toolArgs);
			for (const ref of refs) {
				this._buffer.refs.push({ session_id: sessionId, ...ref, turn_index: turnIndex });
			}

			const repo = extractRepoFromMcpTool(toolArgs);
			if (repo) {
				this._bufferSessionUpsert({ id: sessionId, repository: repo });
			}
		}

		// Track refs from terminal/shell tool
		if (isTerminalTool(toolName)) {
			const resultText = span.attributes['gen_ai.tool.result'] as string | undefined;
			const refs = extractRefsFromTerminal(toolArgs, resultText);
			for (const ref of refs) {
				this._buffer.refs.push({ session_id: sessionId, ...ref, turn_index: turnIndex });
			}
		}
	}

	/**
	 * Buffer one turn row per user message of an invoke_agent span, attaching the span's
	 * assistant response to the last one, and seed the session summary if none is buffered.
	 */
	private _handleAgentSpan(sessionId: string, span: ICompletedSpanData): void {
		const userRequest = span.attributes[CopilotChatAttr.USER_REQUEST] as string | undefined;

		// Collect non-empty user messages from span events, in event order
		const userMessages: string[] = [];
		for (const event of span.events) {
			if (event.name === 'user_message') {
				const content = event.attributes?.['content'] as string | undefined;
				if (content) {
					userMessages.push(content);
				}
			}
		}

		// Fall back to the request attribute when the span carries no user_message events
		if (userMessages.length === 0 && userRequest) {
			userMessages.push(userRequest);
		}

		// Use the first user message as the session summary if one hasn't been buffered yet
		const existingSession = this._buffer.sessions.get(sessionId);
		if (!existingSession?.summary) {
			const summary = truncateForStore(userMessages[0] ?? userRequest, MAX_SUMMARY_LENGTH);
			if (summary) {
				this._bufferSessionUpsert({ id: sessionId, summary });
			}
		}

		// Extract assistant response from OUTPUT_MESSAGES attribute, truncated for storage
		const fullResponse = extractAssistantResponse(span.attributes[GenAiAttr.OUTPUT_MESSAGES] as string | undefined);
		const assistantResponse = truncateForStore(fullResponse, MAX_ASSISTANT_RESPONSE_LENGTH);

		// Use an in-memory turn counter to avoid collisions with buffered-but-unflushed turns.
		// Seed it from the DB the first time this session is seen, then advance it in memory.
		let nextTurnIndex = this._turnCounters.get(sessionId) ?? this._sessionStore.getMaxTurnIndex(sessionId) + 1;
		const lastMessageIndex = userMessages.length - 1;
		for (let i = 0; i < userMessages.length; i++) {
			this._buffer.turns.push({
				session_id: sessionId,
				turn_index: nextTurnIndex++,
				user_message: userMessages[i],
				// Attach assistant response to the last turn (the final model output)
				...(i === lastMessageIndex && assistantResponse
					? { assistant_response: assistantResponse }
					: {}),
			});
		}
		this._turnCounters.set(sessionId, nextTurnIndex);
	}

	// ── Buffering helpers ────────────────────────────────────────────────

	/**
	 * Merge a session upsert into the buffer. Later writes overwrite earlier
	 * ones for the same field, but falsy (empty/null/undefined) fields don't overwrite.
	 */
	private _bufferSessionUpsert(session: SessionRow): void {
		const existing = this._buffer.sessions.get(session.id);
		if (existing) {
			// Merge: keep existing values unless the new value is truthy.
			// agent_name must be carried too, or a re-initialized session loses it.
			this._buffer.sessions.set(session.id, {
				...existing,
				...(session.cwd ? { cwd: session.cwd } : {}),
				...(session.repository ? { repository: session.repository } : {}),
				...(session.host_type ? { host_type: session.host_type } : {}),
				...(session.agent_name ? { agent_name: session.agent_name } : {}),
				...(session.branch ? { branch: session.branch } : {}),
				...(session.summary ? { summary: session.summary } : {}),
			});
		} else {
			this._buffer.sessions.set(session.id, { ...session });
		}
	}

	/**
	 * Buffer a timestamp-only upsert, but skip if we recently buffered one
	 * for this session (cooldown-based dedup).
	 */
	private _bufferSessionTimestamp(sessionId: string): void {
		const now = Date.now();
		const last = this._lastSessionTimestamp.get(sessionId) ?? 0;
		if (now - last < SESSION_UPSERT_COOLDOWN_MS) {
			return; // Skip — too recent
		}
		this._lastSessionTimestamp.set(sessionId, now);
		this._bufferSessionUpsert({ id: sessionId, host_type: 'vscode' });
	}

	// ── Flush: batch all buffered writes into one transaction ────────────

	/**
	 * Write all buffered rows in a single transaction. The buffer is emptied before
	 * writing, so a failed batch is dropped (and reported via telemetry), not retried.
	 */
	private _flush(): void {
		const { sessions, files, refs, turns } = this._buffer;
		const totalOps = sessions.size + files.length + refs.length + turns.length;
		if (totalOps === 0) {
			return;
		}

		// Swap out the buffer contents so new writes during flush go to fresh arrays
		const sessionsToFlush = [...sessions.values()];
		const filesToFlush = [...files];
		const refsToFlush = [...refs];
		const turnsToFlush = [...turns];
		sessions.clear();
		files.length = 0;
		refs.length = 0;
		turns.length = 0;

		try {
			this._sessionStore.runInTransaction(() => {
				for (const session of sessionsToFlush) {
					this._sessionStore.upsertSession(session);
				}
				for (const file of filesToFlush) {
					this._sessionStore.insertFile(file);
				}
				for (const ref of refsToFlush) {
					this._sessionStore.insertRef(ref);
				}
				for (const turn of turnsToFlush) {
					this._sessionStore.insertTurn(turn);
				}
			});

			if (!this._firstWriteLogged) {
				this._firstWriteLogged = true;

				this._telemetryService.sendMSFTTelemetryEvent('chronicle.localStore', {
					operation: 'firstWrite',
					sessionSource: this._firstWriteSessionSource ?? 'unknown',
				}, {
					filesCount: filesToFlush.length,
					refsCount: refsToFlush.length,
				});
			}
		} catch (err) {
			this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.localStore', {
				operation: 'flush',
				success: 'false',
				error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
			}, { opsCount: totalOps });
		}
	}
}