import { JSONRPCMessage, MessageExtraInfo } from '@modelcontextprotocol/sdk/types.js';
import { Transport, TransportSendOptions } from '@modelcontextprotocol/sdk/shared/transport.js';
import { Duplex } from 'stream';
/**
 * Creates two linked in-memory transports backed by a pair of object-mode
 * Duplex streams: whatever is written to one stream is pushed onto the
 * readable side of the other.
 *
 * Shutdown is propagated through the `'finish'` event. Calling `end()` on a
 * stream finishes its *writable* side (emitting `'finish'`), at which point
 * end-of-stream (`push(null)`) is signalled on the peer's readable side. The
 * peer then emits `'end'`, and because both streams use
 * `allowHalfOpen: false`, Node ends the peer's writable side as well, which
 * signals end-of-stream back to the first stream.
 *
 * @returns A tuple `[serverTransport, clientTransport]`.
 */
export function createInMemoryTransportPair(): [InMemoryTransport, InMemoryTransport] {
  const serverStream = new Duplex({ objectMode: true, allowHalfOpen: false });
  const clientStream = new Duplex({ objectMode: true, allowHalfOpen: false });

  // Wires `from`'s writable side to `to`'s readable side.
  const link = (from: Duplex, to: Duplex): void => {
    from._write = (chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void) => {
      to.push(chunk);
      callback();
    };
    // Data is only ever produced by the peer's _write, so there is nothing to pull.
    from._read = () => {
    };
    // 'finish' (not 'end') is what `from.end()` emits; 'end' would only fire
    // after something had already pushed null onto `from` itself.
    from.on('finish', () => {
      if (!to.destroyed) {
        to.push(null);
      }
    });
  };

  link(serverStream, clientStream);
  link(clientStream, serverStream);

  const serverTransport = new InMemoryTransport(serverStream);
  const clientTransport = new InMemoryTransport(clientStream);
  return [serverTransport, clientTransport];
}
/**
 * Transport implementation that sends and receives JSON-RPC messages over an
 * object-mode Duplex stream.
 *
 * Lifecycle:
 * - Chunks that arrive before `start()` are queued and delivered, in arrival
 *   order, when `start()` is called.
 * - Chunks that arrive after the transport is closed are discarded.
 * - `onclose` is invoked at most once, whether the transport is closed locally
 *   via `close()` or because the stream emitted `'end'` / `'close'`.
 */
export class InMemoryTransport implements Transport {
  private _stream: Duplex;
  private _started = false;
  private _closed = false;
  /** Ensures `onclose` fires only once even though several events can report closure. */
  private _closeNotified = false;
  /** Raw chunks received before `start()`; flushed by `start()`. */
  private _pendingMessages: unknown[] = [];
  private _sessionId: string;
  public onclose?: () => void;
  public onerror?: (error: Error) => void;
  public onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;

  /**
   * @param stream Duplex stream used for both directions. Listeners for
   *   `'data'`, `'error'`, `'end'` and `'close'` are attached immediately.
   */
  constructor(stream: Duplex) {
    this._stream = stream;
    this._sessionId = `memory-${Math.random().toString(36).substring(2, 15)}`;

    this._stream.on('data', (data: unknown) => {
      if (this._closed) {
        return;
      }
      // Queue while not started, and also while start() is still flushing
      // earlier chunks, so that arrival order is preserved.
      if (!this._started || this._pendingMessages.length > 0) {
        this._pendingMessages.push(data);
        return;
      }
      this._deliver(data);
    });

    this._stream.on('error', (error: Error) => {
      this.onerror?.(error);
    });

    const handleStreamClosed = (): void => {
      this._closed = true;
      this._notifyClosed();
    };
    this._stream.on('end', handleStreamClosed);
    this._stream.on('close', handleStreamClosed);
  }

  /**
   * Marks the transport as started and delivers any chunks that were queued
   * before this call. Calling it again once started is a no-op.
   *
   * @throws Error if the transport has already been closed.
   */
  async start(): Promise<void> {
    if (this._started) {
      return;
    }
    if (this._closed) {
      throw new Error('Cannot start a closed transport');
    }
    this._started = true;
    while (this._pendingMessages.length > 0 && !this._closed) {
      this._deliver(this._pendingMessages.shift());
    }
  }

  /**
   * Writes a message to the underlying stream.
   *
   * @param message Message to write.
   * @param options Accepted for interface compatibility; not used here.
   * @returns A promise that resolves when the stream's write callback reports
   *   success and rejects with the error it reports otherwise.
   * @throws Error if the transport has not been started or is closed.
   */
  async send(message: JSONRPCMessage, options?: TransportSendOptions): Promise<void> {
    if (!this._started) {
      throw new Error('Transport not started');
    }
    if (this._closed) {
      throw new Error('Transport is closed');
    }
    return new Promise<void>((resolve, reject) => {
      this._stream.write(message, (error) => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      });
    });
  }

  /**
   * Ends the writable side of the stream and reports closure through
   * `onclose`. Calling it on an already-closed transport is a no-op.
   */
  async close(): Promise<void> {
    if (this._closed) {
      return;
    }
    this._closed = true;
    await new Promise<void>((resolve) => {
      this._stream.end(() => {
        resolve();
      });
    });
    // end() only finishes the writable side, so no 'end'/'close' event is
    // guaranteed to follow; report the close explicitly.
    this._notifyClosed();
  }

  /** Identifier generated at construction time (`memory-` plus a random suffix). */
  get sessionId(): string {
    return this._sessionId;
  }

  /** Accepts the protocol version for interface compatibility; intentionally does nothing. */
  setProtocolVersion?(version: string): void {
  }

  /** True once started, while not closed and while the stream is not destroyed. */
  get isConnected(): boolean {
    return this._started && !this._closed && !this._stream.destroyed;
  }

  /** True once the transport is closed or the stream has been destroyed. */
  get isClosed(): boolean {
    return this._closed || this._stream.destroyed;
  }

  /**
   * Parses string chunks as JSON, passes other chunks through unchanged, and
   * hands the result to `onmessage`. Errors thrown while parsing or by the
   * handler itself are routed to `onerror`.
   */
  private _deliver(data: unknown): void {
    try {
      const message = (typeof data === 'string' ? JSON.parse(data) : data) as JSONRPCMessage;
      this.onmessage?.(message);
    } catch (error) {
      this.onerror?.(error instanceof Error ? error : new Error(String(error)));
    }
  }

  /** Invokes `onclose` the first time it is called and drops any queued chunks. */
  private _notifyClosed(): void {
    if (this._closeNotified) {
      return;
    }
    this._closeNotified = true;
    this._pendingMessages = [];
    this.onclose?.();
  }
}