Path: blob/main/extensions/copilot/src/platform/networking/node/chatStream.ts
13401 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { Raw } from '@vscode/prompt-tsx';
import { hash } from '../../../util/vs/base/common/hash';
import { LRUCache } from '../../../util/vs/base/common/map';
import { generateUuid } from '../../../util/vs/base/common/uuid';
import { toTextParts } from '../../chat/common/globalStringUtils';
import { ILogService } from '../../log/common/logService';
import { ITelemetryService, multiplexProperties } from '../../telemetry/common/telemetry';
import { TelemetryData } from '../../telemetry/common/telemetryData';
import { APIJsonData, CAPIChatMessage, ChatCompletion, rawMessageToCAPI } from '../common/openai';
import { FinishedCompletion, convertToAPIJsonData } from './stream';

/** Maximum number of UTF-16 code units carried by a single chunked telemetry property. */
const MAX_TELEMETRY_CHUNK_SIZE = 8000;

/**
 * Splits `text` into consecutive pieces of at most `maxChunkSize` UTF-16 code units.
 *
 * A chunk boundary is never placed between the two halves of a surrogate pair: when the
 * natural cut would separate them, the chunk is shortened by one code unit instead.
 * Concatenating the chunks in order always yields `text` again. An empty string yields
 * no chunks.
 *
 * NOTE(review): the surrogate guard assumes each chunk is serialized on its own by the
 * telemetry pipeline, where a lone surrogate could be replaced or rejected - confirm.
 */
function splitIntoChunks(text: string, maxChunkSize: number = MAX_TELEMETRY_CHUNK_SIZE): string[] {
	// Guarantee forward progress even if a caller passes a non-positive or fractional size.
	const size = Math.max(1, Math.floor(maxChunkSize));
	const chunks: string[] = [];

	let start = 0;
	while (start < text.length) {
		let end = Math.min(start + size, text.length);
		if (end < text.length && end - start > 1) {
			const lastUnit = text.charCodeAt(end - 1);
			const nextUnit = text.charCodeAt(end);
			const cutsSurrogatePair = lastUnit >= 0xD800 && lastUnit <= 0xDBFF && nextUnit >= 0xDC00 && nextUnit <= 0xDFFF;
			if (cutsSurrogatePair) {
				end--;
			}
		}
		chunks.push(text.substring(start, end));
		start = end;
	}

	return chunks;
}

// TODO @lramos15 - Find a better file for this, since this file is for the chat stream and should not be telemetry related
/**
 * Sends the `engine.messages.length` event: the same shape as the message telemetry, but with
 * message content, tool call arguments and `request.option.tools*` properties replaced by
 * their lengths.
 *
 * Nothing is sent (and a warning is logged, when a log service is given) if
 * `telemetryData.properties.modelCallId` is missing.
 *
 * @param telemetryService Service used to emit the enhanced GH and internal MSFT events.
 * @param messages Messages whose content is reduced to lengths.
 * @param telemetryData Base properties and measurements for the event.
 * @param isOutput `true` for model output, `false` for model input.
 * @param logService Optional logger for the missing-`modelCallId` warning.
 */
export function sendEngineMessagesLengthTelemetry(telemetryService: ITelemetryService, messages: CAPIChatMessage[], telemetryData: TelemetryData, isOutput: boolean, logService?: ILogService): void {
	const messageType = isOutput ? 'output' : 'input';

	// Get the unique model call ID - it should already be set in the base telemetryData
	const modelCallId = telemetryData.properties.modelCallId as string;
	if (!modelCallId) {
		// This shouldn't happen if the ID was properly generated at request start
		logService?.warn('[TELEMETRY] modelCallId not found in telemetryData, input/output messages cannot be linked');
		return;
	}

	// Create messages with content and tool_calls arguments replaced by length
	const messagesWithLength = messages.map(msg => {
		const processedMsg: any = {
			...msg, // This preserves ALL existing fields including tool_calls, tool_call_id, copilot_references, etc.
			content: typeof msg.content === 'string'
				? msg.content.length
				: Array.isArray(msg.content)
					? msg.content.reduce((total: number, part: any) => {
						if (typeof part === 'string') {
							return total + part.length;
						}
						if (part.type === 'text') {
							return total + (part.text?.length || 0);
						}
						// Non-text parts do not contribute to the length
						return total;
					}, 0)
					: 0,
		};

		// Process tool_calls if present
		if ('tool_calls' in msg && msg.tool_calls && Array.isArray(msg.tool_calls)) {
			processedMsg.tool_calls = msg.tool_calls.map((toolCall: any) => ({
				...toolCall,
				function: toolCall.function ? {
					...toolCall.function,
					arguments: typeof toolCall.function.arguments === 'string'
						? toolCall.function.arguments.length
						: toolCall.function.arguments
				} : toolCall.function
			}));
		}

		return processedMsg;
	});

	// Process properties to replace request.option.tools.* field values with their length
	const processedProperties: { [key: string]: string } = {};
	for (const [key, value] of Object.entries(telemetryData.properties)) {
		if (key.startsWith('request.option.tools')) {
			// Replace the content with its length
			if (typeof value === 'string') {
				// If it's a string, it might be a JSON array, try to parse it
				try {
					const parsed = JSON.parse(value);
					if (Array.isArray(parsed)) {
						processedProperties[key] = parsed.length.toString();
					} else {
						processedProperties[key] = value.length.toString();
					}
				} catch {
					// If parsing fails, just use string length
					processedProperties[key] = value.length.toString();
				}
			} else if (Array.isArray(value)) {
				processedProperties[key] = (value as any[]).length.toString();
			} else {
				processedProperties[key] = '0';
			}
		} else {
			processedProperties[key] = value;
		}
	}

	const telemetryDataWithPrompt = TelemetryData.createAndMarkAsIssued({
		...processedProperties,
		messagesJson: JSON.stringify(messagesWithLength),
		message_direction: messageType,
		modelCallId: modelCallId, // Include at telemetry event level too
	}, telemetryData.measurements);

	telemetryService.sendEnhancedGHTelemetryEvent('engine.messages.length', multiplexProperties(telemetryDataWithPrompt.properties), telemetryDataWithPrompt.measurements);
	telemetryService.sendInternalMSFTTelemetryEvent('engine.messages.length', multiplexProperties(telemetryDataWithPrompt.properties), telemetryDataWithPrompt.measurements);
}

// LRU cache from message hash to UUID to ensure same content gets same UUID (limit: 1000 entries)
const messageHashToUuid = new LRUCache<string, string>(1000);

// LRU cache from request options hash to requestOptionsId to ensure same options get same ID (limit: 500 entries)
const requestOptionsHashToId = new LRUCache<string, string>(500);

// LRU cache to track headerRequestId to requestTurn mapping for temporal location tracking along main agent flow (limit: 1000 entries)
const headerRequestIdTracker = new LRUCache<string, number>(1000);

// Track most recent conversation headerRequestId for linking supplementary calls
const mainHeaderRequestIdTracker: { headerRequestId: string | null } = {
	headerRequestId: null
};

// Track conversation turns for model.request.added events (limit: 100 entries)
const conversationTracker = new LRUCache<string, number>(100);

/**
 * Increments the turn counter stored under `key` in `tracker`, starting at 1 for a key that
 * is not present, and returns the new value.
 */
function incrementTurn(tracker: LRUCache<string, number>, key: string): number {
	const nextTurn = (tracker.get(key) ?? 0) + 1;
	tracker.set(key, nextTurn);
	return nextTurn;
}

/**
 * Updates the headerRequestIdTracker with the given headerRequestId.
 * If the headerRequestId already exists, increments its requestTurn.
 * If it doesn't exist, adds it with requestTurn = 1.
 * Returns the current requestTurn for the headerRequestId.
 */
function updateHeaderRequestIdTracker(headerRequestId: string): number {
	return incrementTurn(headerRequestIdTracker, headerRequestId);
}

/**
 * Updates the conversationTracker with the given conversationId.
 * If the conversationId already exists, increments its turn.
 * If it doesn't exist, adds it with turn = 1.
 * Returns the current conversationTurn for the conversationId.
 */
function updateConversationTracker(conversationId: string): number {
	return incrementTurn(conversationTracker, conversationId);
}

// ===== MODEL TELEMETRY FUNCTIONS =====
// These functions send 'model...' events and are grouped together for better organization

/**
 * Sends `model.request.options.added` for the `request.option.*` properties of `telemetryData`,
 * chunked into pieces of at most 8000 characters, the first time a given set of options is seen.
 *
 * @returns The id associated with this set of request options (reused when the same options
 * were already logged and are still cached), or `undefined` when there are no
 * `request.option.*` properties.
 */
function sendModelRequestOptionsTelemetry(telemetryService: ITelemetryService, telemetryData: TelemetryData, logService?: ILogService): string | undefined {
	// Extract all request.option.* properties
	const requestOptions: { [key: string]: string } = {};
	for (const [key, value] of Object.entries(telemetryData.properties)) {
		if (key.startsWith('request.option.')) {
			requestOptions[key] = value;
		}
	}

	// Only process if there are request options
	if (Object.keys(requestOptions).length === 0) {
		return undefined;
	}

	// Create a hash of the request options to detect duplicates
	const requestOptionsHash = hash(requestOptions).toString();

	// Skip sending model.request.options.added if this exact request options have already been logged
	const knownRequestOptionsId = requestOptionsHashToId.get(requestOptionsHash);
	if (knownRequestOptionsId) {
		return knownRequestOptionsId;
	}

	// This is a new set of request options, generate ID and send the event
	const requestOptionsId = generateUuid();
	requestOptionsHashToId.set(requestOptionsHash, requestOptionsId);

	// Extract context properties
	const conversationId = telemetryData.properties.conversationId || telemetryData.properties.sessionId || 'unknown';
	const headerRequestId = telemetryData.properties.headerRequestId || 'unknown';

	// Send one telemetry event per chunk of the serialized request options
	const chunks = splitIntoChunks(JSON.stringify(requestOptions));
	chunks.forEach((chunk, chunkIndex) => {
		const requestOptionsData = TelemetryData.createAndMarkAsIssued({
			requestOptionsId,
			conversationId,
			headerRequestId,
			requestOptionsJson: chunk, // Store chunk of request options JSON
			chunkIndex: chunkIndex.toString(), // 0-based chunk index for ordering
			totalChunks: chunks.length.toString(), // Total number of chunks for this request
		}, telemetryData.measurements); // Include measurements from original telemetryData

		telemetryService.sendInternalMSFTTelemetryEvent('model.request.options.added', requestOptionsData.properties, requestOptionsData.measurements);
	});

	return requestOptionsId;
}

/**
 * Sends `model.request.added` with the request-level properties of `telemetryData`.
 *
 * The event excludes properties starting with `message` or `request.option`, and `modelCallId`.
 * It is skipped when there is no `headerRequestId` or when that `headerRequestId` is already
 * present in `headerRequestIdTracker`.
 */
function sendNewRequestAddedTelemetry(telemetryService: ITelemetryService, telemetryData: TelemetryData, logService?: ILogService): void {
	// This function captures user-level request context (username, session info, user preferences, etc.)
	// It's called once per unique user request (identified by headerRequestId)
	// It excludes message content and request options which are captured separately

	// Extract headerRequestId to check for uniqueness
	const headerRequestId = telemetryData.properties.headerRequestId;
	if (!headerRequestId) {
		return;
	}

	// Check if this is a conversation mode (has conversationId) or supplementary mode
	// This must be done BEFORE the duplicate check to ensure tracker is always updated
	const conversationId = telemetryData.properties.conversationId;
	if (conversationId) {
		// Conversation mode: update tracker with current headerRequestId
		mainHeaderRequestIdTracker.headerRequestId = headerRequestId;
	}

	// Check if we've already processed this headerRequestId
	if (headerRequestIdTracker.has(headerRequestId)) {
		return;
	}

	// Update conversation tracker and get conversation turn only for new headerRequestIds
	let conversationTurn: number | undefined;
	if (conversationId) {
		conversationTurn = updateConversationTracker(conversationId);
	}

	// Filter out properties that start with "message" or "request.option" and exclude modelCallId
	const filteredProperties: { [key: string]: string } = {};
	for (const [key, value] of Object.entries(telemetryData.properties)) {
		if (!key.startsWith('message') && !key.startsWith('request.option') && key !== 'modelCallId') {
			filteredProperties[key] = value;
		}
	}

	// Add conversationTurn if conversationId is present
	if (conversationTurn !== undefined) {
		filteredProperties.conversationTurn = conversationTurn.toString();
	}

	// For supplementary mode: add conversation linking fields if we have tracked data
	if (!conversationId && mainHeaderRequestIdTracker.headerRequestId) {
		const mostRecentTurn = headerRequestIdTracker.get(mainHeaderRequestIdTracker.headerRequestId);
		filteredProperties.mostRecentConversationHeaderRequestId = mainHeaderRequestIdTracker.headerRequestId;
		if (mostRecentTurn !== undefined) {
			filteredProperties.mostRecentConversationHeaderRequestIdTurn = mostRecentTurn.toString();
		}
	}

	// Create telemetry data for the request
	const requestData = TelemetryData.createAndMarkAsIssued(filteredProperties, telemetryData.measurements);

	telemetryService.sendInternalMSFTTelemetryEvent('model.request.added', requestData.properties, requestData.measurements);
}

/**
 * Assigns a UUID to every message and sends `model.message.added` (chunked into pieces of at
 * most 8000 characters) for each message that has not been logged before.
 *
 * Messages are deduplicated by a hash of role, content, headerRequestId and, when present,
 * `tool_calls` / `tool_call_id`.
 *
 * @returns One `{ uuid, headerRequestId }` entry per input message, in input order, including
 * messages whose event was skipped as a duplicate.
 */
function sendIndividualMessagesTelemetry(telemetryService: ITelemetryService, messages: CAPIChatMessage[], telemetryData: TelemetryData, messageDirection: 'input' | 'output', logService?: ILogService): Array<{ uuid: string; headerRequestId: string }> {
	const messageData: Array<{ uuid: string; headerRequestId: string }> = [];

	// Extract context properties with fallbacks (identical for every message of this call)
	const conversationId = telemetryData.properties.conversationId || telemetryData.properties.sessionId || 'unknown';
	const headerRequestId = telemetryData.properties.headerRequestId || 'unknown';

	for (const message of messages) {
		// Create a hash of the message content AND headerRequestId to detect duplicates
		// Including headerRequestId ensures same message content with different headerRequestIds gets separate UUIDs
		const messageHash = hash({
			role: message.role,
			content: message.content,
			headerRequestId: headerRequestId, // Include headerRequestId in hash for proper deduplication
			...(('tool_calls' in message && message.tool_calls) && { tool_calls: message.tool_calls }),
			...(('tool_call_id' in message && message.tool_call_id) && { tool_call_id: message.tool_call_id })
		}).toString();

		// Get existing UUID for this message content + headerRequestId combination, or generate a new one
		const knownUuid = messageHashToUuid.get(messageHash);
		const messageUuid = knownUuid || generateUuid();

		// Always collect UUIDs and headerRequestIds for model call tracking
		messageData.push({ uuid: messageUuid, headerRequestId });

		if (knownUuid) {
			// Skip sending model.message.added if this exact message has already been logged
			continue;
		}

		// This is a new message: remember its UUID and send the event
		messageHashToUuid.set(messageHash, messageUuid);

		// Send one telemetry event per chunk of the serialized message
		const chunks = splitIntoChunks(JSON.stringify(message));
		chunks.forEach((chunk, chunkIndex) => {
			const messageEvent = TelemetryData.createAndMarkAsIssued({
				messageUuid,
				messageDirection,
				conversationId,
				headerRequestId,
				messageJson: chunk, // Store chunk of message JSON
				chunkIndex: chunkIndex.toString(), // 0-based chunk index for ordering
				totalChunks: chunks.length.toString(), // Total number of chunks for this message
			}, telemetryData.measurements); // Include measurements from original telemetryData

			telemetryService.sendInternalMSFTTelemetryEvent('model.message.added', messageEvent.properties, messageEvent.measurements);
		});
	}

	return messageData; // Return collected message data with UUIDs and headerRequestIds
}

/**
 * Sends `model.modelCall.input` / `model.modelCall.output` events listing the message UUIDs
 * that took part in a model call, one (chunked) event group per headerRequestId.
 *
 * For input calls this also sends the request options telemetry and advances the request turn
 * of each headerRequestId. Nothing is sent when `telemetryData.properties.modelCallId` is missing.
 */
function sendModelCallTelemetry(telemetryService: ITelemetryService, messageData: Array<{ uuid: string; headerRequestId: string }>, telemetryData: TelemetryData, messageDirection: 'input' | 'output', logService?: ILogService) {
	// Get the unique model call ID
	const modelCallId = telemetryData.properties.modelCallId as string;
	if (!modelCallId) {
		return;
	}

	const isInput = messageDirection === 'input';

	// For input calls, process request options and get requestOptionsId
	let requestOptionsId: string | undefined;
	if (isInput) {
		requestOptionsId = sendModelRequestOptionsTelemetry(telemetryService, telemetryData, logService);
	}

	// Extract trajectory context
	const conversationId = telemetryData.properties.conversationId || telemetryData.properties.sessionId || 'unknown';
	const eventName = isInput ? 'model.modelCall.input' : 'model.modelCall.output';
	const turnIndex = telemetryData.properties.turnIndex;
	const parentToolCallId = telemetryData.properties.parentToolCallId;
	const parentHeaderRequestId = telemetryData.properties.parentHeaderRequestId;

	// Group messages by headerRequestId
	const messagesByHeaderRequestId = new Map<string, string[]>();
	for (const item of messageData) {
		const uuids = messagesByHeaderRequestId.get(item.headerRequestId);
		if (uuids) {
			uuids.push(item.uuid);
		} else {
			messagesByHeaderRequestId.set(item.headerRequestId, [item.uuid]);
		}
	}

	// Send separate telemetry events for each headerRequestId
	for (const [headerRequestId, messageUuids] of messagesByHeaderRequestId) {
		// Update headerRequestIdTracker and get requestTurn only for input events
		let requestTurn: number | undefined;
		if (isInput) {
			requestTurn = updateHeaderRequestIdTracker(headerRequestId);
		}

		// Send one telemetry event per chunk of the serialized UUID list
		const chunks = splitIntoChunks(JSON.stringify(messageUuids));
		chunks.forEach((chunk, chunkIndex) => {
			const modelCallData = TelemetryData.createAndMarkAsIssued({
				modelCallId,
				conversationId, // Trajectory identifier linking main and supplementary calls
				headerRequestId, // Specific to this set of messages
				messageDirection,
				messageUuids: chunk, // Store chunk of messageUuids JSON
				chunkIndex: chunkIndex.toString(), // 0-based chunk index for ordering
				totalChunks: chunks.length.toString(), // Total number of chunks for this headerRequestId
				messageCount: messageUuids.length.toString(),
				...(requestTurn !== undefined && { requestTurn: requestTurn.toString() }), // Add requestTurn only for input calls
				...(requestOptionsId && { requestOptionsId }), // Add requestOptionsId for input calls
				...(turnIndex && { turnIndex }), // Add turnIndex from original telemetryData
				...(parentToolCallId && { parentToolCallId }), // Link subagent calls to parent tool invocation
				...(parentHeaderRequestId && { parentHeaderRequestId }), // Link subagent calls to parent HTTP request
			}, telemetryData.measurements); // Include measurements from original telemetryData

			telemetryService.sendInternalMSFTTelemetryEvent(eventName, modelCallData.properties, modelCallData.measurements);
		});
	}
}

/**
 * Entry point for the deduplicated `model.*` telemetry events of one model call.
 *
 * Skips everything for the `XtabProvider` and `api.*` message sources, and skips the
 * per-message and model-call events for input of retry requests.
 */
function sendModelTelemetryEvents(telemetryService: ITelemetryService, messages: CAPIChatMessage[], telemetryData: TelemetryData, isOutput: boolean, logService?: ILogService): void {
	// Skip model telemetry events for XtabProvider and api.* message sources
	const messageSource = telemetryData.properties.messageSource as string;
	if (messageSource === 'XtabProvider' || messageSource?.startsWith('api.')) {
		return;
	}

	// Send model.request.added event for user input requests (once per headerRequestId)
	// This captures user-level context (username, session info, etc.) for the user's request
	// Note: This is different from model-level context which is captured in model.modelCall events
	if (!isOutput) {
		sendNewRequestAddedTelemetry(telemetryService, telemetryData, logService);
	}

	// Skip input message telemetry for retry requests to avoid duplicates
	// Retry requests are identified by the presence of retryAfterFilterCategory property
	const isRetryRequest = telemetryData.properties.retryAfterFilterCategory !== undefined;
	if (!isOutput && isRetryRequest) {
		return;
	}

	const messageDirection = isOutput ? 'output' : 'input';

	// Send individual message telemetry for deduplication tracking and collect UUIDs with their headerRequestIds
	const messageData = sendIndividualMessagesTelemetry(telemetryService, messages, telemetryData, messageDirection, logService);

	// Send model call telemetry grouped by headerRequestId (separate events for different headerRequestIds)
	// For input calls, this also handles request options deduplication
	// Always send model call telemetry regardless of whether messages are new or duplicates to ensure every model invocation is tracked
	sendModelCallTelemetry(telemetryService, messageData, telemetryData, messageDirection, logService);
}

// ===== END MODEL TELEMETRY FUNCTIONS =====

/**
 * Sends the telemetry for a set of engine messages: the enhanced GH `engine.messages` event
 * carrying the full messages JSON, the deduplicated `model.*` events, and the length-only
 * `engine.messages.length` event.
 *
 * @param telemetryService Service used to emit the events.
 * @param messages Messages sent to or received from the model.
 * @param telemetryData Base properties and measurements for the events.
 * @param isOutput `true` for model output, `false` for model input.
 * @param logService Optional logger forwarded to the helper functions.
 */
export function sendEngineMessagesTelemetry(telemetryService: ITelemetryService, messages: CAPIChatMessage[], telemetryData: TelemetryData, isOutput: boolean, logService?: ILogService) {
	const telemetryDataWithPrompt = telemetryData.extendedBy({
		messagesJson: JSON.stringify(messages),
	});

	telemetryService.sendEnhancedGHTelemetryEvent('engine.messages', multiplexProperties(telemetryDataWithPrompt.properties), telemetryDataWithPrompt.measurements);
	// Commenting this out to test a new deduplicated way to collect the same information using sendModelTelemetryEvents()
	// TO DO remove this line completely if the new way allows for complete reconstruction of entire message arrays with much lower drop rate
	//telemetryService.sendInternalMSFTTelemetryEvent('engine.messages', multiplexProperties(telemetryDataWithPrompt.properties), telemetryDataWithPrompt.measurements);

	// Send all model telemetry events (model.request.added, model.message.added, model.modelCall.input/output, model.request.options.added)
	// Comment out the line below to disable the new deduplicated model telemetry events
	sendModelTelemetryEvents(telemetryService, messages, telemetryData, isOutput, logService);

	// Also send length-only telemetry
	sendEngineMessagesLengthTelemetry(telemetryService, messages, telemetryData, isOutput, logService);
}

/**
 * Sends the `responsesApi.compactionOutcome` GH telemetry event with the given properties
 * and measurements.
 */
export function sendResponsesApiCompactionTelemetry(
	telemetryService: ITelemetryService,
	properties: {
		outcome: 'compaction_returned' | 'threshold_met_no_compaction';
		headerRequestId: string;
		gitHubRequestId: string;
		model: string;
	},
	measurements: {
		compactThreshold?: number;
		promptTokens: number;
		totalTokens: number;
	}
): void {
	/* __GDPR__
		"responsesApi.compactionOutcome" : {
			"owner": "dileepy",
			"comment": "Tracks server-side Responses API compaction outcomes.",
			"outcome": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "Whether the server returned a compaction item or exceeded the threshold without returning one." },
			"headerRequestId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "Request ID from the response headers." },
			"gitHubRequestId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "GitHub request ID from the response headers if present." },
			"model": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "Model identifier reported by the response." },
			"compactThreshold": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Compaction threshold configured for the request." },
			"promptTokens": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Prompt token count reported by the response." },
			"totalTokens": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total token count reported by the response." }
		}
	*/
	telemetryService.sendGHTelemetryEvent('responsesApi.compactionOutcome', {
		outcome: properties.outcome,
		headerRequestId: properties.headerRequestId,
		gitHubRequestId: properties.gitHubRequestId,
		model: properties.model,
	}, {
		compactThreshold: measurements.compactThreshold,
		promptTokens: measurements.promptTokens,
		totalTokens: measurements.totalTokens,
	});
}

/**
 * Converts a finished streamed completion into the `ChatCompletion` returned to callers and
 * sends the output message telemetry for it.
 *
 * The joined solution text is truncated to `c.finishOffset` when one is set, in which case
 * `blockFinished` is `true` on the result. `telemetryData` is extended in place with the
 * request id; token usage, when available, is added as measurements on the telemetry data
 * attached to the result.
 *
 * @param telemetryService Service used to emit the output message telemetry.
 * @param logService Logger for the per-message finish information.
 * @param c The finished completion to convert.
 * @param telemetryData Telemetry data of the request that produced `c`.
 */
export function prepareChatCompletionForReturn(
	telemetryService: ITelemetryService,
	logService: ILogService,
	c: FinishedCompletion,
	telemetryData: TelemetryData
): ChatCompletion {
	let messageContent = c.solution.text.join('');

	let blockFinished = false;
	if (c.finishOffset !== undefined) {
		// Trim solution to finishOffset returned by finishedCb
		logService.debug(`message ${c.index}: early finish at offset ${c.finishOffset}`);
		messageContent = messageContent.substring(0, c.finishOffset);
		blockFinished = true;
	}

	logService.info(`message ${c.index} returned. finish reason: [${c.reason}]`);
	logService.debug(
		`message ${c.index} details: finishOffset: [${c.finishOffset}] completionId: [{${c.requestId.completionId}}] created: [{${c.requestId.created}}]`
	);
	const jsonData: APIJsonData = convertToAPIJsonData(c.solution);
	const message: Raw.ChatMessage = {
		role: Raw.ChatRole.Assistant,
		content: toTextParts(messageContent),
	};

	// Create enhanced message for telemetry with usage information
	const telemetryMessage = rawMessageToCAPI(message);

	// Add request metadata to telemetry data
	telemetryData.extendWithRequestId(c.requestId);

	// Add usage information to telemetryData if available
	let telemetryDataWithUsage = telemetryData;
	if (c.usage) {
		telemetryDataWithUsage = telemetryData.extendedBy({}, {
			promptTokens: c.usage.prompt_tokens,
			completionTokens: c.usage.completion_tokens,
			totalTokens: c.usage.total_tokens,
			...(c.usage.prompt_tokens_details && { cachedTokens: c.usage.prompt_tokens_details.cached_tokens }),
			...(c.usage.completion_tokens_details && {
				reasoningTokens: c.usage.completion_tokens_details.reasoning_tokens,
				acceptedPredictionTokens: c.usage.completion_tokens_details.accepted_prediction_tokens,
				rejectedPredictionTokens: c.usage.completion_tokens_details.rejected_prediction_tokens,
			}),
		});
	}

	sendEngineMessagesTelemetry(telemetryService, [telemetryMessage], telemetryDataWithUsage, true, logService);
	return {
		message: message,
		choiceIndex: c.index,
		requestId: c.requestId,
		blockFinished: blockFinished,
		finishReason: c.reason,
		filterReason: c.filterReason,
		error: c.error,
		tokens: jsonData.tokens,
		model: c.solution.model,
		usage: c.usage,
		telemetryData: telemetryDataWithUsage,
	};
}