Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/test/mcp/src/inMemoryTransport.ts
3520 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { JSONRPCMessage, MessageExtraInfo } from '@modelcontextprotocol/sdk/types.js';
7
import { Transport, TransportSendOptions } from '@modelcontextprotocol/sdk/shared/transport.js';
8
import { Duplex } from 'stream';
9
10
/**
 * Creates a pair of in-memory transports that are connected to each other.
 * Messages sent on one transport are received on the other transport.
 * This uses actual Node.js object-mode Duplex streams rather than direct
 * callback invocation.
 *
 * Closing one transport ends the writable side of its stream; that end is
 * forwarded to the peer as end-of-stream on the peer's readable side, so the
 * peer observes the disconnect as well.
 *
 * @returns A tuple of [serverTransport, clientTransport] where the server
 * and client can communicate with each other through these transports.
 */
export function createInMemoryTransportPair(): [InMemoryTransport, InMemoryTransport] {
	// Builds one endpoint. The peer is looked up lazily through `getPeer`
	// because the two streams reference each other and cannot both exist
	// at construction time.
	const createEndpoint = (getPeer: () => Duplex): Duplex => new Duplex({
		objectMode: true,
		allowHalfOpen: false,
		read(): void {
			// Nothing to pull: data arrives only when the peer's write() pushes it.
		},
		write(chunk: unknown, _encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
			// Whatever this endpoint writes becomes readable on the peer.
			getPeer().push(chunk);
			callback();
		},
		final(callback: (error?: Error | null) => void): void {
			// Runs once when this endpoint's writable side is ended. Forward
			// it as EOF to the peer; listening for the readable 'end' event
			// here would never fire, because that event itself requires the
			// peer to have pushed EOF first.
			const peer = getPeer();
			if (!peer.destroyed) {
				peer.push(null);
			}
			callback();
		}
	});

	const serverStream: Duplex = createEndpoint(() => clientStream);
	const clientStream: Duplex = createEndpoint(() => serverStream);

	const serverTransport = new InMemoryTransport(serverStream);
	const clientTransport = new InMemoryTransport(clientStream);

	return [serverTransport, clientTransport];
}
64
65
/**
 * An in-memory transport implementation that allows two MCP endpoints to communicate
 * over a Node.js Duplex stream instead of direct message passing.
 *
 * Lifecycle:
 * - Incoming stream data is forwarded to `onmessage` only between `start()` and
 *   close; data that arrives outside that window is ignored.
 * - `onclose` is invoked exactly once, whether the transport is closed locally via
 *   `close()` or the underlying stream emits 'end' / 'close'.
 */
export class InMemoryTransport implements Transport {
	private readonly _stream: Duplex;
	private readonly _sessionId: string;
	private _started = false;
	private _closed = false;
	// Ensures `onclose` fires only once even though several stream events
	// ('end', 'close') and close() can all signal the same shutdown.
	private _closeNotified = false;

	// Transport callbacks
	public onclose?: () => void;
	public onerror?: (error: Error) => void;
	public onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;

	/**
	 * @param stream Duplex stream carrying the messages. Chunks may be message
	 * objects or JSON strings (strings are parsed before dispatch).
	 */
	constructor(stream: Duplex) {
		this._stream = stream;
		this._sessionId = `memory-${Math.random().toString(36).substring(2, 15)}`;

		// Set up stream event handlers
		this._stream.on('data', (data: unknown) => this._handleData(data));
		this._stream.on('error', (error: Error) => {
			this.onerror?.(error);
		});
		// Both events mean the connection is gone; the notifier de-duplicates.
		this._stream.on('end', () => this._notifyClosed());
		this._stream.on('close', () => this._notifyClosed());
	}

	/**
	 * Starts the transport. This must be called before sending or receiving messages.
	 * Calling it again after a successful start is a no-op.
	 *
	 * @throws Error if the transport has already been closed.
	 */
	async start(): Promise<void> {
		if (this._started) {
			return;
		}

		if (this._closed) {
			throw new Error('Cannot start a closed transport');
		}

		this._started = true;
	}

	/**
	 * Sends a JSON-RPC message through the stream.
	 *
	 * @param message The message object to write to the stream.
	 * @param options Accepted for interface compatibility; currently unused.
	 * @throws Error if the transport is not started or is already closed. The
	 * returned promise rejects if the stream reports a write error.
	 */
	async send(message: JSONRPCMessage, options?: TransportSendOptions): Promise<void> {
		if (!this._started) {
			throw new Error('Transport not started');
		}

		if (this._closed) {
			throw new Error('Transport is closed');
		}

		return new Promise<void>((resolve, reject) => {
			this._stream.write(message, (error) => {
				if (error) {
					reject(error);
				} else {
					resolve();
				}
			});
		});
	}

	/**
	 * Closes the transport and ends the writable side of the underlying stream,
	 * then invokes `onclose`. Calling it on an already closed transport is a no-op.
	 */
	async close(): Promise<void> {
		if (this._closed) {
			return;
		}

		this._closed = true;

		// Only end a stream that can still be ended; otherwise the end
		// callback may never run (or run with an error) and there is nothing
		// left to flush anyway.
		if (!this._stream.destroyed && !this._stream.writableEnded) {
			await new Promise<void>((resolve) => {
				this._stream.end(() => {
					resolve();
				});
			});
		}

		// Ending the writable side does not by itself emit 'end' or 'close'
		// on this stream, so notify explicitly.
		this._notifyClosed();
	}

	/**
	 * Gets the session ID for this transport connection.
	 */
	get sessionId(): string {
		return this._sessionId;
	}

	/**
	 * Sets the protocol version (optional implementation).
	 */
	setProtocolVersion?(version: string): void {
		// No-op for in-memory transport
	}

	/**
	 * Checks if the transport is currently connected and started.
	 */
	get isConnected(): boolean {
		return this._started && !this._closed && !this._stream.destroyed;
	}

	/**
	 * Checks if the transport has been closed.
	 */
	get isClosed(): boolean {
		return this._closed || this._stream.destroyed;
	}

	/**
	 * Dispatches one incoming chunk to `onmessage`. Parse failures and errors
	 * thrown by the `onmessage` handler are routed to `onerror`.
	 */
	private _handleData(data: unknown): void {
		if (!this._started || this._closed) {
			return;
		}

		try {
			// Chunks are expected to be JSON-RPC message objects; strings are
			// parsed first. The shape is not validated here.
			const message = (typeof data === 'string' ? JSON.parse(data) : data) as JSONRPCMessage;
			this.onmessage?.(message, undefined);
		} catch (error) {
			this.onerror?.(error instanceof Error ? error : new Error(String(error)));
		}
	}

	/**
	 * Marks the transport closed and invokes `onclose` the first time it is called.
	 */
	private _notifyClosed(): void {
		this._closed = true;
		if (this._closeNotified) {
			return;
		}
		this._closeNotified = true;
		this.onclose?.();
	}
}
199
200