Path: blob/main/src/vs/platform/agentHost/common/state/agentSubscription.ts
13399 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { Emitter, Event } from '../../../../base/common/event.js';
import { Disposable, IReference } from '../../../../base/common/lifecycle.js';
import { ResourceMap } from '../../../../base/common/map.js';
import { IObservable, observableFromEvent } from '../../../../base/common/observable.js';
import { URI } from '../../../../base/common/uri.js';
import { ActionEnvelope, IRootConfigChangedAction, SessionAction, StateAction, isSessionAction } from './sessionActions.js';
import { rootReducer, sessionReducer } from './sessionReducers.js';
import { terminalReducer } from './protocol/reducers.js';
import type { RootAction, SessionAction as IProtocolSessionAction, TerminalAction } from './protocol/action-origin.generated.js';
import type { RootState, SessionState, TerminalState } from './protocol/state.js';
import type { IStateSnapshot } from './sessionProtocol.js';
import { StateComponents } from './sessionState.js';

// --- Public API --------------------------------------------------------------

/**
 * A read-only subscription to an agent host resource (root, session, or terminal).
 *
 * Subscriptions are hydrated from an initial server snapshot and kept in sync
 * via action envelopes. Session subscriptions support write-ahead
 * reconciliation — optimistic state is layered on top of confirmed state.
 */
export interface IAgentSubscription<T> {
	/**
	 * The current state value. For write-ahead subscriptions (sessions) this
	 * reflects the optimistic state (confirmed + pending replayed). For
	 * server-only subscriptions (root, terminal) this equals `verifiedValue`.
	 *
	 * `undefined` until the first snapshot arrives. An `Error` if subscription
	 * failed.
	 */
	readonly value: T | Error | undefined;

	/**
	 * The server-confirmed state with no pending optimistic actions applied.
	 * `undefined` until the first snapshot arrives.
	 */
	readonly verifiedValue: T | undefined;

	/** Fires when {@link value} changes (optimistic or confirmed). */
	readonly onDidChange: Event<T>;

	/** Fires before a server-originated action is applied to this subscription's state. */
	readonly onWillApplyAction: Event<ActionEnvelope>;

	/** Fires after a server-originated action is applied to this subscription's state. */
	readonly onDidApplyAction: Event<ActionEnvelope>;
}

// --- Base Implementation -----------------------------------------------------

/**
 * Base class for agent subscriptions. Handles envelope reception, confirmed
 * state management, and action event emission.
 *
 * Subclasses provide the reducer and optionally override reconciliation
 * behavior.
 */
abstract class BaseAgentSubscription<T> extends Disposable implements IAgentSubscription<T> {

	/** Server-confirmed state; `undefined` until {@link handleSnapshot} has run. */
	protected _confirmedState: T | undefined;
	/** Set by {@link setError}; while set, {@link value} returns it. Cleared by the next snapshot. */
	private _error: Error | undefined;
	/** Relevant envelopes received before the first snapshot, in arrival order. */
	private _bufferedEnvelopes: ActionEnvelope[] | undefined;

	protected readonly _onDidChange = this._register(new Emitter<T>());
	readonly onDidChange: Event<T> = this._onDidChange.event;

	protected readonly _onWillApplyAction = this._register(new Emitter<ActionEnvelope>());
	readonly onWillApplyAction: Event<ActionEnvelope> = this._onWillApplyAction.event;

	protected readonly _onDidApplyAction = this._register(new Emitter<ActionEnvelope>());
	readonly onDidApplyAction: Event<ActionEnvelope> = this._onDidApplyAction.event;

	protected readonly _clientId: string;
	protected readonly _log: (msg: string) => void;

	/**
	 * @param clientId Identifier compared against `envelope.origin.clientId`
	 * to detect actions that originated from this client.
	 * @param log Logging callback forwarded to the reducers.
	 */
	constructor(clientId: string, log: (msg: string) => void) {
		super();
		this._clientId = clientId;
		this._log = log;
	}

	get value(): T | Error | undefined {
		if (this._error) {
			return this._error;
		}
		return this._getOptimisticState() ?? this._confirmedState;
	}

	get verifiedValue(): T | undefined {
		return this._confirmedState;
	}

	/**
	 * Apply an initial snapshot from the server.
	 *
	 * Replaces the confirmed state, clears any previous error, replays
	 * buffered envelopes newer than `fromSeq` and fires {@link onDidChange}.
	 *
	 * @param state The snapshot state.
	 * @param fromSeq Server sequence number the snapshot is authoritative up to.
	 */
	handleSnapshot(state: T, fromSeq: number): void {
		this._confirmedState = state;
		this._error = undefined;
		this._onSnapshotApplied(fromSeq);
		this._onDidChange.fire(this.value as T);
	}

	/**
	 * Mark this subscription as failed. Does not fire {@link onDidChange}
	 * (the event payload type cannot carry an `Error`).
	 */
	setError(error: Error): void {
		this._error = error;
	}

	/**
	 * Process an incoming action envelope. The subscription determines
	 * whether the action is relevant via {@link _isRelevantAction}.
	 */
	receiveEnvelope(envelope: ActionEnvelope): void {
		if (!this._isRelevantAction(envelope.action)) {
			return;
		}

		// Buffer actions that arrive before the snapshot has been applied.
		// They're replayed in _onSnapshotApplied().
		// NOTE(review): the buffer is only drained by a snapshot, so it keeps
		// growing if none ever arrives (e.g. after setError) — confirm that
		// owners dispose failed subscriptions.
		if (this._confirmedState === undefined) {
			if (!this._bufferedEnvelopes) {
				this._bufferedEnvelopes = [];
			}
			this._bufferedEnvelopes.push(envelope);
			return;
		}

		const isOwnAction = envelope.origin?.clientId === this._clientId;
		this._onWillApplyAction.fire(envelope);

		this._reconcile(envelope, isOwnAction);

		this._onDidApplyAction.fire(envelope);
	}

	/** Apply the reducer to confirmed state. Subclasses must implement. */
	protected abstract _applyReducer(state: T, action: StateAction): T;

	/** Whether the given action targets this subscription. */
	protected abstract _isRelevantAction(action: StateAction): boolean;

	/** Return optimistic state if write-ahead is active, otherwise `undefined`. */
	protected _getOptimisticState(): T | undefined {
		return undefined; // No write-ahead by default
	}

	/**
	 * Hook called after a snapshot is applied. Replays buffered actions.
	 *
	 * Replayed envelopes go straight through {@link _reconcile}; the
	 * will/did-apply events are not fired for them.
	 */
	protected _onSnapshotApplied(_fromSeq: number): void {
		// Replay any actions that arrived before the snapshot
		const buffered = this._bufferedEnvelopes;
		if (buffered) {
			this._bufferedEnvelopes = undefined;
			for (const envelope of buffered) {
				// Only replay actions with serverSeq > fromSeq (snapshot is authoritative up to fromSeq)
				if (envelope.serverSeq > _fromSeq) {
					const isOwnAction = envelope.origin?.clientId === this._clientId;
					this._reconcile(envelope, isOwnAction);
				}
			}
		}
	}

	/**
	 * Default reconciliation: apply to confirmed, fire change event.
	 * Session subscriptions override this for write-ahead.
	 */
	protected _reconcile(envelope: ActionEnvelope, _isOwnAction: boolean): void {
		this._confirmedState = this._applyReducer(this._confirmedState!, envelope.action);
		this._onDidChange.fire(this.value as T);
	}
}

// --- Root State Subscription -------------------------------------------------

/**
 * Subscription to the root state at `agenthost:/root`.
 * Server-only mutations — no write-ahead.
 */
export class RootStateSubscription extends BaseAgentSubscription<RootState> {

	protected override _applyReducer(state: RootState, action: StateAction): RootState {
		return rootReducer(state, action as RootAction, this._log);
	}

	protected override _isRelevantAction(action: StateAction): boolean {
		return action.type.startsWith('root/');
	}
}

// --- Session State Subscription ----------------------------------------------

/** A client-dispatched action that the server has not yet echoed back. */
interface IPendingAction {
	readonly clientSeq: number;
	readonly action: SessionAction;
}

/**
 * Subscription to a session at `copilot:/<uuid>`.
 * Supports write-ahead reconciliation for client-dispatchable actions.
 */
export class SessionStateSubscription extends BaseAgentSubscription<SessionState> {

	/** Optimistically applied actions awaiting a server echo, in dispatch order. */
	private readonly _pendingActions: IPendingAction[] = [];
	/** Confirmed state with all pending actions replayed; `undefined` when nothing is pending. */
	private _optimisticState: SessionState | undefined;
	private readonly _sessionUri: string;
	private readonly _seqAllocator: () => number;

	/**
	 * @param sessionUri Session URI string; matched against `action.session`.
	 * @param clientId Identifier of this client, used to recognise own actions.
	 * @param seqAllocator Produces the client sequence number for each optimistic action.
	 * @param log Logging callback forwarded to the reducer.
	 */
	constructor(
		sessionUri: string,
		clientId: string,
		seqAllocator: () => number,
		log: (msg: string) => void,
	) {
		super(clientId, log);
		this._sessionUri = sessionUri;
		this._seqAllocator = seqAllocator;
	}

	/**
	 * Optimistically apply a session action. Returns the clientSeq to send
	 * to the server so it can echo back for reconciliation.
	 *
	 * If no snapshot has arrived yet the action is only queued; it is
	 * replayed once a snapshot is applied.
	 */
	applyOptimistic(action: SessionAction): number {
		const clientSeq = this._seqAllocator();
		this._pendingActions.push({ clientSeq, action });
		// Apply on top of current optimistic
		const base = this._optimisticState ?? this.verifiedValue;
		if (base) {
			this._optimisticState = sessionReducer(base, action as IProtocolSessionAction, this._log);
			this._onDidChange.fire(this._optimisticState);
		}
		return clientSeq;
	}

	protected override _getOptimisticState(): SessionState | undefined {
		return this._optimisticState;
	}

	protected override _applyReducer(state: SessionState, action: StateAction): SessionState {
		return sessionReducer(state, action as IProtocolSessionAction, this._log);
	}

	protected override _isRelevantAction(action: StateAction): boolean {
		return isSessionAction(action) && action.session === this._sessionUri;
	}

	protected override _onSnapshotApplied(fromSeq: number): void {
		// Replay buffered actions first
		super._onSnapshotApplied(fromSeq);
		// Re-apply pending actions on top of new confirmed state
		this._recomputeOptimistic();
	}

	/**
	 * Write-ahead reconciliation.
	 *
	 * - An echo of one of our pending actions that carries a
	 *   `rejectionReason` is dropped from the pending list without
	 *   touching confirmed state.
	 * - Every other envelope is applied to confirmed state; if it matches a
	 *   pending action, that pending entry is removed afterwards.
	 *
	 * The optimistic state is then rebuilt from the remaining pending actions.
	 */
	protected override _reconcile(envelope: ActionEnvelope, isOwnAction: boolean): void {
		const origin = envelope.origin;
		const pendingIdx = isOwnAction && origin
			? this._pendingActions.findIndex(p => p.clientSeq === origin.clientSeq)
			: -1;
		const isRejectedPending = pendingIdx !== -1 && !!envelope.rejectionReason;

		if (!isRejectedPending) {
			this._confirmedApply(envelope.action);
		}
		if (pendingIdx !== -1) {
			// Removed only after the reducer ran, so a throwing reducer
			// leaves the pending entry in place.
			this._pendingActions.splice(pendingIdx, 1);
		}
		this._recomputeOptimistic();
	}

	/** Run the reducer against confirmed state, if a snapshot has been applied. */
	private _confirmedApply(action: StateAction): void {
		if (this._confirmedState) {
			this._confirmedState = this._applyReducer(this._confirmedState, action);
		}
	}

	/**
	 * Rebuild the optimistic state by replaying all pending actions on top
	 * of confirmed state, then fire {@link onDidChange}. Without a confirmed
	 * state the optimistic state is cleared and no event is fired.
	 */
	private _recomputeOptimistic(): void {
		const confirmed = this._confirmedState;
		if (!confirmed) {
			this._optimisticState = undefined;
			return;
		}

		if (this._pendingActions.length === 0) {
			this._optimisticState = undefined; // No pending → value falls through to confirmed
			this._onDidChange.fire(confirmed);
			return;
		}

		let state = confirmed;
		for (const pending of this._pendingActions) {
			state = sessionReducer(state, pending.action as IProtocolSessionAction, this._log);
		}
		this._optimisticState = state;
		this._onDidChange.fire(state);
	}

	/**
	 * Clear pending actions for this session (e.g., on unsubscribe).
	 * Does not fire {@link onDidChange}.
	 */
	clearPending(): void {
		this._pendingActions.length = 0;
		this._optimisticState = undefined;
	}
}

// --- Terminal State Subscription ---------------------------------------------

/**
 * Subscription to a terminal at an agent-host terminal URI.
 * Server-only mutations — no write-ahead (terminal I/O is side-effect-only).
 */
export class TerminalStateSubscription extends BaseAgentSubscription<TerminalState> {

	private readonly _terminalUri: string;

	/**
	 * @param terminalUri Terminal URI string; matched against `action.terminal`.
	 * @param clientId Identifier of this client.
	 * @param log Logging callback forwarded to the reducer.
	 */
	constructor(terminalUri: string, clientId: string, log: (msg: string) => void) {
		super(clientId, log);
		this._terminalUri = terminalUri;
	}

	protected override _applyReducer(state: TerminalState, action: StateAction): TerminalState {
		return terminalReducer(state, action as TerminalAction, this._log);
	}

	protected override _isRelevantAction(action: StateAction): boolean {
		return action.type.startsWith('terminal/') && (action as { terminal: string }).terminal === this._terminalUri;
	}
}

// --- Subscription Manager ----------------------------------------------------

/** A live subscription together with the number of outstanding references to it. */
interface ISubscriptionEntry {
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	readonly sub: BaseAgentSubscription<any>;
	refCount: number;
}

/**
 * Manages the lifecycle of resource subscriptions for an agent connection.
 *
 * Provides refcounted access via {@link getSubscription} — the subscription
 * is created on first acquire, subscribes to the server, and stays alive
 * until the last reference is disposed.
 *
 * The connection feeds action envelopes to all active subscriptions via
 * {@link receiveEnvelope}.
 */
export class AgentSubscriptionManager extends Disposable {

	private readonly _subscriptions = new ResourceMap<ISubscriptionEntry>();
	private readonly _rootState: RootStateSubscription;
	private readonly _clientId: string;
	private readonly _seqAllocator: () => number;
	private readonly _log: (msg: string) => void;
	private readonly _subscribe: (resource: URI) => Promise<IStateSnapshot>;
	private readonly _unsubscribe: (resource: URI) => void;

	/**
	 * @param clientId Identifier of this client, passed to every subscription.
	 * @param seqAllocator Allocates client sequence numbers for dispatched actions.
	 * @param log Logging callback passed to every subscription.
	 * @param subscribe Asks the server to start a subscription; resolves with the initial snapshot.
	 * @param unsubscribe Tells the server a subscription is no longer needed.
	 */
	constructor(
		clientId: string,
		seqAllocator: () => number,
		log: (msg: string) => void,
		subscribe: (resource: URI) => Promise<IStateSnapshot>,
		unsubscribe: (resource: URI) => void,
	) {
		super();
		this._clientId = clientId;
		this._seqAllocator = seqAllocator;
		this._log = log;
		this._subscribe = subscribe;
		this._unsubscribe = unsubscribe;
		this._rootState = this._register(new RootStateSubscription(clientId, log));
	}

	/** The always-live root state subscription. */
	get rootState(): IAgentSubscription<RootState> {
		return this._rootState;
	}

	/**
	 * Initialize the root state from a snapshot received during the
	 * connection handshake.
	 */
	handleRootSnapshot(state: RootState, fromSeq: number): void {
		this._rootState.handleSnapshot(state, fromSeq);
	}

	/**
	 * Returns an existing subscription without affecting its refcount.
	 * Returns `undefined` if no subscription is active for the given resource.
	 */
	getSubscriptionUnmanaged<T>(resource: URI): IAgentSubscription<T> | undefined {
		const entry = this._subscriptions.get(resource);
		return entry?.sub as unknown as IAgentSubscription<T> | undefined;
	}

	/**
	 * Get or create a refcounted subscription to any resource. Disposing
	 * the returned reference decrements the refcount; when it reaches zero
	 * the subscription is torn down and the server is notified.
	 *
	 * Each returned reference releases at most once: disposing the same
	 * reference again is a no-op.
	 *
	 * @param kind Which subscription type to create. Ignored when a
	 * subscription for `resource` already exists.
	 * @param resource The resource to subscribe to.
	 */
	getSubscription<T>(kind: StateComponents, resource: URI): IReference<IAgentSubscription<T>> {
		const existing = this._subscriptions.get(resource);
		if (existing) {
			existing.refCount++;
			return this._createReference<T>(resource, existing);
		}

		// Create new subscription based on caller-specified kind
		const key = resource.toString();
		const sub = this._createSubscription(kind, key);
		const entry: ISubscriptionEntry = { sub, refCount: 1 };
		this._subscriptions.set(resource, entry);

		// Kick off server subscription asynchronously.
		// Capture the entry reference so we can validate it hasn't been
		// replaced by a new subscription for the same key (race guard).
		this._subscribe(resource).then(snapshot => {
			if (this._subscriptions.get(resource) === entry) {
				sub.handleSnapshot(snapshot.state as never, snapshot.fromSeq);
			}
		}).catch(err => {
			if (this._subscriptions.get(resource) === entry) {
				sub.setError(err instanceof Error ? err : new Error(String(err)));
			}
		});

		return this._createReference<T>(resource, entry);
	}

	/**
	 * Route an incoming action envelope to all active subscriptions.
	 */
	receiveEnvelope(envelope: ActionEnvelope): void {
		// Root state gets all root actions
		this._rootState.receiveEnvelope(envelope);
		// Other subscriptions get filtered actions
		for (const { sub } of this._subscriptions.values()) {
			sub.receiveEnvelope(envelope);
		}
	}

	/**
	 * Dispatch a client action. Applies optimistically to the relevant
	 * subscription if applicable, then returns the clientSeq.
	 *
	 * Only session actions with an active {@link SessionStateSubscription}
	 * are applied optimistically; everything else just allocates a sequence
	 * number.
	 */
	dispatchOptimistic(action: SessionAction | TerminalAction | IRootConfigChangedAction): number {
		if (isSessionAction(action)) {
			const entry = this._subscriptions.get(URI.parse(action.session));
			if (entry && entry.sub instanceof SessionStateSubscription) {
				return entry.sub.applyOptimistic(action);
			}
		}
		return this._seqAllocator();
	}

	/**
	 * Instantiate the subscription for `kind`. Any kind other than
	 * `Session` yields a {@link TerminalStateSubscription}.
	 */
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	private _createSubscription(kind: StateComponents, key: string): BaseAgentSubscription<any> {
		switch (kind) {
			case StateComponents.Session:
				return new SessionStateSubscription(key, this._clientId, this._seqAllocator, this._log);
			case StateComponents.Terminal:
			default:
				return new TerminalStateSubscription(key, this._clientId, this._log);
		}
	}

	/**
	 * Wrap an entry in a reference whose `dispose` releases the refcount
	 * exactly once, so a double dispose cannot tear down a subscription
	 * that other holders still use.
	 */
	private _createReference<T>(resource: URI, entry: ISubscriptionEntry): IReference<IAgentSubscription<T>> {
		let released = false;
		return {
			object: entry.sub,
			dispose: () => {
				if (released) {
					return;
				}
				released = true;
				this._releaseSubscription(resource, entry);
			},
		};
	}

	/**
	 * Drop one reference to `entry`. When the count reaches zero the entry
	 * is removed, the server is told to unsubscribe (best-effort) and the
	 * subscription is disposed.
	 */
	private _releaseSubscription(resource: URI, entry: ISubscriptionEntry): void {
		// Ignore releases for entries that are no longer registered (already
		// torn down, manager disposed, or replaced by a newer subscription
		// for the same resource).
		if (this._subscriptions.get(resource) !== entry) {
			return;
		}
		entry.refCount--;
		if (entry.refCount <= 0) {
			this._subscriptions.delete(resource);
			try { this._unsubscribe(resource); } catch { /* best-effort */ }
			if (entry.sub instanceof SessionStateSubscription) {
				entry.sub.clearPending();
			}
			entry.sub.dispose();
		}
	}

	/** Unsubscribe (best-effort) from and dispose every active subscription. */
	override dispose(): void {
		for (const [resource, entry] of this._subscriptions) {
			try { this._unsubscribe(resource); } catch { /* best-effort */ }
			entry.sub.dispose();
		}
		this._subscriptions.clear();
		super.dispose();
	}
}

// --- Observable Adapter ------------------------------------------------------

/**
 * Adapts an {@link IAgentSubscription} into an {@link IObservable} of the
 * subscription's value. Errors and the pre-snapshot phase are surfaced as
 * `undefined`; consumers that need the error itself should read
 * {@link IAgentSubscription.value} directly.
 *
 * @param owner Passed through to `observableFromEvent` as the observable's owner.
 * @param sub The subscription whose value is re-read whenever `onDidChange` fires.
 */
export function observableFromSubscription<T>(owner: object | undefined, sub: IAgentSubscription<T>): IObservable<T | undefined> {
	return observableFromEvent(owner, sub.onDidChange, () => {
		const v = sub.value;
		return v instanceof Error ? undefined : v;
	});
}