Path: blob/main/src/vs/platform/agentHost/test/node/protocol/multiClient.integrationTest.ts
13405 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import assert from 'assert';
import { SubscribeResult } from '../../../common/state/protocol/commands.js';
import type { SessionAddedNotification, SessionRemovedNotification } from '../../../common/state/sessionActions.js';
import { PROTOCOL_VERSION } from '../../../common/state/sessionCapabilities.js';
import type { INotificationBroadcastParams, ReconnectResult } from '../../../common/state/sessionProtocol.js';
import type { SessionState } from '../../../common/state/sessionState.js';
import {
	createAndSubscribeSession,
	dispatchTurnStarted,
	getActionEnvelope,
	IServerHandle,
	isActionNotification,
	nextSessionUri,
	startServer,
	TestProtocolClient,
} from './testHelpers.js';

/**
 * Returns `true` when `n` is a `'notification'` message whose broadcast payload
 * carries the given notification type (for example `'notify/sessionAdded'`).
 *
 * The parameter is typed structurally so the function can be passed directly as
 * a predicate to `waitForNotification`.
 */
function isBroadcastNotification(n: { readonly method: string; readonly params?: unknown }, notificationType: string): boolean {
	return n.method === 'notification'
		&& (n.params as INotificationBroadcastParams).notification.type === notificationType;
}

/**
 * Integration tests that exercise the protocol with more than one client
 * connected to the same server: broadcast notifications, shared session
 * actions, subscription scoping, late subscription and reconnect.
 *
 * One server is started for the whole suite; a fresh primary `client` is
 * connected before every test. Additional clients are opened through
 * `connectExtraClient` so that they are always closed in `teardown`, even when
 * a test fails before reaching its last statement.
 */
suite('Protocol WebSocket — Multi-Client', function () {

	let server: IServerHandle;
	let client: TestProtocolClient;

	/** Secondary clients opened by the current test; drained and closed in `teardown`. */
	const extraClients: TestProtocolClient[] = [];

	/**
	 * Connects an additional client to the shared server and registers it for
	 * cleanup in `teardown`.
	 *
	 * @param clientId When provided, the client also sends `initialize` with this
	 * id. Omit it for flows that must not initialize (e.g. `reconnect`).
	 * @returns The connected (and optionally initialized) client.
	 */
	async function connectExtraClient(clientId?: string): Promise<TestProtocolClient> {
		const extra = new TestProtocolClient(server.port);
		await extra.connect();
		// Register as soon as the connection exists so a failing `initialize`
		// below cannot leak the socket.
		extraClients.push(extra);
		if (clientId !== undefined) {
			await extra.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId });
		}
		return extra;
	}

	suiteSetup(async function () {
		this.timeout(15_000);
		server = await startServer();
	});

	suiteTeardown(function () {
		server.process.kill();
	});

	setup(async function () {
		this.timeout(10_000);
		client = new TestProtocolClient(server.port);
		await client.connect();
	});

	teardown(function () {
		// Runs after every test, pass or fail, so no connection outlives its test.
		try {
			for (const extra of extraClients.splice(0)) {
				extra.close();
			}
		} finally {
			client.close();
		}
	});

	test('sessionAdded notification is broadcast to all connected clients', async function () {
		this.timeout(10_000);

		await client.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId: 'test-broadcast-add-1' });
		const client2 = await connectExtraClient('test-broadcast-add-2');

		client.clearReceived();
		client2.clearReceived();

		await client.call('createSession', { session: nextSessionUri(), provider: 'mock' });

		const n1 = await client.waitForNotification(n => isBroadcastNotification(n, 'notify/sessionAdded'));
		const n2 = await client2.waitForNotification(n => isBroadcastNotification(n, 'notify/sessionAdded'));
		assert.ok(n1, 'client 1 should receive sessionAdded');
		assert.ok(n2, 'client 2 should receive sessionAdded');

		const uri1 = ((n1.params as INotificationBroadcastParams).notification as SessionAddedNotification).summary.resource;
		const uri2 = ((n2.params as INotificationBroadcastParams).notification as SessionAddedNotification).summary.resource;
		assert.strictEqual(uri1, uri2, 'both clients should see the same session URI');
	});

	test('sessionRemoved notification is broadcast to all connected clients', async function () {
		this.timeout(10_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-broadcast-remove-1');

		// Client 2 initializes but never subscribes to the session.
		const client2 = await connectExtraClient('test-broadcast-remove-2');
		client2.clearReceived();

		await client.call('disposeSession', { session: sessionUri });

		const n1 = await client.waitForNotification(n => isBroadcastNotification(n, 'notify/sessionRemoved'));
		const n2 = await client2.waitForNotification(n => isBroadcastNotification(n, 'notify/sessionRemoved'));
		assert.ok(n1, 'client 1 should receive sessionRemoved');
		assert.ok(n2, 'client 2 should receive sessionRemoved even without subscribing');

		const removed1 = (n1.params as INotificationBroadcastParams).notification as SessionRemovedNotification;
		const removed2 = (n2.params as INotificationBroadcastParams).notification as SessionRemovedNotification;
		assert.strictEqual(removed1.session.toString(), sessionUri.toString());
		assert.strictEqual(removed2.session.toString(), sessionUri.toString());
	});

	test('two clients on same session both see actions', async function () {
		this.timeout(10_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-multi-client-1');

		const client2 = await connectExtraClient('test-multi-client-2');
		await client2.call('subscribe', { resource: sessionUri });
		client2.clearReceived();

		dispatchTurnStarted(client, sessionUri, 'turn-mc', 'hello', 1);

		const d1 = await client.waitForNotification(n => isActionNotification(n, 'session/responsePart'));
		const d2 = await client2.waitForNotification(n => isActionNotification(n, 'session/responsePart'));
		assert.ok(d1);
		assert.ok(d2);

		await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
		await client2.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
	});

	test('client B sends message on session created by client A', async function () {
		this.timeout(10_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-cross-msg-1');

		const client2 = await connectExtraClient('test-cross-msg-2');
		await client2.call('subscribe', { resource: sessionUri });
		client.clearReceived();
		client2.clearReceived();

		// Client B dispatches the turn
		dispatchTurnStarted(client2, sessionUri, 'turn-cross', 'hello', 1);

		const r1 = await client.waitForNotification(n => isActionNotification(n, 'session/responsePart'));
		const r2 = await client2.waitForNotification(n => isActionNotification(n, 'session/responsePart'));
		assert.ok(r1, 'client A should see responsePart from client B turn');
		assert.ok(r2, 'client B should see its own responsePart');

		await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
		await client2.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
	});

	test('both clients receive full tool progress updates', async function () {
		this.timeout(10_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-tool-progress-1');

		const client2 = await connectExtraClient('test-tool-progress-2');
		await client2.call('subscribe', { resource: sessionUri });
		client.clearReceived();
		client2.clearReceived();

		dispatchTurnStarted(client, sessionUri, 'turn-tool-mc', 'use-tool', 1);

		// Both clients should see the full tool lifecycle
		for (const c of [client, client2]) {
			await c.waitForNotification(n => isActionNotification(n, 'session/toolCallStart'));
			await c.waitForNotification(n => isActionNotification(n, 'session/toolCallReady'));
			await c.waitForNotification(n => isActionNotification(n, 'session/toolCallComplete'));
			await c.waitForNotification(n => isActionNotification(n, 'session/responsePart'));
			await c.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
		}
	});

	test('unsubscribe stops receiving session actions', async function () {
		this.timeout(10_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-unsubscribe');
		client.notify('unsubscribe', { resource: sessionUri });
		// `unsubscribe` is sent via `notify`, so nothing is awaited for it here;
		// pause briefly before continuing.
		await new Promise(resolve => setTimeout(resolve, 100));
		client.clearReceived();

		const client2 = await connectExtraClient('test-unsub-helper');
		await client2.call('subscribe', { resource: sessionUri });

		dispatchTurnStarted(client2, sessionUri, 'turn-unsub', 'hello', 1);
		await client2.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));

		// Give some time for any stray actions to arrive
		await new Promise(resolve => setTimeout(resolve, 300));
		// NOTE(review): every other call passes a full action type (e.g.
		// 'session/turnComplete') while this one passes the bare prefix 'session/'.
		// If `isActionNotification` compares types for equality this filter can never
		// match and the assertion below is vacuous — confirm against testHelpers.
		const sessionActions = client.receivedNotifications(n => isActionNotification(n, 'session/'));
		assert.strictEqual(sessionActions.length, 0, 'unsubscribed client should not receive session actions');
	});

	test('unsubscribed client receives no actions but still gets notifications', async function () {
		this.timeout(10_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-scoping-1');

		const client2 = await connectExtraClient('test-scoping-2');
		// Client 2 does NOT subscribe to the session
		client2.clearReceived();

		dispatchTurnStarted(client, sessionUri, 'turn-scoped', 'hello', 1);
		await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));

		// Give some time for any stray actions to arrive
		await new Promise(resolve => setTimeout(resolve, 300));
		const sessionActions = client2.receivedNotifications(n => n.method === 'action');
		assert.strictEqual(sessionActions.length, 0, 'unsubscribed client should receive no session actions');

		// But disposing the session should still broadcast a notification
		client2.clearReceived();
		await client.call('disposeSession', { session: sessionUri });

		const removed = await client2.waitForNotification(n => isBroadcastNotification(n, 'notify/sessionRemoved'));
		assert.ok(removed, 'unsubscribed client should still receive sessionRemoved notification');
	});

	test('late subscriber gets current state via snapshot', async function () {
		this.timeout(15_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-late-sub');
		dispatchTurnStarted(client, sessionUri, 'turn-late', 'hello', 1);
		await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));

		// Client 2 joins after the turn has completed
		const client2 = await connectExtraClient('test-late-sub-2');

		const result = await client2.call<SubscribeResult>('subscribe', { resource: sessionUri });
		const state = result.snapshot.state as SessionState;
		assert.ok(state.turns.length >= 1, `late subscriber should see completed turn, got ${state.turns.length}`);
		assert.strictEqual(state.turns[0].id, 'turn-late');
		assert.strictEqual(state.turns[0].state, 'complete');
	});

	test('permission flow: client B confirms tool started by client A', async function () {
		this.timeout(10_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-cross-perm-1');

		const client2 = await connectExtraClient('test-cross-perm-2');
		await client2.call('subscribe', { resource: sessionUri });
		client.clearReceived();
		client2.clearReceived();

		// Client A starts the permission turn
		dispatchTurnStarted(client, sessionUri, 'turn-cross-perm', 'permission', 1);

		// Both clients should see tool_start and tool_ready
		await client.waitForNotification(n => isActionNotification(n, 'session/toolCallStart'));
		await client2.waitForNotification(n => isActionNotification(n, 'session/toolCallStart'));
		await client.waitForNotification(n => isActionNotification(n, 'session/toolCallReady'));
		await client2.waitForNotification(n => isActionNotification(n, 'session/toolCallReady'));

		// Client B confirms the tool call.
		// NOTE(review): 'tc-perm-1' is presumably the tool call id emitted by the
		// mock provider for the 'permission' prompt — confirm against the mock.
		client2.notify('dispatchAction', {
			clientSeq: 1,
			action: {
				type: 'session/toolCallConfirmed',
				session: sessionUri,
				turnId: 'turn-cross-perm',
				toolCallId: 'tc-perm-1',
				approved: true,
			},
		});

		// Both clients should see the response and turn completion
		await client.waitForNotification(n => isActionNotification(n, 'session/responsePart'));
		await client2.waitForNotification(n => isActionNotification(n, 'session/responsePart'));
		await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
		await client2.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));
	});

	test('reconnect replays missed actions', async function () {
		this.timeout(15_000);

		const sessionUri = await createAndSubscribeSession(client, 'test-reconnect');
		dispatchTurnStarted(client, sessionUri, 'turn-recon', 'hello', 1);
		await client.waitForNotification(n => isActionNotification(n, 'session/turnComplete'));

		const allActions = client.receivedNotifications(n => n.method === 'action');
		assert.ok(allActions.length > 0);
		// Report a sequence number just before the first action this client saw, so
		// that everything it received counts as "missed" on reconnect.
		const missedFromSeq = getActionEnvelope(allActions[0]).serverSeq - 1;

		// Drop the original connection; `teardown` will call close() on it again.
		client.close();

		// The replacement connection sends `reconnect` instead of `initialize`.
		const client2 = await connectExtraClient();
		const result = await client2.call<ReconnectResult>('reconnect', {
			clientId: 'test-reconnect',
			lastSeenServerSeq: missedFromSeq,
			subscriptions: [sessionUri],
		});

		assert.ok(result.type === 'replay' || result.type === 'snapshot', 'should receive replay or snapshot');
		if (result.type === 'replay') {
			assert.ok(result.actions.length > 0, 'should have replayed actions');
		}
	});
});