Path: blob/main/extensions/copilot/src/platform/endpoint/node/messagesApi.ts
13401 views
/*---------------------------------------------------------------------------------------------1* Copyright (c) Microsoft Corporation. All rights reserved.2* Licensed under the MIT License. See License.txt in the project root for license information.3*--------------------------------------------------------------------------------------------*/45import { ContentBlockParam, DocumentBlockParam, ImageBlockParam, MessageParam, RedactedThinkingBlockParam, TextBlockParam, ThinkingBlockParam, ToolReferenceBlockParam, ToolResultBlockParam } from '@anthropic-ai/sdk/resources';6import { Raw } from '@vscode/prompt-tsx';7import { Response } from '../../../platform/networking/common/fetcherService';8import { AsyncIterableObject } from '../../../util/vs/base/common/async';9import { SSEParser } from '../../../util/vs/base/common/sseParser';10import { generateUuid } from '../../../util/vs/base/common/uuid';11import { IInstantiationService, ServicesAccessor } from '../../../util/vs/platform/instantiation/common/instantiation';12import { ChatLocation } from '../../chat/common/commonTypes';13import { ConfigKey, IConfigurationService } from '../../configuration/common/configurationService';14import { ILogService } from '../../log/common/logService';15import { AnthropicMessagesTool, ContextManagementResponse, CUSTOM_TOOL_SEARCH_NAME, getContextManagementFromConfig, isAnthropicContextEditingEnabled } from '../../networking/common/anthropic';16import { FinishedCallback, getRequestId, IIPCodeCitation, IResponseDelta } from '../../networking/common/fetch';17import { IChatEndpoint, ICreateEndpointBodyOptions, IEndpointBody } from '../../networking/common/networking';18import { ChatCompletion, FinishedCompletionReason, rawMessageToCAPI } from '../../networking/common/openai';19import { IToolDeferralService } from '../../networking/common/toolDeferralService';20import { sendEngineMessagesTelemetry } from '../../networking/node/chatStream';21import { IExperimentationService } from '../../telemetry/common/nullExperimentationService';22import { ITelemetryService } from '../../telemetry/common/telemetry';23import { TelemetryData } from '../../telemetry/common/telemetryData';2425/**26* Build the `input_schema` for an Anthropic tool from an arbitrary JSON Schema27* object. Ensures `type: 'object'` and `properties` default, preserves extra28* keys like `$defs` and `additionalProperties`, and strips `$schema` which the29* Anthropic API rejects.30*/31export function buildToolInputSchema(schema: Record<string, unknown> | undefined): Record<string, unknown> & { type: 'object' } {32if (!schema) {33return { type: 'object', properties: {} };34}35const { $schema: _, ...rest } = schema;36return { type: 'object', properties: {}, ...rest };37}3839/** IP Code Citation annotation from Messages API copilot_annotations */40interface AnthropicIPCodeCitation {41id: number;42start_offset: number;43end_offset: number;44details: Record<string, unknown>;45citations: {46snippet: string;47url: string;48ip_type?: string;49license: string;50};51}5253interface AnthropicStreamEvent {54type: string;55message?: {56id: string;57type: string;58role: string;59content: ContentBlockParam[];60model: string;61stop_reason: string | null;62stop_sequence: string | null;63usage: {64input_tokens: number;65output_tokens: number;66cache_creation_input_tokens?: number;67cache_read_input_tokens?: number;68};69};70index?: number;71content_block?: ContentBlockParam | ThinkingBlockParam | RedactedThinkingBlockParam;72delta?: {73type: string;74text?: string;75partial_json?: string;76thinking?: string;77signature?: string;78stop_reason?: string;79stop_sequence?: string;80stop_details?: {81category?: string;82explanation?: string;83type?: string;84};85};86copilot_annotations?: {87IPCodeCitations?: AnthropicIPCodeCitation[];88};89usage?: {90output_tokens: number;91input_tokens?: number;92cache_creation_input_tokens?: number;93cache_read_input_tokens?: number;94};95context_management?: ContextManagementResponse;96}9798export function createMessagesRequestBody(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions, model: string, endpoint: IChatEndpoint): IEndpointBody {99const configurationService = accessor.get(IConfigurationService);100const experimentationService = accessor.get(IExperimentationService);101const toolDeferralService = accessor.get(IToolDeferralService);102103const toolSearchEnabled = !!endpoint.supportsToolSearch104&& !!options.requestOptions?.tools?.some(t => t.function.name === CUSTOM_TOOL_SEARCH_NAME);105106// Split tools into non-deferred and deferred up front so we can build finalTools107// with non-deferred first. This ensures the cache_control breakpoint on the last108// non-deferred tool caches the maximum stable prefix.109const nonDeferredTools: AnthropicMessagesTool[] = [];110const deferredTools: AnthropicMessagesTool[] = [];111if (options.requestOptions?.tools) {112for (const tool of options.requestOptions.tools) {113if (!tool.function.name || tool.function.name.length === 0) {114continue;115}116const isDeferred = options.modelCapabilities?.enableToolSearch && toolSearchEnabled && !toolDeferralService.isNonDeferredTool(tool.function.name);117const anthropicTool: AnthropicMessagesTool = {118name: tool.function.name,119description: tool.function.description || '',120input_schema: buildToolInputSchema(tool.function.parameters as Record<string, unknown> | undefined),121...(isDeferred ? { defer_loading: true } : {}),122};123(isDeferred ? deferredTools : nonDeferredTools).push(anthropicTool);124}125}126127// Build final tools array. The client-side search_tools tool is already in the128// anthropicTools array (registered as a model-specific VS Code tool) and will handle129// tool search client-side. Deferred tools still have defer_loading: true so the model130// knows to use the search tool to discover them.131const finalTools: AnthropicMessagesTool[] = [...nonDeferredTools, ...deferredTools];132133// Thinking is enabled only when options.modelCapabilities?.enableThinking is true, a non-zero thinking budget134// is configured for the model, and the model supports thinking. reasoningEffort (if present)135// is used only to configure the effort level when thinking is enabled, not to gate it.136const reasoningEffort = options.modelCapabilities?.reasoningEffort;137let thinkingConfig: { type: 'enabled' | 'adaptive'; budget_tokens?: number; display?: 'summarized' } | undefined;138if (options.modelCapabilities?.enableThinking) {139const hardcodedBudget = 16000;140if (endpoint.supportsAdaptiveThinking) {141thinkingConfig = { type: 'adaptive', display: 'summarized' };142} else if (endpoint.maxThinkingBudget && endpoint.minThinkingBudget) {143const maxTokens = options.postOptions.max_tokens ?? 1024;144const minBudget = endpoint.minThinkingBudget ?? 1024;145const normalizedBudget = hardcodedBudget < minBudget ? minBudget : hardcodedBudget;146const maxBudget = endpoint.maxThinkingBudget ?? 32000;147const thinkingBudget = Math.min(maxBudget, maxTokens - 1, normalizedBudget);148if (thinkingBudget) {149thinkingConfig = { type: 'enabled', budget_tokens: thinkingBudget };150}151}152}153154const thinkingEnabled = !!thinkingConfig;155let effort: 'low' | 'medium' | 'high' | undefined;156if (thinkingConfig && endpoint.supportsReasoningEffort?.length) {157const candidateEffort = configurationService.getConfig(ConfigKey.Advanced.ReasoningEffortOverride)158?? reasoningEffort159?? (endpoint.supportsReasoningEffort.length === 1 ? endpoint.supportsReasoningEffort[0] : 'medium');160if (candidateEffort === 'low' || candidateEffort === 'medium' || candidateEffort === 'high') {161effort = candidateEffort;162}163}164165// Build context management configuration166const contextManagement = options.modelCapabilities?.enableContextEditing && isAnthropicContextEditingEnabled(endpoint, configurationService, experimentationService)167? getContextManagementFromConfig(configurationService, experimentationService, thinkingEnabled)168: undefined;169170const logService = accessor.get(ILogService);171const telemetryService = accessor.get(ITelemetryService);172// TODO: Ideally the custom tool_search tool should filter results itself, but it doesn't173// have access to the enabled tools for the request. For now, filter tool_reference blocks174// here against the actual tools sent to Anthropic to avoid 400 errors from unknown tool names.175const validToolNames = finalTools.length > 0 ? new Set(finalTools.map(t => t.name)) : undefined;176const messagesResult = rawMessagesToMessagesAPI(options.messages, toolSearchEnabled ? validToolNames : undefined);177178// "Last two messages" cache breakpoint strategy: place cache_control on the last179// two merged messages. This is gated behind an experiment and replaces the180// heuristic-based addCacheBreakpoints (which runs upstream in the prompt builder).181// Run before addToolsAndSystemCacheControl: shifting markers placed first,182// static markers fill the remainder. When the experiment is on we count slots183// once and thread the budget through both functions to avoid a redundant walk.184const useLastTwoMessages = configurationService.getExperimentBasedConfig(ConfigKey.AnthropicCacheBreakpointsLastTwoMessages, experimentationService);185let precomputedSlots: number | undefined;186if (useLastTwoMessages) {187precomputedSlots = maxCacheBreakpoints - countExistingMessageAndSystemCacheControl(messagesResult);188if (precomputedSlots > 0) {189precomputedSlots -= addLastTwoMessagesCacheControl(messagesResult, precomputedSlots);190}191}192193// Add cache_control to the last tool and last system block so the stable tools+system194// prefix is cached across turns. Per the Anthropic docs, cache prefixes are created in195// order: tools → system → messages, and a max of 4 cache_control blocks is allowed.196addToolsAndSystemCacheControl(finalTools, messagesResult, precomputedSlots);197198// Guard: The Anthropic Messages API requires the conversation to end with a user message.199// A trailing assistant message is treated as a prefill request, which is not supported200// and will return a 400 error. This catches upstream edge cases where isContinuation201// skips the UserMessage or validateToolMessages drops trailing tool messages.202const lastMessage = messagesResult.messages.at(-1);203if (lastMessage && lastMessage.role === 'assistant') {204logService.warn(`[messagesAPI] Trailing assistant message detected — appending synthetic user message to prevent prefill error. Total messages: ${messagesResult.messages.length}`);205206/* __GDPR__207"messagesApi.trailingAssistantGuard" : {208"owner": "bhavyaus",209"comment": "Tracks when a trailing assistant message is detected and a synthetic user message is appended to prevent prefill errors",210"model": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The model being used" },211"location": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The chat location (agent, panel, etc)" },212"messageCount": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total number of messages in the conversation" }213}214*/215telemetryService.sendMSFTTelemetryEvent('messagesApi.trailingAssistantGuard',216{ model, location: ChatLocation.toString(options.location) },217{ messageCount: messagesResult.messages.length }218);219220messagesResult.messages.push({221role: 'user',222content: [{ type: 'text', text: 'Please continue.' }],223});224}225226return {227model,228...messagesResult,229stream: true,230tools: finalTools.length > 0 ? finalTools : undefined,231max_tokens: options.postOptions.max_tokens,232thinking: thinkingConfig,233...(effort ? { output_config: { effort } } : {}),234...(contextManagement ? { context_management: contextManagement } : {}),235};236}237238export function rawMessagesToMessagesAPI(messages: readonly Raw.ChatMessage[], validToolNames?: Set<string>): { messages: MessageParam[]; system?: TextBlockParam[] } {239const unmergedMessages: MessageParam[] = [];240const systemBlocks: TextBlockParam[] = [];241const toolCallIdToName = new Map<string, string>();242243for (const message of messages) {244switch (message.role) {245case Raw.ChatRole.System: {246systemBlocks.push(...rawContentToAnthropicContent(message.content).filter((c): c is TextBlockParam => c.type === 'text'));247break;248}249case Raw.ChatRole.User: {250const content = rawContentToAnthropicContent(message.content);251if (content.length > 0) {252unmergedMessages.push({253role: 'user',254content,255});256}257break;258}259case Raw.ChatRole.Assistant: {260const content = rawContentToAnthropicContent(message.content);261if (message.toolCalls) {262for (const toolCall of message.toolCalls) {263let parsedInput: Record<string, unknown> = {};264try {265parsedInput = JSON.parse(toolCall.function.arguments);266} catch {267// Keep empty object if parse fails268}269content.push({270type: 'tool_use',271id: toolCall.id,272name: toolCall.function.name,273input: parsedInput,274});275toolCallIdToName.set(toolCall.id, toolCall.function.name);276}277}278279if (content.length > 0) {280unmergedMessages.push({281role: 'assistant',282content,283});284}285break;286}287case Raw.ChatRole.Tool: {288if (message.toolCallId) {289const toolContent = rawContentToAnthropicContent(message.content);290// Extract cache_control from content blocks - it belongs on the tool_result block, not inner content291let hasCacheControl = false;292for (const block of toolContent) {293if (contentBlockSupportsCacheControl(block) && block.cache_control) {294hasCacheControl = true;295delete block.cache_control;296}297}298299// If this is a custom tool search result and validToolNames is provided,300// attempt to convert the text content into tool_reference blocks per the301// Anthropic custom tool search spec. When validToolNames is undefined302// (i.e. tool_reference conversion is disabled/unsupported for this303// request), fall through to the regular text/image filter to avoid304// sending unsupported content types.305const isCustomToolSearch = validToolNames && toolCallIdToName.get(message.toolCallId) === CUSTOM_TOOL_SEARCH_NAME;306const toolReferenceContent = isCustomToolSearch307? tryParseToolReferences(toolContent, validToolNames)308: undefined;309310const validContent = toolReferenceContent311?? toolContent.filter((c): c is TextBlockParam | ImageBlockParam | DocumentBlockParam =>312(c.type === 'text' || c.type === 'image' || c.type === 'document') && !(c.type === 'text' && c.text.trim() === '')313);314315const toolResultBlock: ToolResultBlockParam = {316type: 'tool_result',317tool_use_id: message.toolCallId,318content: validContent.length > 0 ? validContent : undefined,319};320if (hasCacheControl) {321toolResultBlock.cache_control = { type: 'ephemeral' };322}323unmergedMessages.push({324role: 'user',325content: [toolResultBlock],326});327}328break;329}330}331}332333const mergedMessages: MessageParam[] = [];334for (const message of unmergedMessages) {335const lastMessage = mergedMessages[mergedMessages.length - 1];336if (lastMessage && lastMessage.role === message.role) {337const prevContent = Array.isArray(lastMessage.content) ? lastMessage.content : [{ type: 'text' as const, text: lastMessage.content }];338const newContent = Array.isArray(message.content) ? message.content : [{ type: 'text' as const, text: message.content }];339lastMessage.content = [...prevContent, ...newContent];340} else {341mergedMessages.push(message);342}343}344345return {346messages: mergedMessages,347...(systemBlocks.length ? { system: systemBlocks } : {}),348};349}350351/**352* Parses tool result content from the custom tool search tool into353* tool_reference content blocks that the Anthropic API understands.354* Expects a single text block containing a JSON array of tool name strings.355* @see https://platform.claude.com/docs/en/agents-and-tools/tool-use/tool-search-tool#custom-tool-search-implementation356*/357function tryParseToolReferences(content: ContentBlockParam[], validToolNames?: Set<string>): ToolReferenceBlockParam[] | undefined {358if (content.length !== 1 || content[0].type !== 'text') {359return undefined;360}361362let parsed: unknown;363try {364parsed = JSON.parse(content[0].text);365} catch {366return undefined;367}368369if (!Array.isArray(parsed)) {370return undefined;371}372373return parsed374.filter((name): name is string => typeof name === 'string' && (!validToolNames || validToolNames.has(name)))375.map((name): ToolReferenceBlockParam => ({ type: 'tool_reference', tool_name: name }));376}377378function rawContentToAnthropicContent(content: readonly Raw.ChatCompletionContentPart[]): ContentBlockParam[] {379const convertedContent: ContentBlockParam[] = [];380// Track pending cache_control that couldn't be attached to a preceding block381let pendingCacheControl = false;382383for (const part of content) {384switch (part.type) {385case Raw.ChatCompletionContentPartKind.Text:386if (part.text.trim()) {387convertedContent.push({ type: 'text', text: part.text });388}389break;390case Raw.ChatCompletionContentPartKind.Image: {391const url = part.imageUrl.url;392// Parse data URL: data:image/png;base64,<data>393const match = url.match(/^data:(image\/(?:jpeg|png|gif|webp));base64,(.+)$/);394if (match) {395convertedContent.push({396type: 'image',397source: {398type: 'base64',399media_type: match[1] as 'image/jpeg' | 'image/png' | 'image/gif' | 'image/webp',400data: match[2],401}402});403} else if (url.startsWith('https://')) {404// URL image source: https://platform.claude.com/docs/en/api/messages#url_image_source405convertedContent.push({406type: 'image',407source: {408type: 'url',409url,410}411});412}413break;414}415case Raw.ChatCompletionContentPartKind.CacheBreakpoint: {416const previousBlock = convertedContent.at(-1);417if (previousBlock && contentBlockSupportsCacheControl(previousBlock)) {418previousBlock.cache_control = { type: 'ephemeral' };419} else {420// No preceding block to attach to — defer until the next421// cacheable content block is added, or silently drop it.422// Previously this created a whitespace-only text block which423// the Anthropic API rejects with "text content block must424// contain non-whitespace text".425pendingCacheControl = true;426}427break;428}429case Raw.ChatCompletionContentPartKind.Document: {430if (part.documentData.mediaType === 'application/pdf') {431convertedContent.push({432type: 'document',433source: {434type: 'base64',435media_type: 'application/pdf',436data: part.documentData.data,437}438} satisfies DocumentBlockParam);439}440break;441}442case Raw.ChatCompletionContentPartKind.Opaque: {443if (part.value && typeof part.value === 'object' && 'type' in part.value) {444const opaqueValue = part.value as { type: string; thinking?: { id: string; text?: string | string[]; encrypted?: string } };445if (opaqueValue.type === 'thinking' && opaqueValue.thinking) {446const thinkingText = Array.isArray(opaqueValue.thinking.text)447? opaqueValue.thinking.text.join('')448: opaqueValue.thinking.text;449if (thinkingText && opaqueValue.thinking.encrypted) {450// Regular thinking block: text is present, encrypted field contains the signature451convertedContent.push({452type: 'thinking',453thinking: thinkingText,454signature: opaqueValue.thinking.encrypted,455});456} else if (opaqueValue.thinking.encrypted && !thinkingText) {457// Redacted thinking block: no text, only encrypted data from Claude458convertedContent.push({459type: 'redacted_thinking',460data: opaqueValue.thinking.encrypted,461});462}463}464}465break;466}467}468469// Attach any pending cache_control to the block we just added470if (pendingCacheControl && convertedContent.length > 0) {471const lastBlock = convertedContent.at(-1)!;472if (contentBlockSupportsCacheControl(lastBlock)) {473lastBlock.cache_control = { type: 'ephemeral' };474pendingCacheControl = false;475}476}477}478479return convertedContent;480}481482function contentBlockSupportsCacheControl(block: ContentBlockParam): block is Exclude<ContentBlockParam, ThinkingBlockParam | RedactedThinkingBlockParam> {483return block.type !== 'thinking' && block.type !== 'redacted_thinking';484}485486const maxCacheBreakpoints = 4;487488/**489* Counts existing cache_control markers across system blocks and messages.490* Does not count tool-level cache_control — tools are managed separately by491* addToolsAndSystemCacheControl.492*/493function countExistingMessageAndSystemCacheControl(messagesResult: { messages: MessageParam[]; system?: TextBlockParam[] }): number {494let count = 0;495if (messagesResult.system) {496for (const block of messagesResult.system) {497if (block.cache_control) {498count++;499}500}501}502for (const msg of messagesResult.messages) {503if (Array.isArray(msg.content)) {504for (const block of msg.content) {505if (typeof block === 'object' && 'cache_control' in block && block.cache_control) {506count++;507}508}509}510}511return count;512}513514/**515* Optionally adds cache_control to the tools and system prefix when there are spare516* slots available (i.e. existing breakpoints < max). The last non-deferred tool is517* marked first if possible, and the last system block is marked only while slots remain.518* Message-level cache breakpoints are never evicted because they already implicitly519* cache the tools+system prefix (Anthropic cache hierarchy: tools → system → messages)520* and cover more content.521*/522export function addToolsAndSystemCacheControl(523tools: AnthropicMessagesTool[],524messagesResult: { messages: MessageParam[]; system?: TextBlockParam[] },525slotsAvailable?: number,526): void {527if (slotsAvailable === undefined) {528slotsAvailable = maxCacheBreakpoints - countExistingMessageAndSystemCacheControl(messagesResult);529}530if (slotsAvailable <= 0) {531return;532}533534// Find the last non-deferred tool — deferred tools cannot have cache_control.535let lastCacheableTool: AnthropicMessagesTool | undefined;536for (let i = tools.length - 1; i >= 0; i--) {537if (!tools[i].defer_loading) {538lastCacheableTool = tools[i];539break;540}541}542543if (lastCacheableTool && slotsAvailable > 0) {544lastCacheableTool.cache_control = { type: 'ephemeral' };545slotsAvailable--;546}547548// Add cache_control to the last system block (caches the stable system prompt)549const lastSystemBlock = messagesResult.system?.at(-1);550if (lastSystemBlock && !lastSystemBlock.cache_control && slotsAvailable > 0) {551lastSystemBlock.cache_control = { type: 'ephemeral' };552}553}554555/**556* Adds cache_control to the last two distinct messages in the conversation.557* This implements a simpler "shifting breakpoint" strategy: the last two messages558* always carry cache breakpoints, which naturally advances as the conversation grows.559* Combined with addToolsAndSystemCacheControl (which handles tools + system),560* this gives 4 breakpoints: 2 static (tools/system) + 2 shifting (last two messages).561*562* If a trailing message already carries a cache_control marker, it counts toward the563* "two distinct messages" target and no additional marker is added — protecting the564* intent against any upstream code that may have placed markers before this runs.565*566* Returns the number of new cache_control markers added (0–2).567*/568export function addLastTwoMessagesCacheControl(569messagesResult: { messages: MessageParam[]; system?: TextBlockParam[] },570slotsAvailable?: number,571): number {572if (slotsAvailable === undefined) {573slotsAvailable = maxCacheBreakpoints - countExistingMessageAndSystemCacheControl(messagesResult);574}575if (slotsAvailable <= 0) {576return 0;577}578579// Walk messages in reverse, marking the last cacheable content block of the580// last two distinct messages. A message that already has a cache_control581// marker counts toward the target without a new marker being added.582const messages = messagesResult.messages;583let markedCount = 0;584let added = 0;585for (let i = messages.length - 1; i >= 0 && slotsAvailable > 0 && markedCount < 2; i--) {586const msg = messages[i];587if (!Array.isArray(msg.content) || msg.content.length === 0) {588continue;589}590591const alreadyMarked = msg.content.some(b =>592typeof b === 'object' && 'cache_control' in b && b.cache_control593);594if (alreadyMarked) {595markedCount++;596continue;597}598599// Find the last block in this message that supports cache_control600for (let j = msg.content.length - 1; j >= 0; j--) {601const block = msg.content[j];602if (typeof block === 'object' && contentBlockSupportsCacheControl(block)) {603block.cache_control = { type: 'ephemeral' };604slotsAvailable--;605markedCount++;606added++;607break;608}609}610}611return added;612}613614export async function processResponseFromMessagesEndpoint(615instantiationService: IInstantiationService,616telemetryService: ITelemetryService,617logService: ILogService,618response: Response,619finishCallback: FinishedCallback,620telemetryData: TelemetryData621): Promise<AsyncIterableObject<ChatCompletion>> {622return new AsyncIterableObject<ChatCompletion>(async feed => {623const requestId = response.headers.get('X-Request-ID') ?? generateUuid();624const ghRequestId = response.headers.get('x-github-request-id') ?? '';625const { serverExperiments } = getRequestId(response.headers);626const processor = instantiationService.createInstance(AnthropicMessagesProcessor, telemetryData, requestId, ghRequestId, serverExperiments);627const parser = new SSEParser((ev) => {628try {629logService.trace(`[messagesAPI]SSE: ${ev.data}`);630const trimmed = ev.data?.trim();631if (!trimmed || trimmed === '[DONE]') {632return;633}634635const parsed = JSON.parse(trimmed) as Partial<AnthropicStreamEvent>;636const type = parsed.type ?? ev.type;637if (!type) {638return;639}640const completion = processor.push({ ...parsed, type } as AnthropicStreamEvent, finishCallback);641if (completion) {642logService.info(`[messagesAPI] message ${completion.choiceIndex} returned. finish reason: [${completion.finishReason}]`);643644const dataToSendToTelemetry = telemetryData.extendedBy({645completionChoiceFinishReason: completion.finishReason,646headerRequestId: completion.requestId.headerRequestId647});648telemetryService.sendGHTelemetryEvent('completion.finishReason', dataToSendToTelemetry.properties, dataToSendToTelemetry.measurements);649650const telemetryMessage = rawMessageToCAPI(completion.message);651let telemetryDataWithUsage = telemetryData;652if (completion.usage) {653telemetryDataWithUsage = telemetryData.extendedBy({}, {654promptTokens: completion.usage.prompt_tokens,655completionTokens: completion.usage.completion_tokens,656totalTokens: completion.usage.total_tokens,657...(completion.usage.prompt_tokens_details && { cachedTokens: completion.usage.prompt_tokens_details.cached_tokens }),658...(completion.usage.completion_tokens_details && {659reasoningTokens: completion.usage.completion_tokens_details.reasoning_tokens,660acceptedPredictionTokens: completion.usage.completion_tokens_details.accepted_prediction_tokens,661rejectedPredictionTokens: completion.usage.completion_tokens_details.rejected_prediction_tokens,662}),663});664}665sendEngineMessagesTelemetry(telemetryService, [telemetryMessage], telemetryDataWithUsage, true, logService);666667feed.emitOne(completion);668}669} catch (e) {670feed.reject(e);671}672});673674for await (const chunk of response.body) {675parser.feed(chunk);676}677}, async () => {678await response.body.destroy();679});680}681682export class AnthropicMessagesProcessor {683private textAccumulator: string = '';684private toolCallAccumulator: Map<number, { id: string; name: string; arguments: string }> = new Map();685private thinkingAccumulator: Map<number, { thinking: string; signature: string }> = new Map();686private completedToolCalls: Array<{ id: string; name: string; arguments: string }> = [];687private messageId: string = '';688private model: string = '';689private inputTokens: number = 0;690private outputTokens: number = 0;691private cacheCreationTokens: number = 0;692private cacheReadTokens: number = 0;693private contextManagementResponse?: ContextManagementResponse;694private stopReason: string | undefined;695private stopDetails?: { category?: string; explanation?: string; type?: string };696697constructor(698private readonly telemetryData: TelemetryData,699private readonly requestId: string,700private readonly ghRequestId: string,701private readonly serverExperiments: string,702@ILogService private readonly logService: ILogService,703@ITelemetryService private readonly telemetryService: ITelemetryService,704) { }705706/**707* Extract IP code citations from copilot_annotations and convert to IIPCodeCitation format708*/709private extractIPCodeCitations(annotations?: { IPCodeCitations?: AnthropicIPCodeCitation[] }): IIPCodeCitation[] {710if (!annotations?.IPCodeCitations?.length) {711return [];712}713714// Deduplicate by URL since the same citation can appear multiple times715const seenUrls = new Set<string>();716const citations: IIPCodeCitation[] = [];717718for (const citation of annotations.IPCodeCitations) {719const citationDetails = citation.citations;720if (!citationDetails) {721continue;722}723724const { url, license, snippet } = citationDetails;725726if (typeof url !== 'string' || url.trim() === '') {727continue;728}729730if (typeof license !== 'string' || license.trim() === '') {731continue;732}733734if (typeof snippet !== 'string' || snippet.trim() === '') {735continue;736}737738if (!seenUrls.has(url)) {739seenUrls.add(url);740citations.push({741citations: {742url,743license,744snippet,745}746});747}748}749750if (citations.length > 0) {751this.logService.trace(`[messagesAPI] IP code citations found: ${citations.length} unique citations`);752}753754return citations;755}756757public push(chunk: AnthropicStreamEvent, _onProgress: FinishedCallback): ChatCompletion | undefined {758const onProgress = (delta: IResponseDelta): undefined => {759this.textAccumulator += delta.text;760_onProgress(this.textAccumulator, 0, delta);761};762763switch (chunk.type) {764case 'message_start':765if (chunk.message) {766this.messageId = chunk.message.id;767this.model = chunk.message.model;768this.inputTokens = chunk.message.usage.input_tokens ?? 0;769this.outputTokens = chunk.message.usage.output_tokens ?? 0;770this.cacheCreationTokens = chunk.message.usage.cache_creation_input_tokens ?? 0;771this.cacheReadTokens = chunk.message.usage.cache_read_input_tokens ?? 0;772}773return;774case 'content_block_start':775if (chunk.content_block?.type === 'tool_use' && chunk.index !== undefined) {776const toolCallId = chunk.content_block.id || generateUuid();777this.toolCallAccumulator.set(chunk.index, {778id: toolCallId,779name: chunk.content_block.name || '',780arguments: '',781});782if (this.textAccumulator.length) {783onProgress({ text: ' ' });784}785onProgress({786text: '',787beginToolCalls: [{ name: chunk.content_block.name || '', id: toolCallId }]788});789} else if (chunk.content_block?.type === 'thinking' && chunk.index !== undefined) {790this.thinkingAccumulator.set(chunk.index, {791thinking: '',792signature: '',793});794} else if (chunk.content_block?.type === 'redacted_thinking' && chunk.index !== undefined) {795const data = (chunk.content_block as { type: 'redacted_thinking'; data: string }).data;796onProgress({797text: '',798thinking: {799id: `thinking_${chunk.index}`,800encrypted: data,801}802});803}804return;805case 'content_block_delta':806if (chunk.delta) {807if (chunk.delta.type === 'text_delta' && chunk.delta.text) {808const ipCitations = this.extractIPCodeCitations(chunk.copilot_annotations);809if (ipCitations.length > 0) {810return onProgress({ text: chunk.delta.text, ipCitations });811}812return onProgress({ text: chunk.delta.text });813} else if (chunk.delta.type === 'thinking_delta' && chunk.delta.thinking && chunk.index !== undefined) {814const thinking = this.thinkingAccumulator.get(chunk.index);815if (thinking) {816thinking.thinking += chunk.delta.thinking;817}818return onProgress({819text: '',820thinking: {821id: `thinking_${chunk.index}`,822text: chunk.delta.thinking,823}824});825} else if (chunk.delta.type === 'signature_delta' && chunk.delta.signature && chunk.index !== undefined) {826const thinking = this.thinkingAccumulator.get(chunk.index);827if (thinking) {828thinking.signature += chunk.delta.signature;829}830// Don't report signature deltas to the user831} else if (chunk.delta.type === 'input_json_delta' && chunk.delta.partial_json && chunk.index !== undefined) {832const toolCall = this.toolCallAccumulator.get(chunk.index);833if (toolCall) {834toolCall.arguments += chunk.delta.partial_json;835onProgress({836text: '',837copilotToolCallStreamUpdates: [{838id: toolCall.id,839name: toolCall.name,840arguments: toolCall.arguments,841}],842});843}844}845}846return;847case 'content_block_stop':848if (chunk.index !== undefined) {849const toolCall = this.toolCallAccumulator.get(chunk.index);850if (toolCall) {851this.completedToolCalls.push(toolCall);852onProgress({853text: '',854copilotToolCalls: [{855id: toolCall.id,856name: toolCall.name,857arguments: toolCall.arguments,858}],859});860this.toolCallAccumulator.delete(chunk.index);861}862const thinking = this.thinkingAccumulator.get(chunk.index);863if (thinking && thinking.signature) {864onProgress({865text: '',866thinking: {867id: `thinking_${chunk.index}`,868encrypted: thinking.signature,869}870});871this.thinkingAccumulator.delete(chunk.index);872}873}874return;875case 'message_delta':876if (chunk.usage) {877// message_delta provides the most accurate token counts878this.outputTokens = chunk.usage.output_tokens;879this.inputTokens = chunk.usage.input_tokens ?? this.inputTokens;880this.cacheCreationTokens = chunk.usage.cache_creation_input_tokens ?? this.cacheCreationTokens;881this.cacheReadTokens = chunk.usage.cache_read_input_tokens ?? this.cacheReadTokens;882}883if (chunk.context_management) {884this.contextManagementResponse = chunk.context_management;885// Report context management via delta so it gets logged to request logger886return onProgress({887text: '',888contextManagement: chunk.context_management889});890}891// Track stop_reason and stop_details for determining finish reason in message_stop892if (chunk.delta?.stop_reason) {893this.stopReason = chunk.delta.stop_reason;894}895if (chunk.delta?.stop_details) {896this.stopDetails = chunk.delta.stop_details;897}898return;899case 'message_stop': {900if (this.contextManagementResponse) {901const totalClearedTokens = this.contextManagementResponse.applied_edits.reduce(902(sum, edit) => sum + (edit.cleared_input_tokens || 0),9030904);905const totalClearedToolUses = this.contextManagementResponse.applied_edits.reduce(906(sum, edit) => sum + (edit.cleared_tool_uses || 0),9070908);909const totalClearedThinkingTurns = this.contextManagementResponse.applied_edits.reduce(910(sum, edit) => sum + (edit.cleared_thinking_turns || 0),9110912);913this.logService.trace(`[messagesAPI] Anthropic context editing applied: cleared ${totalClearedTokens} tokens, ${totalClearedToolUses} tool uses.`);914915/* __GDPR__916"contextEditingApplied" : {917"owner": "bhavyaus",918"comment": "Tracks when Anthropic context editing is applied to manage context window",919"requestId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The request ID for correlation" },920"interactionId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The interaction ID for correlation" },921"model": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The model used" },922"clearedTokens": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total tokens cleared" },923"clearedToolUses": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total tool uses cleared" },924"clearedThinkingTurns": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total thinking turns cleared" }925}926*/927this.telemetryService.sendMSFTTelemetryEvent('contextEditingApplied',928{929requestId: this.requestId,930interactionId: this.requestId,931model: this.model,932},933{934clearedTokens: totalClearedTokens,935clearedToolUses: totalClearedToolUses,936clearedThinkingTurns: totalClearedThinkingTurns,937}938);939}940if (this.stopReason === 'refusal') {941const category = this.stopDetails?.category ?? 'unknown';942this.logService.warn(`[messagesAPI] Refusal received: category='${category}' for model ${this.model}`);943944/* __GDPR__945"messagesApi.refusal" : {946"owner": "bhavyaus",947"comment": "Tracks Anthropic refusal responses including cyber and other policy categories",948"requestId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The request ID for correlation" },949"model": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The model that produced the refusal" },950"category": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The refusal category (e.g. cyber, content_policy)" }951}952*/953this.telemetryService.sendMSFTTelemetryEvent('messagesApi.refusal',954{955requestId: this.requestId,956model: this.model,957category,958}959);960}961962let finishReason: FinishedCompletionReason;963switch (this.stopReason) {964case 'refusal':965finishReason = FinishedCompletionReason.ClientDone;966break;967case 'max_tokens':968case 'model_context_window_exceeded':969finishReason = FinishedCompletionReason.Length;970break;971default:972finishReason = FinishedCompletionReason.Stop;973break;974}975976const computedPromptTokens = this.inputTokens + this.cacheCreationTokens + this.cacheReadTokens;977if (computedPromptTokens < this.cacheReadTokens) {978this.logService.warn(`[messagesAPI] Token count inconsistency: computed prompt_tokens (${computedPromptTokens}) < cached_tokens (${this.cacheReadTokens}). Raw values: inputTokens=${this.inputTokens}, cacheCreationTokens=${this.cacheCreationTokens}, cacheReadTokens=${this.cacheReadTokens}`);979}980981return {982blockFinished: true,983choiceIndex: 0,984model: this.model,985tokens: [],986telemetryData: this.telemetryData,987requestId: {988headerRequestId: this.requestId,989gitHubRequestId: this.ghRequestId,990completionId: this.messageId,991created: Date.now(),992deploymentId: '',993serverExperiments: this.serverExperiments,994},995usage: {996prompt_tokens: computedPromptTokens,997completion_tokens: this.outputTokens,998total_tokens: computedPromptTokens + this.outputTokens,999prompt_tokens_details: {1000cached_tokens: this.cacheReadTokens,1001cache_creation_input_tokens: this.cacheCreationTokens,1002},1003completion_tokens_details: {1004reasoning_tokens: 0,1005accepted_prediction_tokens: 0,1006rejected_prediction_tokens: 0,1007},1008},1009finishReason,1010message: {1011role: Raw.ChatRole.Assistant,1012content: this.textAccumulator ? [{1013type: Raw.ChatCompletionContentPartKind.Text,1014text: this.textAccumulator1015}] : [],1016...(this.completedToolCalls.length > 0 ? {1017toolCalls: this.completedToolCalls.map(tc => ({1018id: tc.id,1019type: 'function' as const,1020function: {1021name: tc.name,1022arguments: tc.arguments1023}1024}))1025} : {})1026}1027};1028}1029case 'error': {1030const errorMessage = (chunk as unknown as { error?: { message?: string } }).error?.message || 'Unknown error';1031return onProgress({1032text: '',1033copilotErrors: [{1034agent: 'anthropic',1035code: 'unknown',1036message: errorMessage,1037type: 'error',1038identifier: undefined1039}]1040});1041}1042}1043}1044}10451046104710481049