Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/platform/agentHost/common/state/agentSubscription.ts
13399 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { Emitter, Event } from '../../../../base/common/event.js';
7
import { Disposable, IReference } from '../../../../base/common/lifecycle.js';
8
import { ResourceMap } from '../../../../base/common/map.js';
9
import { IObservable, observableFromEvent } from '../../../../base/common/observable.js';
10
import { URI } from '../../../../base/common/uri.js';
11
import { ActionEnvelope, IRootConfigChangedAction, SessionAction, StateAction, isSessionAction } from './sessionActions.js';
12
import { rootReducer, sessionReducer } from './sessionReducers.js';
13
import { terminalReducer } from './protocol/reducers.js';
14
import type { RootAction, SessionAction as IProtocolSessionAction, TerminalAction } from './protocol/action-origin.generated.js';
15
import type { RootState, SessionState, TerminalState } from './protocol/state.js';
16
import type { IStateSnapshot } from './sessionProtocol.js';
17
import { StateComponents } from './sessionState.js';
18
19
// --- Public API --------------------------------------------------------------
20
21
/**
 * Read-only view onto a single agent host resource: the root, a session, or
 * a terminal.
 *
 * A subscription is hydrated from an initial server snapshot and then kept
 * up to date by incoming action envelopes. Session subscriptions additionally
 * support write-ahead reconciliation, where optimistic state is layered on
 * top of the server-confirmed state.
 */
export interface IAgentSubscription<T> {
	/**
	 * The state consumers should render.
	 *
	 * - For write-ahead subscriptions (sessions): the optimistic state, i.e.
	 *   the confirmed state with pending actions replayed on top.
	 * - For server-only subscriptions (root, terminal): identical to
	 *   {@link verifiedValue}.
	 *
	 * It is `undefined` before the first snapshot has arrived and an `Error`
	 * when the subscription failed.
	 */
	readonly value: T | Error | undefined;

	/**
	 * The state as confirmed by the server, without any pending optimistic
	 * actions. It is `undefined` before the first snapshot has arrived.
	 */
	readonly verifiedValue: T | undefined;

	/** Emitted whenever {@link value} changes, optimistically or by confirmation. */
	readonly onDidChange: Event<T>;

	/** Emitted right before a server-originated action is applied to this subscription's state. */
	readonly onWillApplyAction: Event<ActionEnvelope>;

	/** Emitted right after a server-originated action has been applied to this subscription's state. */
	readonly onDidApplyAction: Event<ActionEnvelope>;
}
54
55
// --- Base Implementation -----------------------------------------------------
56
57
/**
 * Shared implementation for all agent subscriptions.
 *
 * Responsibilities:
 * - holds the server-confirmed state and an optional failure,
 * - buffers relevant envelopes that arrive before the first snapshot and
 *   replays them once the snapshot is applied,
 * - emits the change and will/did-apply-action events.
 *
 * Subclasses supply the reducer and relevance filter, and may override
 * reconciliation (e.g. for write-ahead).
 */
abstract class BaseAgentSubscription<T> extends Disposable implements IAgentSubscription<T> {

	/** Server-confirmed state; `undefined` until a snapshot has been applied. */
	protected _confirmedState: T | undefined;
	/** Failure recorded via {@link setError}; cleared by the next snapshot. */
	private _error: Error | undefined;
	/** Relevant envelopes received before the first snapshot, in arrival order. */
	private _bufferedEnvelopes: ActionEnvelope[] | undefined;

	protected readonly _onDidChange = this._register(new Emitter<T>());
	readonly onDidChange: Event<T> = this._onDidChange.event;

	protected readonly _onWillApplyAction = this._register(new Emitter<ActionEnvelope>());
	readonly onWillApplyAction: Event<ActionEnvelope> = this._onWillApplyAction.event;

	protected readonly _onDidApplyAction = this._register(new Emitter<ActionEnvelope>());
	readonly onDidApplyAction: Event<ActionEnvelope> = this._onDidApplyAction.event;

	protected readonly _clientId: string;
	protected readonly _log: (msg: string) => void;

	/**
	 * @param clientId Identifier of this client; compared against an
	 *   envelope's `origin.clientId` to detect echoes of our own actions.
	 * @param log Logging callback handed to the reducers.
	 */
	constructor(clientId: string, log: (msg: string) => void) {
		super();
		this._clientId = clientId;
		this._log = log;
	}

	/**
	 * The recorded error if there is one; otherwise the optimistic state when
	 * the subclass provides one, falling back to the confirmed state.
	 */
	get value(): T | Error | undefined {
		if (this._error) {
			return this._error;
		}
		return this._getOptimisticState() ?? this._confirmedState;
	}

	/** The server-confirmed state, without optimistic actions. */
	get verifiedValue(): T | undefined {
		return this._confirmedState;
	}

	/**
	 * Apply a snapshot from the server.
	 *
	 * Stores `state` as the confirmed state, clears any recorded error, runs
	 * {@link _onSnapshotApplied} (which replays buffered envelopes) and then
	 * fires {@link onDidChange} with the resulting value.
	 *
	 * @param state The snapshot state.
	 * @param fromSeq Server sequence number the snapshot is authoritative up to.
	 */
	handleSnapshot(state: T, fromSeq: number): void {
		this._confirmedState = state;
		this._error = undefined;
		this._onSnapshotApplied(fromSeq);
		this._onDidChange.fire(this.value as T);
	}

	/**
	 * Mark this subscription as failed. Only records the error so that
	 * {@link value} returns it; no event is fired.
	 */
	setError(error: Error): void {
		this._error = error;
	}

	/**
	 * Process an incoming action envelope.
	 *
	 * Envelopes whose action is not relevant (see {@link _isRelevantAction})
	 * are ignored. Relevant envelopes that arrive before the first snapshot
	 * are buffered and replayed from {@link _onSnapshotApplied}; all others
	 * are applied immediately.
	 */
	receiveEnvelope(envelope: ActionEnvelope): void {
		if (!this._isRelevantAction(envelope.action)) {
			return;
		}

		// No confirmed state yet: there is nothing to reduce against, so hold
		// on to the envelope until the snapshot arrives.
		if (this._confirmedState === undefined) {
			if (!this._bufferedEnvelopes) {
				this._bufferedEnvelopes = [];
			}
			this._bufferedEnvelopes.push(envelope);
			return;
		}

		this._applyEnvelope(envelope);
	}

	/** Apply the reducer to confirmed state. Subclasses must implement. */
	protected abstract _applyReducer(state: T, action: StateAction): T;

	/** Whether the given action targets this subscription. */
	protected abstract _isRelevantAction(action: StateAction): boolean;

	/** Return optimistic state if write-ahead is active, otherwise `undefined`. */
	protected _getOptimisticState(): T | undefined {
		return undefined; // No write-ahead by default
	}

	/**
	 * Hook invoked by {@link handleSnapshot} after the confirmed state has
	 * been replaced. Replays envelopes that were buffered before the snapshot.
	 *
	 * Replayed envelopes go through the same path as live ones, so
	 * {@link onWillApplyAction} and {@link onDidApplyAction} fire for them
	 * too; listeners therefore observe every action that is applied.
	 *
	 * @param _fromSeq Server sequence number the snapshot is authoritative up to.
	 */
	protected _onSnapshotApplied(_fromSeq: number): void {
		const buffered = this._bufferedEnvelopes;
		if (!buffered) {
			return;
		}
		this._bufferedEnvelopes = undefined;
		for (const envelope of buffered) {
			// The snapshot already includes everything up to and including
			// `_fromSeq`; only newer envelopes must be replayed.
			if (envelope.serverSeq > _fromSeq) {
				this._applyEnvelope(envelope);
			}
		}
	}

	/**
	 * Default reconciliation: reduce the action into the confirmed state and
	 * fire {@link onDidChange}. Session subscriptions override this for
	 * write-ahead. Callers guarantee that a confirmed state exists.
	 */
	protected _reconcile(envelope: ActionEnvelope, _isOwnAction: boolean): void {
		this._confirmedState = this._applyReducer(this._confirmedState!, envelope.action);
		this._onDidChange.fire(this.value as T);
	}

	/**
	 * Apply one envelope: fire the will-apply event, reconcile, fire the
	 * did-apply event. Shared by live delivery and buffered replay.
	 */
	private _applyEnvelope(envelope: ActionEnvelope): void {
		const isOwnAction = envelope.origin?.clientId === this._clientId;
		this._onWillApplyAction.fire(envelope);
		this._reconcile(envelope, isOwnAction);
		this._onDidApplyAction.fire(envelope);
	}
}
179
180
// --- Root State Subscription -------------------------------------------------
181
182
/**
 * Subscription to the root state at `agenthost:/root`.
 *
 * The root is mutated by the server only, so the base class's default
 * (non write-ahead) reconciliation is used unchanged.
 */
export class RootStateSubscription extends BaseAgentSubscription<RootState> {

	/** Reduces a root action into the given root state. */
	protected override _applyReducer(state: RootState, action: StateAction): RootState {
		const rootAction = action as RootAction;
		return rootReducer(state, rootAction, this._log);
	}

	/** An action is relevant when its type lives in the `root/` namespace. */
	protected override _isRelevantAction(action: StateAction): boolean {
		const { type } = action;
		return type.startsWith('root/');
	}
}
196
197
// --- Session State Subscription ----------------------------------------------
198
199
/** A session action that was applied optimistically and is tracked until reconciled. */
interface IPendingAction {
	/** Client sequence number allocated for the action when it was applied. */
	readonly clientSeq: number;
	/** The optimistically applied session action. */
	readonly action: SessionAction;
}
203
204
/**
 * Subscription to a session at `copilot:/<uuid>`.
 *
 * Supports write-ahead reconciliation: actions dispatched by this client are
 * applied optimistically and kept in a pending list. When the server echoes
 * an action back, the pending entry is retired; the optimistic state is then
 * rebuilt by replaying the remaining pending actions on top of the confirmed
 * state.
 */
export class SessionStateSubscription extends BaseAgentSubscription<SessionState> {

	/** Optimistically applied actions, in dispatch order. */
	private readonly _pendingActions: IPendingAction[] = [];
	/** Confirmed state plus pending actions; `undefined` when nothing is pending. */
	private _optimisticState: SessionState | undefined;
	private readonly _sessionUri: string;
	private readonly _seqAllocator: () => number;

	/**
	 * @param sessionUri String form of the session URI; compared against an
	 *   action's `session` field to decide relevance.
	 * @param clientId Identifier of this client.
	 * @param seqAllocator Allocates the client sequence number for each
	 *   optimistic action.
	 * @param log Logging callback handed to the reducer.
	 */
	constructor(
		sessionUri: string,
		clientId: string,
		seqAllocator: () => number,
		log: (msg: string) => void,
	) {
		super(clientId, log);
		this._sessionUri = sessionUri;
		this._seqAllocator = seqAllocator;
	}

	/**
	 * Optimistically apply a session action.
	 *
	 * The action is always recorded as pending. If a base state exists
	 * (current optimistic state, else confirmed state) it is reduced
	 * immediately and {@link onDidChange} fires; otherwise it is picked up
	 * when the optimistic state is recomputed after the snapshot.
	 *
	 * @returns The clientSeq to send to the server so it can echo it back
	 *   for reconciliation.
	 */
	applyOptimistic(action: SessionAction): number {
		const clientSeq = this._seqAllocator();
		this._pendingActions.push({ clientSeq, action });
		// Apply on top of the current optimistic state, if any.
		const base = this._optimisticState ?? this.verifiedValue;
		if (base) {
			this._optimisticState = sessionReducer(base, action as IProtocolSessionAction, this._log);
			this._onDidChange.fire(this._optimisticState);
		}
		return clientSeq;
	}

	protected override _getOptimisticState(): SessionState | undefined {
		return this._optimisticState;
	}

	protected override _applyReducer(state: SessionState, action: StateAction): SessionState {
		return sessionReducer(state, action as IProtocolSessionAction, this._log);
	}

	/** Relevant when the action is a session action addressed to this session. */
	protected override _isRelevantAction(action: StateAction): boolean {
		return isSessionAction(action) && action.session === this._sessionUri;
	}

	protected override _onSnapshotApplied(fromSeq: number): void {
		// Replay buffered actions first
		super._onSnapshotApplied(fromSeq);
		// Re-apply pending actions on top of new confirmed state
		this._recomputeOptimistic();
	}

	/**
	 * Write-ahead reconciliation for one server envelope.
	 *
	 * - An echo of one of our own actions retires the matching pending entry,
	 *   whether it was accepted or rejected.
	 * - An envelope carrying a `rejectionReason` is never reduced into the
	 *   confirmed state. This also holds when no pending entry matches any
	 *   more (for example after {@link clearPending}), so a rejected action
	 *   cannot leak into the confirmed state.
	 * - Every other envelope is reduced into the confirmed state.
	 *
	 * Afterwards the optimistic state is rebuilt from the remaining pending
	 * actions.
	 */
	protected override _reconcile(envelope: ActionEnvelope, isOwnAction: boolean): void {
		if (isOwnAction && envelope.origin) {
			const echoedSeq = envelope.origin.clientSeq;
			const idx = this._pendingActions.findIndex(p => p.clientSeq === echoedSeq);
			if (idx !== -1) {
				this._pendingActions.splice(idx, 1);
			}
		}

		if (!envelope.rejectionReason) {
			this._confirmedApply(envelope.action);
		}

		this._recomputeOptimistic();
	}

	/** Reduce an action into the confirmed state, if one exists. */
	private _confirmedApply(action: StateAction): void {
		if (this._confirmedState) {
			this._confirmedState = this._applyReducer(this._confirmedState, action);
		}
	}

	/**
	 * Rebuild the optimistic state from the confirmed state plus all pending
	 * actions and fire {@link onDidChange}. Without a confirmed state the
	 * optimistic state is cleared and no event fires.
	 */
	private _recomputeOptimistic(): void {
		const confirmed = this._confirmedState;
		if (!confirmed) {
			this._optimisticState = undefined;
			return;
		}

		if (this._pendingActions.length === 0) {
			this._optimisticState = undefined; // No pending → value falls through to confirmed
			this._onDidChange.fire(confirmed);
			return;
		}

		let state = confirmed;
		for (const pending of this._pendingActions) {
			state = sessionReducer(state, pending.action as IProtocolSessionAction, this._log);
		}
		this._optimisticState = state;
		this._onDidChange.fire(state);
	}

	/**
	 * Clear pending actions for this session (e.g., on unsubscribe). Drops
	 * the optimistic state as well; no event is fired.
	 */
	clearPending(): void {
		this._pendingActions.length = 0;
		this._optimisticState = undefined;
	}
}
315
316
// --- Terminal State Subscription ---------------------------------------------
317
318
/**
 * Subscription to a terminal identified by an agent-host terminal URI.
 *
 * Terminal state is mutated by the server only, so there is no write-ahead
 * (terminal I/O is side-effect-only).
 */
export class TerminalStateSubscription extends BaseAgentSubscription<TerminalState> {

	private readonly _terminalUri: string;

	/**
	 * @param terminalUri String form of the terminal URI; compared against an
	 *   action's `terminal` field to decide relevance.
	 * @param clientId Identifier of this client.
	 * @param log Logging callback handed to the reducer.
	 */
	constructor(terminalUri: string, clientId: string, log: (msg: string) => void) {
		super(clientId, log);
		this._terminalUri = terminalUri;
	}

	/** Reduces a terminal action into the given terminal state. */
	protected override _applyReducer(state: TerminalState, action: StateAction): TerminalState {
		const terminalAction = action as TerminalAction;
		return terminalReducer(state, terminalAction, this._log);
	}

	/** Relevant when the action is in the `terminal/` namespace and targets this terminal. */
	protected override _isRelevantAction(action: StateAction): boolean {
		if (!action.type.startsWith('terminal/')) {
			return false;
		}
		const target = (action as { terminal: string }).terminal;
		return target === this._terminalUri;
	}
}
339
340
// --- Subscription Manager ----------------------------------------------------
341
342
/** Bookkeeping record for one active, refcounted subscription. */
interface ISubscriptionEntry {
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	readonly sub: BaseAgentSubscription<any>;
	refCount: number;
}

/**
 * Manages the lifecycle of resource subscriptions for an agent connection.
 *
 * Provides refcounted access via {@link getSubscription} — the subscription
 * is created on first acquire, subscribes to the server, and stays alive
 * until the last reference is disposed.
 *
 * The connection feeds action envelopes to all active subscriptions via
 * {@link receiveEnvelope}.
 */
export class AgentSubscriptionManager extends Disposable {

	private readonly _subscriptions = new ResourceMap<ISubscriptionEntry>();
	private readonly _rootState: RootStateSubscription;
	private readonly _clientId: string;
	private readonly _seqAllocator: () => number;
	private readonly _log: (msg: string) => void;
	private readonly _subscribe: (resource: URI) => Promise<IStateSnapshot>;
	private readonly _unsubscribe: (resource: URI) => void;

	/**
	 * @param clientId Identifier of this client, passed to every subscription.
	 * @param seqAllocator Allocates client sequence numbers for dispatched actions.
	 * @param log Logging callback passed to every subscription.
	 * @param subscribe Starts a server subscription and resolves with its initial snapshot.
	 * @param unsubscribe Notifies the server that a resource is no longer needed.
	 */
	constructor(
		clientId: string,
		seqAllocator: () => number,
		log: (msg: string) => void,
		subscribe: (resource: URI) => Promise<IStateSnapshot>,
		unsubscribe: (resource: URI) => void,
	) {
		super();
		this._clientId = clientId;
		this._seqAllocator = seqAllocator;
		this._log = log;
		this._subscribe = subscribe;
		this._unsubscribe = unsubscribe;
		this._rootState = this._register(new RootStateSubscription(clientId, log));
	}

	/** The always-live root state subscription. */
	get rootState(): IAgentSubscription<RootState> {
		return this._rootState;
	}

	/**
	 * Initialize the root state from a snapshot received during the
	 * connection handshake.
	 */
	handleRootSnapshot(state: RootState, fromSeq: number): void {
		this._rootState.handleSnapshot(state, fromSeq);
	}

	/**
	 * Returns an existing subscription without affecting its refcount.
	 * Returns `undefined` if no subscription is active for the given resource.
	 */
	getSubscriptionUnmanaged<T>(resource: URI): IAgentSubscription<T> | undefined {
		const entry = this._subscriptions.get(resource);
		return entry?.sub as unknown as IAgentSubscription<T> | undefined;
	}

	/**
	 * Get or create a refcounted subscription to any resource. Disposing
	 * the returned reference decrements the refcount; when it reaches zero
	 * the subscription is torn down and the server is notified.
	 *
	 * Each returned reference releases its share at most once, so disposing
	 * the same reference repeatedly is harmless.
	 *
	 * @param kind Which subscription implementation to create when none exists yet.
	 * @param resource The resource to subscribe to.
	 */
	getSubscription<T>(kind: StateComponents, resource: URI): IReference<IAgentSubscription<T>> {
		const existing = this._subscriptions.get(resource);
		if (existing) {
			existing.refCount++;
			return this._createReference<T>(resource, existing);
		}

		// Create new subscription based on caller-specified kind
		const sub = this._createSubscription(kind, resource.toString());
		const entry: ISubscriptionEntry = { sub, refCount: 1 };
		this._subscriptions.set(resource, entry);

		// Kick off server subscription asynchronously.
		// Capture the entry reference so we can validate it hasn't been
		// replaced by a new subscription for the same key (race guard).
		this._subscribe(resource).then(snapshot => {
			if (this._subscriptions.get(resource) === entry) {
				sub.handleSnapshot(snapshot.state as never, snapshot.fromSeq);
			}
		}).catch(err => {
			if (this._subscriptions.get(resource) === entry) {
				sub.setError(err instanceof Error ? err : new Error(String(err)));
			}
		});

		return this._createReference<T>(resource, entry);
	}

	/**
	 * Route an incoming action envelope to all active subscriptions. Each
	 * subscription filters for the actions relevant to it.
	 */
	receiveEnvelope(envelope: ActionEnvelope): void {
		// Root state gets all root actions
		this._rootState.receiveEnvelope(envelope);
		// Other subscriptions get filtered actions
		for (const { sub } of this._subscriptions.values()) {
			sub.receiveEnvelope(envelope);
		}
	}

	/**
	 * Dispatch a client action. Applies optimistically to the relevant
	 * subscription if applicable, then returns the clientSeq.
	 *
	 * Only session actions whose session currently has an active
	 * {@link SessionStateSubscription} are applied optimistically; for
	 * everything else a fresh sequence number is allocated and returned.
	 */
	dispatchOptimistic(action: SessionAction | TerminalAction | IRootConfigChangedAction): number {
		if (isSessionAction(action)) {
			const entry = this._subscriptions.get(URI.parse(action.session));
			if (entry && entry.sub instanceof SessionStateSubscription) {
				return entry.sub.applyOptimistic(action);
			}
		}
		return this._seqAllocator();
	}

	/**
	 * Wrap an entry in a reference whose `dispose` releases exactly one
	 * refcount, no matter how often it is called.
	 */
	private _createReference<T>(resource: URI, entry: ISubscriptionEntry): IReference<IAgentSubscription<T>> {
		let released = false;
		return {
			object: entry.sub,
			dispose: () => {
				if (released) {
					return;
				}
				released = true;
				this._releaseSubscription(resource, entry);
			},
		};
	}

	/**
	 * Instantiate the subscription for `kind`. Any kind other than
	 * `Session` yields a terminal subscription.
	 */
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	private _createSubscription(kind: StateComponents, key: string): BaseAgentSubscription<any> {
		switch (kind) {
			case StateComponents.Session:
				return new SessionStateSubscription(key, this._clientId, this._seqAllocator, this._log);
			case StateComponents.Terminal:
			default:
				return new TerminalStateSubscription(key, this._clientId, this._log);
		}
	}

	/**
	 * Release one reference to `owner`. When the count reaches zero the
	 * entry is removed, the server is notified (best-effort) and the
	 * subscription is disposed.
	 *
	 * Nothing happens if `owner` is no longer the registered entry for
	 * `resource` (for example after the manager was disposed or the entry
	 * was replaced), so a stale reference can never affect a newer
	 * subscription.
	 */
	private _releaseSubscription(resource: URI, owner: ISubscriptionEntry): void {
		const entry = this._subscriptions.get(resource);
		if (!entry || entry !== owner) {
			return;
		}
		entry.refCount--;
		if (entry.refCount <= 0) {
			this._subscriptions.delete(resource);
			try { this._unsubscribe(resource); } catch { /* best-effort */ }
			if (entry.sub instanceof SessionStateSubscription) {
				entry.sub.clearPending();
			}
			entry.sub.dispose();
		}
	}

	/**
	 * Unsubscribe (best-effort) and dispose every active subscription, then
	 * dispose the manager itself (including the root subscription).
	 */
	override dispose(): void {
		for (const [resource, entry] of this._subscriptions) {
			try { this._unsubscribe(resource); } catch { /* best-effort */ }
			entry.sub.dispose();
		}
		this._subscriptions.clear();
		super.dispose();
	}
}
504
505
// --- Observable Adapter ------------------------------------------------------
506
507
/**
 * Exposes an {@link IAgentSubscription} as an {@link IObservable} of its
 * value, re-read whenever the subscription's `onDidChange` fires.
 *
 * Both the pre-snapshot phase and the failed state are reported as
 * `undefined`; callers that need the actual error should read
 * {@link IAgentSubscription.value} directly.
 *
 * @param owner Owner passed through to `observableFromEvent`.
 * @param sub The subscription to adapt.
 */
export function observableFromSubscription<T>(owner: object | undefined, sub: IAgentSubscription<T>): IObservable<T | undefined> {
	const readValue = (): T | undefined => {
		const current = sub.value;
		if (current instanceof Error) {
			return undefined;
		}
		return current;
	};
	return observableFromEvent(owner, sub.onDidChange, readValue);
}
519
520