Path: blob/main/src/vs/base/parts/ipc/test/common/ipc.test.ts
4780 views
/*---------------------------------------------------------------------------------------------1* Copyright (c) Microsoft Corporation. All rights reserved.2* Licensed under the MIT License. See License.txt in the project root for license information.3*--------------------------------------------------------------------------------------------*/45import assert from 'assert';6import { timeout } from '../../../../common/async.js';7import { VSBuffer } from '../../../../common/buffer.js';8import { CancellationToken, CancellationTokenSource } from '../../../../common/cancellation.js';9import { canceled } from '../../../../common/errors.js';10import { Emitter, Event } from '../../../../common/event.js';11import { DisposableStore } from '../../../../common/lifecycle.js';12import { isEqual } from '../../../../common/resources.js';13import { URI } from '../../../../common/uri.js';14import { BufferReader, BufferWriter, ClientConnectionEvent, deserialize, IChannel, IMessagePassingProtocol, IPCClient, IPCServer, IServerChannel, ProxyChannel, serialize } from '../../common/ipc.js';15import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../test/common/utils.js';1617class QueueProtocol implements IMessagePassingProtocol {1819private buffering = true;20private buffers: VSBuffer[] = [];2122private readonly _onMessage = new Emitter<VSBuffer>({23onDidAddFirstListener: () => {24for (const buffer of this.buffers) {25this._onMessage.fire(buffer);26}2728this.buffers = [];29this.buffering = false;30},31onDidRemoveLastListener: () => {32this.buffering = true;33}34});3536readonly onMessage = this._onMessage.event;37other!: QueueProtocol;3839send(buffer: VSBuffer): void {40this.other.receive(buffer);41}4243protected receive(buffer: VSBuffer): void {44if (this.buffering) {45this.buffers.push(buffer);46} else {47this._onMessage.fire(buffer);48}49}50}5152function createProtocolPair(): [IMessagePassingProtocol, IMessagePassingProtocol] {53const one = new QueueProtocol();54const other = new QueueProtocol();55one.other = other;56other.other = one;5758return [one, other];59}6061class TestIPCClient extends IPCClient<string> {6263private readonly _onDidDisconnect = new Emitter<void>();64readonly onDidDisconnect = this._onDidDisconnect.event;6566constructor(protocol: IMessagePassingProtocol, id: string) {67super(protocol, id);68}6970override dispose(): void {71this._onDidDisconnect.fire();72super.dispose();73}74}7576class TestIPCServer extends IPCServer<string> {7778private readonly onDidClientConnect: Emitter<ClientConnectionEvent>;7980constructor() {81const onDidClientConnect = new Emitter<ClientConnectionEvent>();82super(onDidClientConnect.event);83this.onDidClientConnect = onDidClientConnect;84}8586createConnection(id: string): IPCClient<string> {87const [pc, ps] = createProtocolPair();88const client = new TestIPCClient(pc, id);8990this.onDidClientConnect.fire({91protocol: ps,92onDidClientDisconnect: client.onDidDisconnect93});9495return client;96}97}9899const TestChannelId = 'testchannel';100101interface ITestService {102marco(): Promise<string>;103error(message: string): Promise<void>;104neverComplete(): Promise<void>;105neverCompleteCT(cancellationToken: CancellationToken): Promise<void>;106buffersLength(buffers: VSBuffer[]): Promise<number>;107marshall(uri: URI): Promise<URI>;108context(): Promise<unknown>;109110readonly onPong: Event<string>;111}112113class TestService implements ITestService {114115private readonly disposables = new DisposableStore();116117private readonly _onPong = new Emitter<string>();118readonly onPong = this._onPong.event;119120marco(): Promise<string> {121return Promise.resolve('polo');122}123124error(message: string): Promise<void> {125return Promise.reject(new Error(message));126}127128neverComplete(): Promise<void> {129return new Promise(_ => { });130}131132neverCompleteCT(cancellationToken: CancellationToken): Promise<void> {133if (cancellationToken.isCancellationRequested) {134return Promise.reject(canceled());135}136137return new Promise((_, e) => this.disposables.add(cancellationToken.onCancellationRequested(() => e(canceled()))));138}139140buffersLength(buffers: VSBuffer[]): Promise<number> {141return Promise.resolve(buffers.reduce((r, b) => r + b.buffer.length, 0));142}143144ping(msg: string): void {145this._onPong.fire(msg);146}147148marshall(uri: URI): Promise<URI> {149return Promise.resolve(uri);150}151152context(context?: unknown): Promise<unknown> {153return Promise.resolve(context);154}155156dispose() {157this.disposables.dispose();158}159}160161class TestChannel implements IServerChannel {162163constructor(private service: ITestService) { }164165call(_: unknown, command: string, arg: any, cancellationToken: CancellationToken): Promise<any> {166switch (command) {167case 'marco': return this.service.marco();168case 'error': return this.service.error(arg);169case 'neverComplete': return this.service.neverComplete();170case 'neverCompleteCT': return this.service.neverCompleteCT(cancellationToken);171case 'buffersLength': return this.service.buffersLength(arg);172default: return Promise.reject(new Error('not implemented'));173}174}175176listen(_: unknown, event: string, arg?: any): Event<any> {177switch (event) {178case 'onPong': return this.service.onPong;179default: throw new Error('not implemented');180}181}182}183184class TestChannelClient implements ITestService {185186get onPong(): Event<string> {187return this.channel.listen('onPong');188}189190constructor(private channel: IChannel) { }191192marco(): Promise<string> {193return this.channel.call('marco');194}195196error(message: string): Promise<void> {197return this.channel.call('error', message);198}199200neverComplete(): Promise<void> {201return this.channel.call('neverComplete');202}203204neverCompleteCT(cancellationToken: CancellationToken): Promise<void> {205return this.channel.call('neverCompleteCT', undefined, cancellationToken);206}207208buffersLength(buffers: VSBuffer[]): Promise<number> {209return this.channel.call('buffersLength', buffers);210}211212marshall(uri: URI): Promise<URI> {213return this.channel.call('marshall', uri);214}215216context(): Promise<unknown> {217return this.channel.call('context');218}219}220221suite('Base IPC', function () {222223const store = ensureNoDisposablesAreLeakedInTestSuite();224225test('createProtocolPair', async function () {226const [clientProtocol, serverProtocol] = createProtocolPair();227228const b1 = VSBuffer.alloc(0);229clientProtocol.send(b1);230231const b3 = VSBuffer.alloc(0);232serverProtocol.send(b3);233234const b2 = await Event.toPromise(serverProtocol.onMessage);235const b4 = await Event.toPromise(clientProtocol.onMessage);236237assert.strictEqual(b1, b2);238assert.strictEqual(b3, b4);239});240241suite('one to one', function () {242let server: IPCServer;243let client: IPCClient;244let service: TestService;245let ipcService: ITestService;246247setup(function () {248service = store.add(new TestService());249const testServer = store.add(new TestIPCServer());250server = testServer;251252server.registerChannel(TestChannelId, new TestChannel(service));253254client = store.add(testServer.createConnection('client1'));255ipcService = new TestChannelClient(client.getChannel(TestChannelId));256});257258test('call success', async function () {259const r = await ipcService.marco();260return assert.strictEqual(r, 'polo');261});262263test('call error', async function () {264try {265await ipcService.error('nice error');266return assert.fail('should not reach here');267} catch (err) {268return assert.strictEqual(err.message, 'nice error');269}270});271272test('cancel call with cancelled cancellation token', async function () {273try {274await ipcService.neverCompleteCT(CancellationToken.Cancelled);275return assert.fail('should not reach here');276} catch (err) {277return assert(err.message === 'Canceled');278}279});280281test('cancel call with cancellation token (sync)', function () {282const cts = new CancellationTokenSource();283const promise = ipcService.neverCompleteCT(cts.token).then(284_ => assert.fail('should not reach here'),285err => assert(err.message === 'Canceled')286);287288cts.cancel();289290return promise;291});292293test('cancel call with cancellation token (async)', function () {294const cts = new CancellationTokenSource();295const promise = ipcService.neverCompleteCT(cts.token).then(296_ => assert.fail('should not reach here'),297err => assert(err.message === 'Canceled')298);299300setTimeout(() => cts.cancel());301302return promise;303});304305test('listen to events', async function () {306const messages: string[] = [];307308store.add(ipcService.onPong(msg => messages.push(msg)));309await timeout(0);310311assert.deepStrictEqual(messages, []);312service.ping('hello');313await timeout(0);314315assert.deepStrictEqual(messages, ['hello']);316service.ping('world');317await timeout(0);318319assert.deepStrictEqual(messages, ['hello', 'world']);320});321322test('buffers in arrays', async function () {323const r = await ipcService.buffersLength([VSBuffer.alloc(2), VSBuffer.alloc(3)]);324return assert.strictEqual(r, 5);325});326327test('round trips numbers', () => {328const input = [3290,3301,331-1,33212345,333-12345,33442.6,335123412341234336];337338const writer = new BufferWriter();339serialize(writer, input);340assert.deepStrictEqual(deserialize(new BufferReader(writer.buffer)), input);341});342});343344suite('one to one (proxy)', function () {345let server: IPCServer;346let client: IPCClient;347let service: TestService;348let ipcService: ITestService;349350const disposables = new DisposableStore();351352setup(function () {353service = store.add(new TestService());354const testServer = disposables.add(new TestIPCServer());355server = testServer;356357server.registerChannel(TestChannelId, ProxyChannel.fromService(service, disposables));358359client = disposables.add(testServer.createConnection('client1'));360ipcService = ProxyChannel.toService(client.getChannel(TestChannelId));361});362363teardown(function () {364disposables.clear();365});366367test('call success', async function () {368const r = await ipcService.marco();369return assert.strictEqual(r, 'polo');370});371372test('call error', async function () {373try {374await ipcService.error('nice error');375return assert.fail('should not reach here');376} catch (err) {377return assert.strictEqual(err.message, 'nice error');378}379});380381test('listen to events', async function () {382const messages: string[] = [];383384disposables.add(ipcService.onPong(msg => messages.push(msg)));385await timeout(0);386387assert.deepStrictEqual(messages, []);388service.ping('hello');389await timeout(0);390391assert.deepStrictEqual(messages, ['hello']);392service.ping('world');393await timeout(0);394395assert.deepStrictEqual(messages, ['hello', 'world']);396});397398test('marshalling uri', async function () {399const uri = URI.file('foobar');400const r = await ipcService.marshall(uri);401assert.ok(r instanceof URI);402return assert.ok(isEqual(r, uri));403});404405test('buffers in arrays', async function () {406const r = await ipcService.buffersLength([VSBuffer.alloc(2), VSBuffer.alloc(3)]);407return assert.strictEqual(r, 5);408});409});410411suite('one to one (proxy, extra context)', function () {412let server: IPCServer;413let client: IPCClient;414let service: TestService;415let ipcService: ITestService;416417const disposables = new DisposableStore();418419setup(function () {420service = store.add(new TestService());421const testServer = disposables.add(new TestIPCServer());422server = testServer;423424server.registerChannel(TestChannelId, ProxyChannel.fromService(service, disposables));425426client = disposables.add(testServer.createConnection('client1'));427ipcService = ProxyChannel.toService(client.getChannel(TestChannelId), { context: 'Super Context' });428});429430teardown(function () {431disposables.clear();432});433434test('call extra context', async function () {435const r = await ipcService.context();436return assert.strictEqual(r, 'Super Context');437});438});439440suite('one to many', function () {441test('all clients get pinged', async function () {442const service = store.add(new TestService());443const channel = new TestChannel(service);444const server = store.add(new TestIPCServer());445server.registerChannel('channel', channel);446447let client1GotPinged = false;448const client1 = store.add(server.createConnection('client1'));449const ipcService1 = new TestChannelClient(client1.getChannel('channel'));450store.add(ipcService1.onPong(() => client1GotPinged = true));451452let client2GotPinged = false;453const client2 = store.add(server.createConnection('client2'));454const ipcService2 = new TestChannelClient(client2.getChannel('channel'));455store.add(ipcService2.onPong(() => client2GotPinged = true));456457await timeout(1);458service.ping('hello');459460await timeout(1);461assert(client1GotPinged, 'client 1 got pinged');462assert(client2GotPinged, 'client 2 got pinged');463});464465test('server gets pings from all clients (broadcast channel)', async function () {466const server = store.add(new TestIPCServer());467468const client1 = server.createConnection('client1');469const clientService1 = store.add(new TestService());470const clientChannel1 = new TestChannel(clientService1);471client1.registerChannel('channel', clientChannel1);472473const pings: string[] = [];474const channel = server.getChannel('channel', () => true);475const service = new TestChannelClient(channel);476store.add(service.onPong(msg => pings.push(msg)));477478await timeout(1);479clientService1.ping('hello 1');480481await timeout(1);482assert.deepStrictEqual(pings, ['hello 1']);483484const client2 = server.createConnection('client2');485const clientService2 = store.add(new TestService());486const clientChannel2 = new TestChannel(clientService2);487client2.registerChannel('channel', clientChannel2);488489await timeout(1);490clientService2.ping('hello 2');491492await timeout(1);493assert.deepStrictEqual(pings, ['hello 1', 'hello 2']);494495client1.dispose();496clientService1.ping('hello 1');497498await timeout(1);499assert.deepStrictEqual(pings, ['hello 1', 'hello 2']);500501await timeout(1);502clientService2.ping('hello again 2');503504await timeout(1);505assert.deepStrictEqual(pings, ['hello 1', 'hello 2', 'hello again 2']);506507client2.dispose();508});509});510});511512513