Path: blob/main/extensions/copilot/src/extension/chronicle/vscode-node/remoteSessionExporter.ts
13399 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { IAuthenticationService } from '../../../platform/authentication/common/authentication';
import { ICopilotTokenManager } from '../../../platform/authentication/common/copilotTokenManager';
import { IChatSessionService } from '../../../platform/chat/common/chatSessionService';
import { ConfigKey, IConfigurationService } from '../../../platform/configuration/common/configurationService';
import { CopilotChatAttr, GenAiAttr, GenAiOperationName } from '../../../platform/otel/common/genAiAttributes';
import { IExperimentationService } from '../../../platform/telemetry/common/nullExperimentationService';
import { type ICompletedSpanData, IOTelService } from '../../../platform/otel/common/otelService';
import { getGitHubRepoInfoFromContext, IGitService } from '../../../platform/git/common/gitService';
import { IGithubRepositoryService } from '../../../platform/github/common/githubService';
import { Disposable, DisposableStore } from '../../../util/vs/base/common/lifecycle';
import { autorun } from '../../../util/vs/base/common/observableInternal';
import { IExtensionContribution } from '../../common/contributions';
import { CircuitBreaker } from '../common/circuitBreaker';
import {
	createSessionTranslationState,
	makeShutdownEvent,
	translateSpan,
	type SessionTranslationState,
} from '../common/eventTranslator';
import type { GitHubRepository, CloudSessionIds, SessionEvent, WorkingDirectoryContext } from '../common/cloudSessionTypes';
import { filterSecretsFromObj, addSecretValues } from '../common/secretFilter';
import { SessionIndexingPreference, type SessionIndexingLevel } from '../common/sessionIndexingPreference';
import { IFetcherService } from '../../../platform/networking/common/fetcherService';
import { ITelemetryService } from '../../../platform/telemetry/common/telemetry';
import { CloudSessionApiClient } from '../node/cloudSessionApiClient';

// ── Configuration ───────────────────────────────────────────────────────────────

/** How often to flush buffered events to the cloud (ms). */
const BATCH_INTERVAL_MS = 500;

/** Faster drain interval when buffer is above soft cap. */
const FAST_BATCH_INTERVAL_MS = 200;

/** Max events taken from the buffer per flush (split across cloud sessions). */
const MAX_EVENTS_PER_FLUSH = 500;

/** Hard cap on buffered events (drop oldest beyond this). */
const MAX_BUFFER_SIZE = 1_000;

/** Soft cap — switch to faster drain. */
const SOFT_BUFFER_CAP = 500;

/** A translated event waiting to be flushed, tagged with the chat session that produced it. */
interface BufferedEvent {
	readonly chatSessionId: string;
	readonly event: SessionEvent;
	/**
	 * Cloud session this event is pinned to. Set when a flush first routes the event, or when its
	 * chat session is disposed, so the event is still delivered after the chat session's cloud
	 * mapping has been removed.
	 */
	cloudSessionId?: string;
}

/**
 * Exports VS Code chat session events to the cloud in real-time.
 *
 * - Listens to OTel spans, translates to cloud SessionEvent format
 * - Buffers events and flushes in batches every BATCH_INTERVAL_MS, or every FAST_BATCH_INTERVAL_MS
 *   while the backlog is above SOFT_BUFFER_CAP; the timer goes idle when the buffer is empty
 * - Circuit breaker prevents cascading failures when the cloud is unavailable
 * - Lazy initialization: no work until the first real chat interaction
 *
 * All cloud operations are fire-and-forget — never blocks or slows the chat session.
 */
export class RemoteSessionExporter extends Disposable implements IExtensionContribution {

	// ── Per-session state ────────────────────────────────────────────────────────

	/** Per-session cloud IDs (created lazily on first interaction). */
	private readonly _cloudSessions = new Map<string, CloudSessionIds>();

	/** Per-session translation state (parentId chaining, session.start tracking). */
	private readonly _translationStates = new Map<string, SessionTranslationState>();

	/** Sessions that failed cloud initialization — don't retry. */
	private readonly _disabledSessions = new Set<string>();

	/**
	 * Sessions currently initializing (prevents concurrent init). An ID that disappears from this set
	 * while its initialization is still awaiting means the chat session (or the exporter) was disposed.
	 */
	private readonly _initializingSessions = new Set<string>();

	// ── Shared state ─────────────────────────────────────────────────────────────

	/** Buffered events tagged with their chat session ID for correct routing. */
	private readonly _eventBuffer: BufferedEvent[] = [];
	private readonly _cloudClient: CloudSessionApiClient;
	private readonly _circuitBreaker: CircuitBreaker;

	private _flushTimer: ReturnType<typeof setInterval> | undefined;
	/** Interval (ms) the running flush timer was started with. */
	private _flushIntervalMs = 0;
	private _isFlushing = false;
	private _isDisposed = false;
	private _firstCloudWriteLogged = false;

	/** The session source of the first initialized session (for firstWrite telemetry). */
	private _firstCloudWriteSessionSource: string | undefined;

	/** Repository resolution, started on first use and shared by every caller (including concurrent ones). */
	private _repositoryPromise: Promise<GitHubRepository | undefined> | undefined;
	/** Result of a successful repository resolution. */
	private _repository: GitHubRepository | undefined;

	/** Secret values already handed to `addSecretValues`, so each rotated token is added once. */
	private readonly _registeredSecrets = new Set<string>();

	/** User's session indexing preference (resolved once per repo). */
	private readonly _indexingPreference: SessionIndexingPreference;

	constructor(
		@IOTelService private readonly _otelService: IOTelService,
		@IChatSessionService private readonly _chatSessionService: IChatSessionService,
		@ICopilotTokenManager private readonly _tokenManager: ICopilotTokenManager,
		@IAuthenticationService private readonly _authService: IAuthenticationService,
		@IGitService private readonly _gitService: IGitService,
		@IGithubRepositoryService private readonly _githubRepoService: IGithubRepositoryService,
		@IConfigurationService private readonly _configService: IConfigurationService,
		@IExperimentationService private readonly _expService: IExperimentationService,
		@ITelemetryService private readonly _telemetryService: ITelemetryService,
		@IFetcherService private readonly _fetcherService: IFetcherService,
	) {
		super();

		this._indexingPreference = new SessionIndexingPreference(this._configService);
		this._cloudClient = new CloudSessionApiClient(this._tokenManager, this._authService, this._fetcherService);
		this._circuitBreaker = new CircuitBreaker({
			failureThreshold: 5,
			resetTimeoutMs: 1_000,
			maxResetTimeoutMs: 30_000,
		});

		// Register known auth tokens as dynamic secrets for filtering
		this._registerAuthSecrets();

		// Only set up span listener when both local index and cloud sync are enabled.
		// Uses autorun to react if settings change at runtime.
		const localEnabled = this._configService.getExperimentBasedConfigObservable(ConfigKey.LocalIndexEnabled, this._expService);
		const cloudEnabled = this._configService.getConfigObservable(ConfigKey.TeamInternal.SessionSearchCloudSyncEnabled);
		const spanListenerStore = this._register(new DisposableStore());
		this._register(autorun(reader => {
			spanListenerStore.clear();
			if (!localEnabled.read(reader) || !cloudEnabled.read(reader)) {
				return;
			}

			// Listen to completed OTel spans — deferred off the callback
			spanListenerStore.add(this._otelService.onDidCompleteSpan(span => {
				queueMicrotask(() => this._handleSpan(span));
			}));

			// Clean up on session disposal
			spanListenerStore.add(this._chatSessionService.onDidDisposeChatSession(sessionId => {
				this._handleSessionDispose(sessionId);
			}));
		}));
	}

	override dispose(): void {
		if (this._isDisposed) {
			return;
		}
		this._isDisposed = true;
		this._stopFlushTimer();

		// Best-effort final flush. It is not awaited because dispose cannot block. Its routing step
		// runs before the first `await` inside _flushBatch, so it still sees the session maps that
		// are cleared below.
		if (this._eventBuffer.length > 0) {
			this._flushBatch().catch(() => { /* best effort */ });
		}

		this._cloudSessions.clear();
		this._translationStates.clear();
		this._disabledSessions.clear();
		this._initializingSessions.clear();

		super.dispose();
	}

	// ── Span handling ────────────────────────────────────────────────────────────

	private _handleSpan(span: ICompletedSpanData): void {
		// Spans are delivered through queueMicrotask, so one can still arrive after dispose().
		if (this._isDisposed) {
			return;
		}

		try {
			const sessionId = this._getSessionId(span);
			if (!sessionId || this._disabledSessions.has(sessionId)) {
				return;
			}

			// Only start tracking on invoke_agent (real user interaction)
			if (!this._cloudSessions.has(sessionId) && !this._initializingSessions.has(sessionId)) {
				const operationName = span.attributes[GenAiAttr.OPERATION_NAME] as string | undefined;
				if (operationName !== GenAiOperationName.INVOKE_AGENT) {
					return;
				}
				// Trigger lazy initialization. It is not awaited; events are buffered in the meantime.
				// _initializeSession handles its own errors.
				void this._initializeSession(sessionId, span);
			}

			// Translate span to cloud events
			const state = this._getOrCreateTranslationState(sessionId);
			const context = this._extractContext(span);
			const events = translateSpan(span, state, context);

			if (events.length > 0) {
				this._bufferEvents(sessionId, events);
				this._ensureFlushTimer();
			}
		} catch {
			// Non-fatal — individual span processing failure
		}
	}

	private _getSessionId(span: ICompletedSpanData): string | undefined {
		return (span.attributes[CopilotChatAttr.CHAT_SESSION_ID] as string | undefined)
			?? (span.attributes[GenAiAttr.CONVERSATION_ID] as string | undefined)
			?? (span.attributes[CopilotChatAttr.SESSION_ID] as string | undefined);
	}

	private _getOrCreateTranslationState(sessionId: string): SessionTranslationState {
		let state = this._translationStates.get(sessionId);
		if (!state) {
			state = createSessionTranslationState();
			this._translationStates.set(sessionId, state);
		}
		return state;
	}

	private _extractContext(span: ICompletedSpanData): WorkingDirectoryContext | undefined {
		const branch = span.attributes[CopilotChatAttr.REPO_HEAD_BRANCH_NAME] as string | undefined;
		const remoteUrl = span.attributes[CopilotChatAttr.REPO_REMOTE_URL] as string | undefined;
		const commitHash = span.attributes[CopilotChatAttr.REPO_HEAD_COMMIT_HASH] as string | undefined;
		if (!branch && !remoteUrl) {
			return undefined;
		}
		return {
			repository: remoteUrl,
			branch,
			headCommit: commitHash,
		};
	}

	// ── Secret registration ─────────────────────────────────────────────────────

	/**
	 * Register known authentication tokens as dynamic secrets so they are
	 * redacted from any event data sent to the cloud.
	 *
	 * Called at construction and again whenever a session initializes, because
	 * the Copilot token is refreshed over time. Already-registered values are skipped.
	 */
	private _registerAuthSecrets(): void {
		// GitHub OAuth token
		this._registerSecret(this._authService.anyGitHubSession?.accessToken);

		// Copilot proxy token (async — register when available)
		this._tokenManager.getCopilotToken().then(token => {
			this._registerSecret(token.token);
		}).catch(() => { /* non-fatal */ });
	}

	/** Pass a secret value to the secret filter once; empty/undefined values are ignored. */
	private _registerSecret(value: string | undefined): void {
		if (!value || this._registeredSecrets.has(value)) {
			return;
		}
		this._registeredSecrets.add(value);
		addSecretValues(value);
	}

	// ── Lazy session initialization ──────────────────────────────────────────────

	private async _initializeSession(sessionId: string, triggerSpan: ICompletedSpanData): Promise<void> {
		this._initializingSessions.add(sessionId);

		try {
			// Pick up a rotated Copilot token before this session exports anything.
			this._registerAuthSecrets();

			const sessionSource = (triggerSpan.attributes[GenAiAttr.AGENT_NAME] as string | undefined) ?? 'unknown';

			// Track the source of the very first session for firstWrite telemetry
			if (!this._firstCloudWriteSessionSource) {
				this._firstCloudWriteSessionSource = sessionSource;
			}

			const repo = await this._resolveRepository();
			if (!this._initializingSessions.has(sessionId)) {
				// The chat session (or the exporter) was disposed while the repository was resolving:
				// don't create a cloud session nobody will route events to.
				return;
			}
			if (!repo) {
				this._disabledSessions.add(sessionId);
				return;
			}

			// Only export remotely if the user has cloud consent for this repo
			const repoNwo = `${repo.owner}/${repo.repo}`;

			if (!this._indexingPreference.hasCloudConsent(repoNwo)) {
				this._disabledSessions.add(sessionId);
				return;
			}
			await this._createCloudSession(sessionId, repo, this._indexingPreference.getStorageLevel(repoNwo));
			if (!this._initializingSessions.has(sessionId)) {
				// Disposed while the cloud session was being created: drop the state that
				// _createCloudSession just recorded so it doesn't outlive the chat session.
				this._cloudSessions.delete(sessionId);
				this._disabledSessions.delete(sessionId);
			}
			/* __GDPR__
				"chronicle.cloudSync" : {
					"owner": "vijayu",
					"comment": "Tracks cloud sync operations (session init, creation, flush, errors)",
					"operation": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The operation performed." },
					"sessionSource": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The agent name/source for the session, or unknown if unavailable." },
					"success": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "Whether the operation succeeded." },
					"error": { "classification": "CallstackOrException", "purpose": "PerformanceAndHealth", "comment": "Truncated error message if failed." },
					"indexingLevel": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "comment": "The indexing level for the session." },
					"droppedEvents": { "classification": "SystemMetaData", "purpose": "PerformanceAndHealth", "isMeasurement": true, "comment": "Number of events in a failed batch." }
				}
			*/
			this._telemetryService.sendMSFTTelemetryEvent('chronicle.cloudSync', {
				operation: 'sessionInit',
				success: 'true',
				sessionSource,
			});
		} catch (err) {

			this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
				operation: 'sessionInit',
				success: 'false',
				error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
			}, {});
			// Skip the disable marker if the session was already disposed (nothing would clear it).
			if (this._initializingSessions.has(sessionId)) {
				this._disabledSessions.add(sessionId);
			}
		} finally {
			this._initializingSessions.delete(sessionId);
		}
	}

	/**
	 * Called when the storage level setting changes.
	 *
	 * - `'local'`: every tracked session that has no cloud session yet is disabled.
	 *   Sessions that already have a cloud session are left as they are.
	 * - Any other level: creates cloud sessions (at `level`) for tracked sessions that have no
	 *   cloud session, are not disabled, and are not still initializing. It uses the already-resolved
	 *   repository and does nothing if none has been resolved. Initializing sessions are skipped
	 *   because `_initializeSession` creates their cloud session itself; creating one here as well
	 *   would produce a duplicate.
	 *
	 * A failure for one session disables that session without aborting the others.
	 */
	async notifyConsent(level: SessionIndexingLevel): Promise<void> {
		if (level === 'local') {
			for (const sessionId of this._translationStates.keys()) {
				if (!this._cloudSessions.has(sessionId)) {
					this._disabledSessions.add(sessionId);
				}
			}
			return;
		}

		const repo = this._repository;
		if (!repo) {
			return;
		}

		for (const sessionId of this._translationStates.keys()) {
			if (this._cloudSessions.has(sessionId)
				|| this._disabledSessions.has(sessionId)
				|| this._initializingSessions.has(sessionId)) {
				continue;
			}
			try {
				await this._createCloudSession(sessionId, repo, level);
			} catch (err) {
				this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
					operation: 'createCloudSession',
					success: 'false',
					error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
				}, {});
				this._disabledSessions.add(sessionId);
			}
		}
	}

	private async _createCloudSession(
		sessionId: string,
		repo: GitHubRepository,
		indexingLevel: SessionIndexingLevel,
	): Promise<void> {
		const result = await this._cloudClient.createSession(
			repo.repoIds.ownerId,
			repo.repoIds.repoId,
			sessionId,
			indexingLevel === 'repo_and_user' ? 'repo_and_user' : 'user',
		);

		if (!result.ok) {

			this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
				operation: 'createCloudSession',
				success: 'false',
				error: result.reason?.substring(0, 100) ?? 'unknown',
			}, {});
			this._disabledSessions.add(sessionId);
			return;
		}

		if (!result.response.task_id) {
			this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
				operation: 'createCloudSession',
				success: 'false',
				error: 'missing_task_id',
			}, {});
			this._disabledSessions.add(sessionId);
			return;
		}

		const cloudIds: CloudSessionIds = {
			cloudSessionId: result.response.id,
			cloudTaskId: result.response.task_id,
		};

		this._cloudSessions.set(sessionId, cloudIds);

		this._telemetryService.sendMSFTTelemetryEvent('chronicle.cloudSync', {
			operation: 'createCloudSession',
			success: 'true',
			indexingLevel,
		});
	}

	/**
	 * Resolve the GitHub repository context.
	 *
	 * The lookup runs at most once. Concurrent and later callers share the same promise, so a
	 * session that initializes while the first lookup is still in flight receives the real result
	 * rather than `undefined`. An empty or failed result is cached as well and is not retried.
	 */
	private _resolveRepository(): Promise<GitHubRepository | undefined> {
		if (!this._repositoryPromise) {
			this._repositoryPromise = this._doResolveRepository();
		}
		return this._repositoryPromise;
	}

	/**
	 * Uses the active git repository to get owner/repo names, then resolves
	 * numeric IDs via the GitHub REST API. Never rejects; errors are reported via telemetry.
	 */
	private async _doResolveRepository(): Promise<GitHubRepository | undefined> {
		try {
			const repoContext = this._gitService.activeRepository?.get();
			if (!repoContext) {
				return undefined;
			}

			const repoInfo = getGitHubRepoInfoFromContext(repoContext);
			if (!repoInfo) {
				return undefined;
			}

			const { id: repoId } = repoInfo;
			const apiResponse = await this._githubRepoService.getRepositoryInfo(repoId.org, repoId.repo);

			this._repository = {
				owner: repoId.org,
				repo: repoId.repo,
				repoIds: {
					ownerId: apiResponse.owner.id,
					repoId: apiResponse.id,
				},
			};
			return this._repository;
		} catch (err) {

			this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
				operation: 'resolveRepository',
				success: 'false',
				error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
			}, {});
			return undefined;
		}
	}

	// ── Session disposal ─────────────────────────────────────────────────────────

	private _handleSessionDispose(sessionId: string): void {
		const cloudIds = this._cloudSessions.get(sessionId);
		if (cloudIds) {
			// The mapping is deleted below. Pin this session's pending events to its cloud session
			// first, otherwise the next flush would treat them (and the shutdown event) as orphans.
			for (const entry of this._eventBuffer) {
				if (entry.chatSessionId === sessionId && entry.cloudSessionId === undefined) {
					entry.cloudSessionId = cloudIds.cloudSessionId;
				}
			}

			const state = this._translationStates.get(sessionId);
			if (state) {
				this._bufferEvents(sessionId, [makeShutdownEvent(state)], cloudIds.cloudSessionId);
				// The timer may be idle (empty buffer); make sure the shutdown event gets flushed.
				this._ensureFlushTimer();
			}
		}

		this._cloudSessions.delete(sessionId);
		this._translationStates.delete(sessionId);
		this._disabledSessions.delete(sessionId);
		this._initializingSessions.delete(sessionId);
	}

	// ── Buffering ────────────────────────────────────────────────────────────────

	/**
	 * Append events for a chat session, optionally pinned to a known cloud session ID,
	 * then enforce the hard buffer cap.
	 */
	private _bufferEvents(chatSessionId: string, events: SessionEvent[], cloudSessionId?: string): void {
		for (const event of events) {
			this._eventBuffer.push({ chatSessionId, event, cloudSessionId });
		}
		this._trimBuffer();
	}

	/** Hard cap — drop the oldest events beyond MAX_BUFFER_SIZE. */
	private _trimBuffer(): void {
		const excess = this._eventBuffer.length - MAX_BUFFER_SIZE;
		if (excess > 0) {
			this._eventBuffer.splice(0, excess);
		}
	}

	// ── Flush timer ──────────────────────────────────────────────────────────────

	/** Flush interval appropriate for the current backlog. */
	private _desiredFlushIntervalMs(): number {
		return this._eventBuffer.length > SOFT_BUFFER_CAP
			? FAST_BATCH_INTERVAL_MS
			: BATCH_INTERVAL_MS;
	}

	/** Start the flush timer if it is not running (no-op after dispose). */
	private _ensureFlushTimer(): void {
		if (this._flushTimer !== undefined || this._isDisposed) {
			return;
		}

		const interval = this._desiredFlushIntervalMs();
		this._flushIntervalMs = interval;
		this._flushTimer = setInterval(() => {
			this._flushBatch().catch(err => {

				this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
					operation: 'flush',
					success: 'false',
					error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
				}, {});
			});
		}, interval);
	}

	private _stopFlushTimer(): void {
		if (this._flushTimer !== undefined) {
			clearInterval(this._flushTimer);
			this._flushTimer = undefined;
		}
	}

	/**
	 * Restart a running timer only when the backlog has crossed SOFT_BUFFER_CAP in either
	 * direction since it was started: fast drain above the cap, normal cadence below it.
	 */
	private _updateFlushCadence(): void {
		if (this._flushTimer === undefined) {
			return;
		}
		if (this._desiredFlushIntervalMs() !== this._flushIntervalMs) {
			this._stopFlushTimer();
			this._ensureFlushTimer();
		}
	}

	// ── Batch flush ──────────────────────────────────────────────────────────────

	private async _flushBatch(): Promise<void> {
		if (this._isFlushing) {
			return;
		}

		if (this._eventBuffer.length === 0) {
			// Nothing to send: go idle. Every path that buffers events restarts the timer.
			this._stopFlushTimer();
			return;
		}

		if (!this._circuitBreaker.canRequest()) {
			this._trimBuffer();
			return;
		}

		this._isFlushing = true;
		const batch = this._eventBuffer.splice(0, MAX_EVENTS_PER_FLUSH);

		// Events not yet submitted, grouped by cloud session ID. A group is removed as soon as its
		// submit call returns, so an unexpected error only re-queues events that were never sent.
		const unsent = new Map<string, BufferedEvent[]>();

		try {
			const requeue: BufferedEvent[] = [];
			for (const entry of batch) {
				const cloudSessionId = entry.cloudSessionId
					?? this._cloudSessions.get(entry.chatSessionId)?.cloudSessionId;
				if (cloudSessionId !== undefined) {
					// Pin the route so a retry still reaches this cloud session after the chat session is disposed.
					entry.cloudSessionId = cloudSessionId;
					const group = unsent.get(cloudSessionId);
					if (group) {
						group.push(entry);
					} else {
						unsent.set(cloudSessionId, [entry]);
					}
				} else if (this._initializingSessions.has(entry.chatSessionId) && !this._disabledSessions.has(entry.chatSessionId)) {
					// Cloud session not created yet — keep the event for a later flush.
					requeue.push(entry);
				}
				// Anything else belongs to a session that was disabled or disposed before it got a
				// cloud session, and is dropped.
			}

			if (requeue.length > 0) {
				// Front of the buffer keeps per-session ordering (everything behind is newer).
				this._eventBuffer.unshift(...requeue);
			}

			// Submit each session's events to the correct cloud session
			const groupCount = unsent.size;
			let allSuccess = true;
			for (const [cloudSessionId, entries] of unsent) {
				const filteredEvents = entries.map(entry => filterSecretsFromObj(entry.event));
				const success = await this._cloudClient.submitSessionEvents(cloudSessionId, filteredEvents);
				// Settled (accepted or rejected): never re-send this group from the catch below.
				unsent.delete(cloudSessionId);
				if (!success) {
					allSuccess = false;
				}
			}

			if (allSuccess && groupCount > 0) {
				this._circuitBreaker.recordSuccess();

				if (!this._firstCloudWriteLogged) {
					this._firstCloudWriteLogged = true;

					this._telemetryService.sendMSFTTelemetryEvent('chronicle.cloudSync', {
						operation: 'firstWrite',
						sessionSource: this._firstCloudWriteSessionSource ?? 'unknown',
					}, {});
				}
			} else if (!allSuccess) {
				this._circuitBreaker.recordFailure();
			}
		} catch (err) {
			// Re-queue only the groups whose submission never completed.
			const retry: BufferedEvent[] = [];
			for (const entries of unsent.values()) {
				retry.push(...entries);
			}
			if (retry.length > 0) {
				this._eventBuffer.unshift(...retry);
			}
			this._circuitBreaker.recordFailure();

			this._telemetryService.sendMSFTTelemetryErrorEvent('chronicle.cloudSync', {
				operation: 'flushBatch',
				success: 'false',
				error: err instanceof Error ? err.message.substring(0, 100) : 'unknown',
			}, { droppedEvents: batch.length });
		} finally {
			this._isFlushing = false;
		}

		this._updateFlushCadence();
	}

}