Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/platform/endpoint/node/messagesApi.ts
13401 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { ContentBlockParam, DocumentBlockParam, ImageBlockParam, MessageParam, RedactedThinkingBlockParam, TextBlockParam, ThinkingBlockParam, ToolReferenceBlockParam, ToolResultBlockParam } from '@anthropic-ai/sdk/resources';
7
import { Raw } from '@vscode/prompt-tsx';
8
import { Response } from '../../../platform/networking/common/fetcherService';
9
import { AsyncIterableObject } from '../../../util/vs/base/common/async';
10
import { SSEParser } from '../../../util/vs/base/common/sseParser';
11
import { generateUuid } from '../../../util/vs/base/common/uuid';
12
import { IInstantiationService, ServicesAccessor } from '../../../util/vs/platform/instantiation/common/instantiation';
13
import { ChatLocation } from '../../chat/common/commonTypes';
14
import { ConfigKey, IConfigurationService } from '../../configuration/common/configurationService';
15
import { ILogService } from '../../log/common/logService';
16
import { AnthropicMessagesTool, ContextManagementResponse, CUSTOM_TOOL_SEARCH_NAME, getContextManagementFromConfig, isAnthropicContextEditingEnabled } from '../../networking/common/anthropic';
17
import { FinishedCallback, getRequestId, IIPCodeCitation, IResponseDelta } from '../../networking/common/fetch';
18
import { IChatEndpoint, ICreateEndpointBodyOptions, IEndpointBody } from '../../networking/common/networking';
19
import { ChatCompletion, FinishedCompletionReason, rawMessageToCAPI } from '../../networking/common/openai';
20
import { IToolDeferralService } from '../../networking/common/toolDeferralService';
21
import { sendEngineMessagesTelemetry } from '../../networking/node/chatStream';
22
import { IExperimentationService } from '../../telemetry/common/nullExperimentationService';
23
import { ITelemetryService } from '../../telemetry/common/telemetry';
24
import { TelemetryData } from '../../telemetry/common/telemetryData';
25
26
/**
 * Build the `input_schema` for an Anthropic tool from an arbitrary JSON Schema
 * object. Ensures `type: 'object'` and a `properties` default, preserves extra
 * keys like `$defs` and `additionalProperties`, and strips `$schema` which the
 * Anthropic API rejects.
 *
 * @param schema The tool's JSON Schema, or undefined when the tool takes no input.
 * @returns A schema object guaranteed to have `type: 'object'` and a `properties` key.
 */
export function buildToolInputSchema(schema: Record<string, unknown> | undefined): Record<string, unknown> & { type: 'object' } {
	if (!schema) {
		return { type: 'object', properties: {} };
	}
	const { $schema: _, ...rest } = schema;
	// Spread `rest` first so the forced `type: 'object'` cannot be overridden by a
	// schema that declares some other top-level `type` — previously the spread came
	// last, silently violating the declared `{ type: 'object' }` return type.
	// `properties` stays a default: an incoming `properties` key wins over `{}`.
	return { properties: {}, ...rest, type: 'object' as const };
}
39
40
/** IP Code Citation annotation from Messages API copilot_annotations */
interface AnthropicIPCodeCitation {
	// Annotation identifier within the response.
	id: number;
	// Offsets the citation covers — presumably character offsets into the
	// generated text; TODO confirm against the annotation producer.
	start_offset: number;
	end_offset: number;
	// Opaque provider metadata; not interpreted by this module.
	details: Record<string, unknown>;
	// The matched public-code reference. extractIPCodeCitations requires url,
	// license and snippet to be non-empty strings before forwarding a citation.
	citations: {
		snippet: string;
		url: string;
		ip_type?: string;
		license: string;
	};
}
53
54
/**
 * A single parsed SSE event from the Anthropic Messages streaming API.
 * Which optional fields are populated depends on `type` (e.g. `message_start`
 * carries `message`, `content_block_start` carries `index`/`content_block`,
 * `content_block_delta` carries `index`/`delta`).
 */
interface AnthropicStreamEvent {
	// SSE event type, e.g. 'message_start', 'content_block_start',
	// 'content_block_delta', 'message_delta'.
	type: string;
	// Present on 'message_start': initial message envelope and usage so far.
	message?: {
		id: string;
		type: string;
		role: string;
		content: ContentBlockParam[];
		model: string;
		stop_reason: string | null;
		stop_sequence: string | null;
		usage: {
			input_tokens: number;
			output_tokens: number;
			cache_creation_input_tokens?: number;
			cache_read_input_tokens?: number;
		};
	};
	// Index of the content block the event refers to (content_block_* events).
	index?: number;
	// Present on 'content_block_start'.
	content_block?: ContentBlockParam | ThinkingBlockParam | RedactedThinkingBlockParam;
	// Incremental payload for 'content_block_delta' / 'message_delta'; exactly
	// one of text / partial_json / thinking / signature is set per delta type.
	delta?: {
		type: string;
		text?: string;
		partial_json?: string;
		thinking?: string;
		signature?: string;
		stop_reason?: string;
		stop_sequence?: string;
		stop_details?: {
			category?: string;
			explanation?: string;
			type?: string;
		};
	};
	// Copilot-proxy-added annotations riding alongside text deltas.
	copilot_annotations?: {
		IPCodeCitations?: AnthropicIPCodeCitation[];
	};
	// Cumulative usage — presumably attached to 'message_delta'; TODO confirm.
	usage?: {
		output_tokens: number;
		input_tokens?: number;
		cache_creation_input_tokens?: number;
		cache_read_input_tokens?: number;
	};
	// Context-editing results when context management is enabled for the request.
	context_management?: ContextManagementResponse;
}
98
99
/**
 * Builds the request body for the Anthropic Messages API from generic endpoint
 * options: converts tools (with optional deferred loading for tool search),
 * configures extended thinking / reasoning effort / context management, converts
 * and merges messages, places cache_control breakpoints, and guards against a
 * trailing assistant message (which Anthropic treats as an unsupported prefill).
 *
 * @param accessor Service accessor used to resolve configuration, experimentation,
 *   tool-deferral, log and telemetry services.
 * @param options Generic endpoint body options (messages, tools, post options, capabilities).
 * @param model Model identifier to send in the body.
 * @param endpoint Endpoint capabilities (tool search, thinking budgets, reasoning effort).
 * @returns The streaming Messages API request body.
 */
export function createMessagesRequestBody(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions, model: string, endpoint: IChatEndpoint): IEndpointBody {
	const configurationService = accessor.get(IConfigurationService);
	const experimentationService = accessor.get(IExperimentationService);
	const toolDeferralService = accessor.get(IToolDeferralService);

	// Tool search is active only when the endpoint supports it AND the client-side
	// search tool is actually among the request's tools.
	const toolSearchEnabled = !!endpoint.supportsToolSearch
		&& !!options.requestOptions?.tools?.some(t => t.function.name === CUSTOM_TOOL_SEARCH_NAME);

	// Split tools into non-deferred and deferred up front so we can build finalTools
	// with non-deferred first. This ensures the cache_control breakpoint on the last
	// non-deferred tool caches the maximum stable prefix.
	const nonDeferredTools: AnthropicMessagesTool[] = [];
	const deferredTools: AnthropicMessagesTool[] = [];
	if (options.requestOptions?.tools) {
		for (const tool of options.requestOptions.tools) {
			// Skip tools with no usable name.
			if (!tool.function.name || tool.function.name.length === 0) {
				continue;
			}
			const isDeferred = options.modelCapabilities?.enableToolSearch && toolSearchEnabled && !toolDeferralService.isNonDeferredTool(tool.function.name);
			const anthropicTool: AnthropicMessagesTool = {
				name: tool.function.name,
				description: tool.function.description || '',
				input_schema: buildToolInputSchema(tool.function.parameters as Record<string, unknown> | undefined),
				...(isDeferred ? { defer_loading: true } : {}),
			};
			(isDeferred ? deferredTools : nonDeferredTools).push(anthropicTool);
		}
	}

	// Build final tools array. The client-side search_tools tool is already in the
	// anthropicTools array (registered as a model-specific VS Code tool) and will handle
	// tool search client-side. Deferred tools still have defer_loading: true so the model
	// knows to use the search tool to discover them.
	const finalTools: AnthropicMessagesTool[] = [...nonDeferredTools, ...deferredTools];

	// Thinking is enabled only when options.modelCapabilities?.enableThinking is true, a non-zero thinking budget
	// is configured for the model, and the model supports thinking. reasoningEffort (if present)
	// is used only to configure the effort level when thinking is enabled, not to gate it.
	const reasoningEffort = options.modelCapabilities?.reasoningEffort;
	let thinkingConfig: { type: 'enabled' | 'adaptive'; budget_tokens?: number; display?: 'summarized' } | undefined;
	if (options.modelCapabilities?.enableThinking) {
		const hardcodedBudget = 16000;
		if (endpoint.supportsAdaptiveThinking) {
			thinkingConfig = { type: 'adaptive', display: 'summarized' };
		} else if (endpoint.maxThinkingBudget && endpoint.minThinkingBudget) {
			const maxTokens = options.postOptions.max_tokens ?? 1024;
			const minBudget = endpoint.minThinkingBudget ?? 1024;
			// Clamp the hard-coded budget into [minBudget, maxBudget] and keep it
			// strictly below max_tokens (budget must leave room for output).
			const normalizedBudget = hardcodedBudget < minBudget ? minBudget : hardcodedBudget;
			const maxBudget = endpoint.maxThinkingBudget ?? 32000;
			const thinkingBudget = Math.min(maxBudget, maxTokens - 1, normalizedBudget);
			if (thinkingBudget) {
				thinkingConfig = { type: 'enabled', budget_tokens: thinkingBudget };
			}
		}
	}

	const thinkingEnabled = !!thinkingConfig;
	// Effort only applies when thinking is on and the endpoint advertises support.
	// Precedence: user override config > model capability > endpoint default ('medium'
	// unless exactly one level is supported).
	let effort: 'low' | 'medium' | 'high' | undefined;
	if (thinkingConfig && endpoint.supportsReasoningEffort?.length) {
		const candidateEffort = configurationService.getConfig(ConfigKey.Advanced.ReasoningEffortOverride)
			?? reasoningEffort
			?? (endpoint.supportsReasoningEffort.length === 1 ? endpoint.supportsReasoningEffort[0] : 'medium');
		if (candidateEffort === 'low' || candidateEffort === 'medium' || candidateEffort === 'high') {
			effort = candidateEffort;
		}
	}

	// Build context management configuration
	const contextManagement = options.modelCapabilities?.enableContextEditing && isAnthropicContextEditingEnabled(endpoint, configurationService, experimentationService)
		? getContextManagementFromConfig(configurationService, experimentationService, thinkingEnabled)
		: undefined;

	const logService = accessor.get(ILogService);
	const telemetryService = accessor.get(ITelemetryService);
	// TODO: Ideally the custom tool_search tool should filter results itself, but it doesn't
	// have access to the enabled tools for the request. For now, filter tool_reference blocks
	// here against the actual tools sent to Anthropic to avoid 400 errors from unknown tool names.
	const validToolNames = finalTools.length > 0 ? new Set(finalTools.map(t => t.name)) : undefined;
	const messagesResult = rawMessagesToMessagesAPI(options.messages, toolSearchEnabled ? validToolNames : undefined);

	// "Last two messages" cache breakpoint strategy: place cache_control on the last
	// two merged messages. This is gated behind an experiment and replaces the
	// heuristic-based addCacheBreakpoints (which runs upstream in the prompt builder).
	// Run before addToolsAndSystemCacheControl: shifting markers placed first,
	// static markers fill the remainder. When the experiment is on we count slots
	// once and thread the budget through both functions to avoid a redundant walk.
	const useLastTwoMessages = configurationService.getExperimentBasedConfig(ConfigKey.AnthropicCacheBreakpointsLastTwoMessages, experimentationService);
	let precomputedSlots: number | undefined;
	if (useLastTwoMessages) {
		precomputedSlots = maxCacheBreakpoints - countExistingMessageAndSystemCacheControl(messagesResult);
		if (precomputedSlots > 0) {
			precomputedSlots -= addLastTwoMessagesCacheControl(messagesResult, precomputedSlots);
		}
	}

	// Add cache_control to the last tool and last system block so the stable tools+system
	// prefix is cached across turns. Per the Anthropic docs, cache prefixes are created in
	// order: tools → system → messages, and a max of 4 cache_control blocks is allowed.
	addToolsAndSystemCacheControl(finalTools, messagesResult, precomputedSlots);

	// Guard: The Anthropic Messages API requires the conversation to end with a user message.
	// A trailing assistant message is treated as a prefill request, which is not supported
	// and will return a 400 error. This catches upstream edge cases where isContinuation
	// skips the UserMessage or validateToolMessages drops trailing tool messages.
	const lastMessage = messagesResult.messages.at(-1);
	if (lastMessage && lastMessage.role === 'assistant') {
		logService.warn(`[messagesAPI] Trailing assistant message detected — appending synthetic user message to prevent prefill error. Total messages: ${messagesResult.messages.length}`);

		/* __GDPR__
			"messagesApi.trailingAssistantGuard" : {
				"owner": "bhavyaus",
				"comment": "Tracks when a trailing assistant message is detected and a synthetic user message is appended to prevent prefill errors",
				"model": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The model being used" },
				"location": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The chat location (agent, panel, etc)" },
				"messageCount": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total number of messages in the conversation" }
			}
		*/
		telemetryService.sendMSFTTelemetryEvent('messagesApi.trailingAssistantGuard',
			{ model, location: ChatLocation.toString(options.location) },
			{ messageCount: messagesResult.messages.length }
		);

		messagesResult.messages.push({
			role: 'user',
			content: [{ type: 'text', text: 'Please continue.' }],
		});
	}

	return {
		model,
		...messagesResult,
		stream: true,
		tools: finalTools.length > 0 ? finalTools : undefined,
		max_tokens: options.postOptions.max_tokens,
		thinking: thinkingConfig,
		...(effort ? { output_config: { effort } } : {}),
		...(contextManagement ? { context_management: contextManagement } : {}),
	};
}
238
239
/**
 * Converts prompt-tsx raw chat messages into Anthropic Messages API shape:
 * system messages become top-level `system` text blocks, tool calls become
 * `tool_use` blocks on the assistant message, tool results become `tool_result`
 * blocks on a user message, and adjacent same-role messages are merged into one.
 *
 * @param messages The raw chat messages to convert.
 * @param validToolNames When set, tool results from the custom tool-search tool
 *   are converted to `tool_reference` blocks filtered against this set.
 * @returns The converted message list plus optional system blocks.
 */
export function rawMessagesToMessagesAPI(messages: readonly Raw.ChatMessage[], validToolNames?: Set<string>): { messages: MessageParam[]; system?: TextBlockParam[] } {
	const unmergedMessages: MessageParam[] = [];
	const systemBlocks: TextBlockParam[] = [];
	// Maps tool_use ids to tool names so tool results can be matched back to the
	// tool that produced them (needed to recognize tool-search results).
	const toolCallIdToName = new Map<string, string>();

	for (const message of messages) {
		switch (message.role) {
			case Raw.ChatRole.System: {
				// Only text blocks are valid in the system prompt.
				systemBlocks.push(...rawContentToAnthropicContent(message.content).filter((c): c is TextBlockParam => c.type === 'text'));
				break;
			}
			case Raw.ChatRole.User: {
				const content = rawContentToAnthropicContent(message.content);
				if (content.length > 0) {
					unmergedMessages.push({
						role: 'user',
						content,
					});
				}
				break;
			}
			case Raw.ChatRole.Assistant: {
				const content = rawContentToAnthropicContent(message.content);
				if (message.toolCalls) {
					for (const toolCall of message.toolCalls) {
						let parsedInput: Record<string, unknown> = {};
						try {
							parsedInput = JSON.parse(toolCall.function.arguments);
						} catch {
							// Keep empty object if parse fails
						}
						content.push({
							type: 'tool_use',
							id: toolCall.id,
							name: toolCall.function.name,
							input: parsedInput,
						});
						toolCallIdToName.set(toolCall.id, toolCall.function.name);
					}
				}

				if (content.length > 0) {
					unmergedMessages.push({
						role: 'assistant',
						content,
					});
				}
				break;
			}
			case Raw.ChatRole.Tool: {
				// Tool messages without a toolCallId cannot be correlated and are dropped.
				if (message.toolCallId) {
					const toolContent = rawContentToAnthropicContent(message.content);
					// Extract cache_control from content blocks - it belongs on the tool_result block, not inner content
					let hasCacheControl = false;
					for (const block of toolContent) {
						if (contentBlockSupportsCacheControl(block) && block.cache_control) {
							hasCacheControl = true;
							delete block.cache_control;
						}
					}

					// If this is a custom tool search result and validToolNames is provided,
					// attempt to convert the text content into tool_reference blocks per the
					// Anthropic custom tool search spec. When validToolNames is undefined
					// (i.e. tool_reference conversion is disabled/unsupported for this
					// request), fall through to the regular text/image filter to avoid
					// sending unsupported content types.
					const isCustomToolSearch = validToolNames && toolCallIdToName.get(message.toolCallId) === CUSTOM_TOOL_SEARCH_NAME;
					const toolReferenceContent = isCustomToolSearch
						? tryParseToolReferences(toolContent, validToolNames)
						: undefined;

					// Keep only block types valid inside tool_result, dropping
					// whitespace-only text blocks.
					const validContent = toolReferenceContent
						?? toolContent.filter((c): c is TextBlockParam | ImageBlockParam | DocumentBlockParam =>
							(c.type === 'text' || c.type === 'image' || c.type === 'document') && !(c.type === 'text' && c.text.trim() === '')
						);

					const toolResultBlock: ToolResultBlockParam = {
						type: 'tool_result',
						tool_use_id: message.toolCallId,
						content: validContent.length > 0 ? validContent : undefined,
					};
					if (hasCacheControl) {
						toolResultBlock.cache_control = { type: 'ephemeral' };
					}
					// Tool results are delivered as user-role messages in the Messages API.
					unmergedMessages.push({
						role: 'user',
						content: [toolResultBlock],
					});
				}
				break;
			}
		}
	}

	// Merge adjacent same-role messages into one, concatenating their content
	// arrays (string content is normalized to a single text block first).
	const mergedMessages: MessageParam[] = [];
	for (const message of unmergedMessages) {
		const lastMessage = mergedMessages[mergedMessages.length - 1];
		if (lastMessage && lastMessage.role === message.role) {
			const prevContent = Array.isArray(lastMessage.content) ? lastMessage.content : [{ type: 'text' as const, text: lastMessage.content }];
			const newContent = Array.isArray(message.content) ? message.content : [{ type: 'text' as const, text: message.content }];
			lastMessage.content = [...prevContent, ...newContent];
		} else {
			mergedMessages.push(message);
		}
	}

	return {
		messages: mergedMessages,
		...(systemBlocks.length ? { system: systemBlocks } : {}),
	};
}
351
352
/**
 * Converts tool-result content produced by the custom tool search tool into
 * `tool_reference` content blocks that the Anthropic API understands.
 *
 * The expected shape is exactly one text block whose text is a JSON array of
 * tool name strings; anything else yields `undefined` so the caller falls back
 * to the regular text/image handling.
 * @see https://platform.claude.com/docs/en/agents-and-tools/tool-use/tool-search-tool#custom-tool-search-implementation
 */
function tryParseToolReferences(content: ContentBlockParam[], validToolNames?: Set<string>): ToolReferenceBlockParam[] | undefined {
	if (content.length !== 1 || content[0].type !== 'text') {
		return undefined;
	}

	let names: unknown;
	try {
		names = JSON.parse(content[0].text);
	} catch {
		return undefined;
	}

	if (!Array.isArray(names)) {
		return undefined;
	}

	const references: ToolReferenceBlockParam[] = [];
	for (const name of names) {
		// Skip non-string entries, and names outside the allow-list when one is given.
		if (typeof name !== 'string') {
			continue;
		}
		if (validToolNames && !validToolNames.has(name)) {
			continue;
		}
		references.push({ type: 'tool_reference', tool_name: name });
	}
	return references;
}
378
379
/**
 * Converts prompt-tsx content parts into Anthropic content blocks.
 * Handles text (whitespace-only parts dropped), base64/URL images, PDF
 * documents, cache breakpoints (turned into `cache_control` markers on the
 * nearest cacheable block), and opaque thinking / redacted-thinking parts.
 *
 * @param content The raw content parts of one chat message.
 * @returns The converted Anthropic content blocks, possibly empty.
 */
function rawContentToAnthropicContent(content: readonly Raw.ChatCompletionContentPart[]): ContentBlockParam[] {
	const convertedContent: ContentBlockParam[] = [];
	// Track pending cache_control that couldn't be attached to a preceding block
	let pendingCacheControl = false;

	for (const part of content) {
		switch (part.type) {
			case Raw.ChatCompletionContentPartKind.Text:
				// Whitespace-only text blocks are rejected by the API — skip them.
				if (part.text.trim()) {
					convertedContent.push({ type: 'text', text: part.text });
				}
				break;
			case Raw.ChatCompletionContentPartKind.Image: {
				const url = part.imageUrl.url;
				// Parse data URL: data:image/png;base64,<data>
				const match = url.match(/^data:(image\/(?:jpeg|png|gif|webp));base64,(.+)$/);
				if (match) {
					convertedContent.push({
						type: 'image',
						source: {
							type: 'base64',
							media_type: match[1] as 'image/jpeg' | 'image/png' | 'image/gif' | 'image/webp',
							data: match[2],
						}
					});
				} else if (url.startsWith('https://')) {
					// URL image source: https://platform.claude.com/docs/en/api/messages#url_image_source
					convertedContent.push({
						type: 'image',
						source: {
							type: 'url',
							url,
						}
					});
				}
				// Other URL schemes (http:, file:, non-image data:) are silently dropped.
				break;
			}
			case Raw.ChatCompletionContentPartKind.CacheBreakpoint: {
				const previousBlock = convertedContent.at(-1);
				if (previousBlock && contentBlockSupportsCacheControl(previousBlock)) {
					previousBlock.cache_control = { type: 'ephemeral' };
				} else {
					// No preceding block to attach to — defer until the next
					// cacheable content block is added, or silently drop it.
					// Previously this created a whitespace-only text block which
					// the Anthropic API rejects with "text content block must
					// contain non-whitespace text".
					pendingCacheControl = true;
				}
				break;
			}
			case Raw.ChatCompletionContentPartKind.Document: {
				// Only PDF documents are supported; other media types are dropped.
				if (part.documentData.mediaType === 'application/pdf') {
					convertedContent.push({
						type: 'document',
						source: {
							type: 'base64',
							media_type: 'application/pdf',
							data: part.documentData.data,
						}
					} satisfies DocumentBlockParam);
				}
				break;
			}
			case Raw.ChatCompletionContentPartKind.Opaque: {
				// Opaque parts round-trip prior thinking output back to the model.
				if (part.value && typeof part.value === 'object' && 'type' in part.value) {
					const opaqueValue = part.value as { type: string; thinking?: { id: string; text?: string | string[]; encrypted?: string } };
					if (opaqueValue.type === 'thinking' && opaqueValue.thinking) {
						const thinkingText = Array.isArray(opaqueValue.thinking.text)
							? opaqueValue.thinking.text.join('')
							: opaqueValue.thinking.text;
						if (thinkingText && opaqueValue.thinking.encrypted) {
							// Regular thinking block: text is present, encrypted field contains the signature
							convertedContent.push({
								type: 'thinking',
								thinking: thinkingText,
								signature: opaqueValue.thinking.encrypted,
							});
						} else if (opaqueValue.thinking.encrypted && !thinkingText) {
							// Redacted thinking block: no text, only encrypted data from Claude
							convertedContent.push({
								type: 'redacted_thinking',
								data: opaqueValue.thinking.encrypted,
							});
						}
						// A thinking part with text but no signature is dropped entirely.
					}
				}
				break;
			}
		}

		// Attach any pending cache_control to the block we just added
		if (pendingCacheControl && convertedContent.length > 0) {
			const lastBlock = convertedContent.at(-1)!;
			if (contentBlockSupportsCacheControl(lastBlock)) {
				lastBlock.cache_control = { type: 'ephemeral' };
				pendingCacheControl = false;
			}
		}
	}

	return convertedContent;
}
482
483
/**
 * Type guard for content blocks that may legally carry a `cache_control`
 * marker. Thinking blocks (regular and redacted) are the only block kinds
 * excluded here.
 */
function contentBlockSupportsCacheControl(block: ContentBlockParam): block is Exclude<ContentBlockParam, ThinkingBlockParam | RedactedThinkingBlockParam> {
	switch (block.type) {
		case 'thinking':
		case 'redacted_thinking':
			return false;
		default:
			return true;
	}
}
486
487
// Anthropic API limit: at most 4 cache_control breakpoints per request
// (shared budget across tools, system blocks, and message content blocks).
const maxCacheBreakpoints = 4;
488
489
/**
 * Counts existing cache_control markers across system blocks and messages.
 * Does not count tool-level cache_control — tools are managed separately by
 * addToolsAndSystemCacheControl.
 *
 * @returns Total number of cache_control markers already present.
 */
function countExistingMessageAndSystemCacheControl(messagesResult: { messages: MessageParam[]; system?: TextBlockParam[] }): number {
	const systemCount = (messagesResult.system ?? []).filter(block => !!block.cache_control).length;

	let messageCount = 0;
	for (const message of messagesResult.messages) {
		const blocks = message.content;
		// String-content messages carry no per-block markers.
		if (!Array.isArray(blocks)) {
			continue;
		}
		messageCount += blocks.filter(block =>
			typeof block === 'object' && 'cache_control' in block && !!block.cache_control
		).length;
	}

	return systemCount + messageCount;
}
514
515
/**
516
* Optionally adds cache_control to the tools and system prefix when there are spare
517
* slots available (i.e. existing breakpoints < max). The last non-deferred tool is
518
* marked first if possible, and the last system block is marked only while slots remain.
519
* Message-level cache breakpoints are never evicted because they already implicitly
520
* cache the tools+system prefix (Anthropic cache hierarchy: tools → system → messages)
521
* and cover more content.
522
*/
523
export function addToolsAndSystemCacheControl(
524
tools: AnthropicMessagesTool[],
525
messagesResult: { messages: MessageParam[]; system?: TextBlockParam[] },
526
slotsAvailable?: number,
527
): void {
528
if (slotsAvailable === undefined) {
529
slotsAvailable = maxCacheBreakpoints - countExistingMessageAndSystemCacheControl(messagesResult);
530
}
531
if (slotsAvailable <= 0) {
532
return;
533
}
534
535
// Find the last non-deferred tool — deferred tools cannot have cache_control.
536
let lastCacheableTool: AnthropicMessagesTool | undefined;
537
for (let i = tools.length - 1; i >= 0; i--) {
538
if (!tools[i].defer_loading) {
539
lastCacheableTool = tools[i];
540
break;
541
}
542
}
543
544
if (lastCacheableTool && slotsAvailable > 0) {
545
lastCacheableTool.cache_control = { type: 'ephemeral' };
546
slotsAvailable--;
547
}
548
549
// Add cache_control to the last system block (caches the stable system prompt)
550
const lastSystemBlock = messagesResult.system?.at(-1);
551
if (lastSystemBlock && !lastSystemBlock.cache_control && slotsAvailable > 0) {
552
lastSystemBlock.cache_control = { type: 'ephemeral' };
553
}
554
}
555
556
/**
557
* Adds cache_control to the last two distinct messages in the conversation.
558
* This implements a simpler "shifting breakpoint" strategy: the last two messages
559
* always carry cache breakpoints, which naturally advances as the conversation grows.
560
* Combined with addToolsAndSystemCacheControl (which handles tools + system),
561
* this gives 4 breakpoints: 2 static (tools/system) + 2 shifting (last two messages).
562
*
563
* If a trailing message already carries a cache_control marker, it counts toward the
564
* "two distinct messages" target and no additional marker is added — protecting the
565
* intent against any upstream code that may have placed markers before this runs.
566
*
567
* Returns the number of new cache_control markers added (0–2).
568
*/
569
export function addLastTwoMessagesCacheControl(
570
messagesResult: { messages: MessageParam[]; system?: TextBlockParam[] },
571
slotsAvailable?: number,
572
): number {
573
if (slotsAvailable === undefined) {
574
slotsAvailable = maxCacheBreakpoints - countExistingMessageAndSystemCacheControl(messagesResult);
575
}
576
if (slotsAvailable <= 0) {
577
return 0;
578
}
579
580
// Walk messages in reverse, marking the last cacheable content block of the
581
// last two distinct messages. A message that already has a cache_control
582
// marker counts toward the target without a new marker being added.
583
const messages = messagesResult.messages;
584
let markedCount = 0;
585
let added = 0;
586
for (let i = messages.length - 1; i >= 0 && slotsAvailable > 0 && markedCount < 2; i--) {
587
const msg = messages[i];
588
if (!Array.isArray(msg.content) || msg.content.length === 0) {
589
continue;
590
}
591
592
const alreadyMarked = msg.content.some(b =>
593
typeof b === 'object' && 'cache_control' in b && b.cache_control
594
);
595
if (alreadyMarked) {
596
markedCount++;
597
continue;
598
}
599
600
// Find the last block in this message that supports cache_control
601
for (let j = msg.content.length - 1; j >= 0; j--) {
602
const block = msg.content[j];
603
if (typeof block === 'object' && contentBlockSupportsCacheControl(block)) {
604
block.cache_control = { type: 'ephemeral' };
605
slotsAvailable--;
606
markedCount++;
607
added++;
608
break;
609
}
610
}
611
}
612
return added;
613
}
614
615
/**
 * Turns a streaming Anthropic Messages API HTTP response into an async iterable
 * of ChatCompletion objects. SSE events are parsed and fed through an
 * AnthropicMessagesProcessor; when the processor yields a finished completion,
 * finish-reason and usage telemetry are sent and the completion is emitted.
 *
 * @param instantiationService Used to create the stream processor with DI services.
 * @param telemetryService Receives finish-reason and engine-message telemetry.
 * @param logService Trace/info logging for the SSE stream.
 * @param response The fetch response whose body is the SSE stream.
 * @param finishCallback Progress callback invoked by the processor per delta.
 * @param telemetryData Base telemetry context extended per completion.
 * @returns An async iterable that yields completions as the stream produces them.
 */
export async function processResponseFromMessagesEndpoint(
	instantiationService: IInstantiationService,
	telemetryService: ITelemetryService,
	logService: ILogService,
	response: Response,
	finishCallback: FinishedCallback,
	telemetryData: TelemetryData
): Promise<AsyncIterableObject<ChatCompletion>> {
	return new AsyncIterableObject<ChatCompletion>(async feed => {
		// Prefer server-assigned ids; fall back to a local uuid so telemetry always
		// has a request id.
		const requestId = response.headers.get('X-Request-ID') ?? generateUuid();
		const ghRequestId = response.headers.get('x-github-request-id') ?? '';
		const { serverExperiments } = getRequestId(response.headers);
		const processor = instantiationService.createInstance(AnthropicMessagesProcessor, telemetryData, requestId, ghRequestId, serverExperiments);
		const parser = new SSEParser((ev) => {
			try {
				logService.trace(`[messagesAPI]SSE: ${ev.data}`);
				const trimmed = ev.data?.trim();
				// Skip empty keep-alive events and the terminal [DONE] sentinel.
				if (!trimmed || trimmed === '[DONE]') {
					return;
				}

				const parsed = JSON.parse(trimmed) as Partial<AnthropicStreamEvent>;
				// The event type may arrive in the JSON payload or as the SSE event name.
				const type = parsed.type ?? ev.type;
				if (!type) {
					return;
				}
				// push() returns a completion only once the message has finished.
				const completion = processor.push({ ...parsed, type } as AnthropicStreamEvent, finishCallback);
				if (completion) {
					logService.info(`[messagesAPI] message ${completion.choiceIndex} returned. finish reason: [${completion.finishReason}]`);

					const dataToSendToTelemetry = telemetryData.extendedBy({
						completionChoiceFinishReason: completion.finishReason,
						headerRequestId: completion.requestId.headerRequestId
					});
					telemetryService.sendGHTelemetryEvent('completion.finishReason', dataToSendToTelemetry.properties, dataToSendToTelemetry.measurements);

					const telemetryMessage = rawMessageToCAPI(completion.message);
					// Attach token usage measurements when the stream reported usage.
					let telemetryDataWithUsage = telemetryData;
					if (completion.usage) {
						telemetryDataWithUsage = telemetryData.extendedBy({}, {
							promptTokens: completion.usage.prompt_tokens,
							completionTokens: completion.usage.completion_tokens,
							totalTokens: completion.usage.total_tokens,
							...(completion.usage.prompt_tokens_details && { cachedTokens: completion.usage.prompt_tokens_details.cached_tokens }),
							...(completion.usage.completion_tokens_details && {
								reasoningTokens: completion.usage.completion_tokens_details.reasoning_tokens,
								acceptedPredictionTokens: completion.usage.completion_tokens_details.accepted_prediction_tokens,
								rejectedPredictionTokens: completion.usage.completion_tokens_details.rejected_prediction_tokens,
							}),
						});
					}
					sendEngineMessagesTelemetry(telemetryService, [telemetryMessage], telemetryDataWithUsage, true, logService);

					feed.emitOne(completion);
				}
			} catch (e) {
				// Any parse/processing failure aborts the whole iterable.
				feed.reject(e);
			}
		});

		for await (const chunk of response.body) {
			parser.feed(chunk);
		}
	}, async () => {
		// Disposal callback: tear down the underlying HTTP stream.
		await response.body.destroy();
	});
}
682
683
export class AnthropicMessagesProcessor {
684
private textAccumulator: string = '';
685
private toolCallAccumulator: Map<number, { id: string; name: string; arguments: string }> = new Map();
686
private thinkingAccumulator: Map<number, { thinking: string; signature: string }> = new Map();
687
private completedToolCalls: Array<{ id: string; name: string; arguments: string }> = [];
688
private messageId: string = '';
689
private model: string = '';
690
private inputTokens: number = 0;
691
private outputTokens: number = 0;
692
private cacheCreationTokens: number = 0;
693
private cacheReadTokens: number = 0;
694
private contextManagementResponse?: ContextManagementResponse;
695
private stopReason: string | undefined;
696
private stopDetails?: { category?: string; explanation?: string; type?: string };
697
698
constructor(
699
private readonly telemetryData: TelemetryData,
700
private readonly requestId: string,
701
private readonly ghRequestId: string,
702
private readonly serverExperiments: string,
703
@ILogService private readonly logService: ILogService,
704
@ITelemetryService private readonly telemetryService: ITelemetryService,
705
) { }
706
707
/**
708
* Extract IP code citations from copilot_annotations and convert to IIPCodeCitation format
709
*/
710
private extractIPCodeCitations(annotations?: { IPCodeCitations?: AnthropicIPCodeCitation[] }): IIPCodeCitation[] {
711
if (!annotations?.IPCodeCitations?.length) {
712
return [];
713
}
714
715
// Deduplicate by URL since the same citation can appear multiple times
716
const seenUrls = new Set<string>();
717
const citations: IIPCodeCitation[] = [];
718
719
for (const citation of annotations.IPCodeCitations) {
720
const citationDetails = citation.citations;
721
if (!citationDetails) {
722
continue;
723
}
724
725
const { url, license, snippet } = citationDetails;
726
727
if (typeof url !== 'string' || url.trim() === '') {
728
continue;
729
}
730
731
if (typeof license !== 'string' || license.trim() === '') {
732
continue;
733
}
734
735
if (typeof snippet !== 'string' || snippet.trim() === '') {
736
continue;
737
}
738
739
if (!seenUrls.has(url)) {
740
seenUrls.add(url);
741
citations.push({
742
citations: {
743
url,
744
license,
745
snippet,
746
}
747
});
748
}
749
}
750
751
if (citations.length > 0) {
752
this.logService.trace(`[messagesAPI] IP code citations found: ${citations.length} unique citations`);
753
}
754
755
return citations;
756
}
757
758
/**
 * Folds a single Anthropic stream event into the accumulated completion
 * state and reports incremental progress to the caller.
 *
 * State persists across calls: streamed text in `textAccumulator`, in-flight
 * tool calls in `toolCallAccumulator` (keyed by content-block index),
 * thinking blocks in `thinkingAccumulator`, and token counts taken from the
 * `usage` payloads.
 *
 * @param chunk one parsed SSE event from the Anthropic Messages API stream.
 * @param _onProgress invoked with the full accumulated text so far, a fixed
 *   choice index of 0, and the per-event delta.
 * @returns the finished `ChatCompletion` on the `message_stop` event;
 *   `undefined` for every other event type.
 */
public push(chunk: AnthropicStreamEvent, _onProgress: FinishedCallback): ChatCompletion | undefined {
	// Wrapper that keeps `textAccumulator` in sync with every reported delta,
	// so the callback always receives the complete text-so-far.
	const onProgress = (delta: IResponseDelta): undefined => {
		this.textAccumulator += delta.text;
		_onProgress(this.textAccumulator, 0, delta);
	};

	switch (chunk.type) {
		case 'message_start':
			// Capture message identity and the initial usage snapshot; these
			// may be refined later by `message_delta`.
			if (chunk.message) {
				this.messageId = chunk.message.id;
				this.model = chunk.message.model;
				this.inputTokens = chunk.message.usage.input_tokens ?? 0;
				this.outputTokens = chunk.message.usage.output_tokens ?? 0;
				this.cacheCreationTokens = chunk.message.usage.cache_creation_input_tokens ?? 0;
				this.cacheReadTokens = chunk.message.usage.cache_read_input_tokens ?? 0;
			}
			return;
		case 'content_block_start':
			if (chunk.content_block?.type === 'tool_use' && chunk.index !== undefined) {
				// Begin accumulating a tool call; arguments arrive later as
				// `input_json_delta` chunks keyed by the same block index.
				const toolCallId = chunk.content_block.id || generateUuid();
				this.toolCallAccumulator.set(chunk.index, {
					id: toolCallId,
					name: chunk.content_block.name || '',
					arguments: '',
				});
				// Emit a separating space when text was already streamed —
				// presumably to keep the tool call visually detached from
				// prior text; confirm against the renderer.
				if (this.textAccumulator.length) {
					onProgress({ text: ' ' });
				}
				onProgress({
					text: '',
					beginToolCalls: [{ name: chunk.content_block.name || '', id: toolCallId }]
				});
			} else if (chunk.content_block?.type === 'thinking' && chunk.index !== undefined) {
				// Start an empty thinking accumulator for this block index;
				// text and signature are appended by later delta events.
				this.thinkingAccumulator.set(chunk.index, {
					thinking: '',
					signature: '',
				});
			} else if (chunk.content_block?.type === 'redacted_thinking' && chunk.index !== undefined) {
				// Redacted thinking arrives fully formed as opaque data and is
				// surfaced immediately via the `encrypted` field.
				const data = (chunk.content_block as { type: 'redacted_thinking'; data: string }).data;
				onProgress({
					text: '',
					thinking: {
						id: `thinking_${chunk.index}`,
						encrypted: data,
					}
				});
			}
			return;
		case 'content_block_delta':
			if (chunk.delta) {
				if (chunk.delta.type === 'text_delta' && chunk.delta.text) {
					// Plain text delta; attach any IP code citations found in
					// the chunk's copilot annotations.
					const ipCitations = this.extractIPCodeCitations(chunk.copilot_annotations);
					if (ipCitations.length > 0) {
						return onProgress({ text: chunk.delta.text, ipCitations });
					}
					return onProgress({ text: chunk.delta.text });
				} else if (chunk.delta.type === 'thinking_delta' && chunk.delta.thinking && chunk.index !== undefined) {
					// Append to the accumulated thinking text and also stream
					// the incremental piece to the caller.
					const thinking = this.thinkingAccumulator.get(chunk.index);
					if (thinking) {
						thinking.thinking += chunk.delta.thinking;
					}
					return onProgress({
						text: '',
						thinking: {
							id: `thinking_${chunk.index}`,
							text: chunk.delta.thinking,
						}
					});
				} else if (chunk.delta.type === 'signature_delta' && chunk.delta.signature && chunk.index !== undefined) {
					// Accumulate silently; the signature is only surfaced at
					// `content_block_stop`.
					const thinking = this.thinkingAccumulator.get(chunk.index);
					if (thinking) {
						thinking.signature += chunk.delta.signature;
					}
					// Don't report signature deltas to the user
				} else if (chunk.delta.type === 'input_json_delta' && chunk.delta.partial_json && chunk.index !== undefined) {
					// Grow the tool call's JSON arguments and stream the
					// arguments accumulated so far.
					const toolCall = this.toolCallAccumulator.get(chunk.index);
					if (toolCall) {
						toolCall.arguments += chunk.delta.partial_json;
						onProgress({
							text: '',
							copilotToolCallStreamUpdates: [{
								id: toolCall.id,
								name: toolCall.name,
								arguments: toolCall.arguments,
							}],
						});
					}
				}
			}
			return;
		case 'content_block_stop':
			if (chunk.index !== undefined) {
				// Finalize a tool call: move it to the completed list, report
				// it once in full, and drop the in-flight accumulator entry.
				const toolCall = this.toolCallAccumulator.get(chunk.index);
				if (toolCall) {
					this.completedToolCalls.push(toolCall);
					onProgress({
						text: '',
						copilotToolCalls: [{
							id: toolCall.id,
							name: toolCall.name,
							arguments: toolCall.arguments,
						}],
					});
					this.toolCallAccumulator.delete(chunk.index);
				}
				// Finalize a thinking block: the accumulated signature is
				// reported through the `encrypted` field.
				const thinking = this.thinkingAccumulator.get(chunk.index);
				if (thinking && thinking.signature) {
					onProgress({
						text: '',
						thinking: {
							id: `thinking_${chunk.index}`,
							encrypted: thinking.signature,
						}
					});
					this.thinkingAccumulator.delete(chunk.index);
				}
			}
			return;
		case 'message_delta':
			if (chunk.usage) {
				// message_delta provides the most accurate token counts
				this.outputTokens = chunk.usage.output_tokens;
				this.inputTokens = chunk.usage.input_tokens ?? this.inputTokens;
				this.cacheCreationTokens = chunk.usage.cache_creation_input_tokens ?? this.cacheCreationTokens;
				this.cacheReadTokens = chunk.usage.cache_read_input_tokens ?? this.cacheReadTokens;
			}
			if (chunk.context_management) {
				// Remember the response for message_stop telemetry.
				this.contextManagementResponse = chunk.context_management;
				// Report context management via delta so it gets logged to request logger
				return onProgress({
					text: '',
					contextManagement: chunk.context_management
				});
			}
			// Track stop_reason and stop_details for determining finish reason in message_stop
			if (chunk.delta?.stop_reason) {
				this.stopReason = chunk.delta.stop_reason;
			}
			if (chunk.delta?.stop_details) {
				this.stopDetails = chunk.delta.stop_details;
			}
			return;
		case 'message_stop': {
			// Emit telemetry for context editing, if it was applied during
			// this message (captured from `message_delta`).
			if (this.contextManagementResponse) {
				const totalClearedTokens = this.contextManagementResponse.applied_edits.reduce(
					(sum, edit) => sum + (edit.cleared_input_tokens || 0),
					0
				);
				const totalClearedToolUses = this.contextManagementResponse.applied_edits.reduce(
					(sum, edit) => sum + (edit.cleared_tool_uses || 0),
					0
				);
				const totalClearedThinkingTurns = this.contextManagementResponse.applied_edits.reduce(
					(sum, edit) => sum + (edit.cleared_thinking_turns || 0),
					0
				);
				this.logService.trace(`[messagesAPI] Anthropic context editing applied: cleared ${totalClearedTokens} tokens, ${totalClearedToolUses} tool uses.`);

				/* __GDPR__
					"contextEditingApplied" : {
						"owner": "bhavyaus",
						"comment": "Tracks when Anthropic context editing is applied to manage context window",
						"requestId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The request ID for correlation" },
						"interactionId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The interaction ID for correlation" },
						"model": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The model used" },
						"clearedTokens": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total tokens cleared" },
						"clearedToolUses": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total tool uses cleared" },
						"clearedThinkingTurns": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Total thinking turns cleared" }
					}
				*/
				this.telemetryService.sendMSFTTelemetryEvent('contextEditingApplied',
					{
						requestId: this.requestId,
						interactionId: this.requestId,
						model: this.model,
					},
					{
						clearedTokens: totalClearedTokens,
						clearedToolUses: totalClearedToolUses,
						clearedThinkingTurns: totalClearedThinkingTurns,
					}
				);
			}
			// Log + telemetry for model refusals (stop_reason captured from
			// `message_delta`).
			if (this.stopReason === 'refusal') {
				const category = this.stopDetails?.category ?? 'unknown';
				this.logService.warn(`[messagesAPI] Refusal received: category='${category}' for model ${this.model}`);

				/* __GDPR__
					"messagesApi.refusal" : {
						"owner": "bhavyaus",
						"comment": "Tracks Anthropic refusal responses including cyber and other policy categories",
						"requestId": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The request ID for correlation" },
						"model": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The model that produced the refusal" },
						"category": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "The refusal category (e.g. cyber, content_policy)" }
					}
				*/
				this.telemetryService.sendMSFTTelemetryEvent('messagesApi.refusal',
					{
						requestId: this.requestId,
						model: this.model,
						category,
					}
				);
			}

			// Map the Anthropic stop_reason onto the completion finish reason.
			let finishReason: FinishedCompletionReason;
			switch (this.stopReason) {
				case 'refusal':
					finishReason = FinishedCompletionReason.ClientDone;
					break;
				case 'max_tokens':
				case 'model_context_window_exceeded':
					finishReason = FinishedCompletionReason.Length;
					break;
				default:
					finishReason = FinishedCompletionReason.Stop;
					break;
			}

			// Prompt total is the sum of the uncached, cache-creation, and
			// cache-read buckets. The sanity check below can only trip if an
			// upstream count came back negative.
			const computedPromptTokens = this.inputTokens + this.cacheCreationTokens + this.cacheReadTokens;
			if (computedPromptTokens < this.cacheReadTokens) {
				this.logService.warn(`[messagesAPI] Token count inconsistency: computed prompt_tokens (${computedPromptTokens}) < cached_tokens (${this.cacheReadTokens}). Raw values: inputTokens=${this.inputTokens}, cacheCreationTokens=${this.cacheCreationTokens}, cacheReadTokens=${this.cacheReadTokens}`);
			}

			// Assemble the final ChatCompletion from everything accumulated.
			return {
				blockFinished: true,
				choiceIndex: 0,
				model: this.model,
				tokens: [],
				telemetryData: this.telemetryData,
				requestId: {
					headerRequestId: this.requestId,
					gitHubRequestId: this.ghRequestId,
					completionId: this.messageId,
					created: Date.now(),
					deploymentId: '',
					serverExperiments: this.serverExperiments,
				},
				usage: {
					prompt_tokens: computedPromptTokens,
					completion_tokens: this.outputTokens,
					total_tokens: computedPromptTokens + this.outputTokens,
					prompt_tokens_details: {
						cached_tokens: this.cacheReadTokens,
						cache_creation_input_tokens: this.cacheCreationTokens,
					},
					completion_tokens_details: {
						reasoning_tokens: 0,
						accepted_prediction_tokens: 0,
						rejected_prediction_tokens: 0,
					},
				},
				finishReason,
				message: {
					role: Raw.ChatRole.Assistant,
					content: this.textAccumulator ? [{
						type: Raw.ChatCompletionContentPartKind.Text,
						text: this.textAccumulator
					}] : [],
					// Only attach `toolCalls` when at least one call completed.
					...(this.completedToolCalls.length > 0 ? {
						toolCalls: this.completedToolCalls.map(tc => ({
							id: tc.id,
							type: 'function' as const,
							function: {
								name: tc.name,
								arguments: tc.arguments
							}
						}))
					} : {})
				}
			};
		}
		case 'error': {
			// The stream-event type doesn't model the error shape, hence the
			// cast; surface it to the caller as a copilot error delta.
			const errorMessage = (chunk as unknown as { error?: { message?: string } }).error?.message || 'Unknown error';
			return onProgress({
				text: '',
				copilotErrors: [{
					agent: 'anthropic',
					code: 'unknown',
					message: errorMessage,
					type: 'error',
					identifier: undefined
				}]
			});
		}
	}
}
}