Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/parts/ipc/common/ipc.ts
4780 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { getRandomElement } from '../../../common/arrays.js';
7
import { CancelablePromise, createCancelablePromise, timeout } from '../../../common/async.js';
8
import { VSBuffer } from '../../../common/buffer.js';
9
import { CancellationToken, CancellationTokenSource } from '../../../common/cancellation.js';
10
import { memoize } from '../../../common/decorators.js';
11
import { CancellationError, ErrorNoTelemetry } from '../../../common/errors.js';
12
import { Emitter, Event, EventMultiplexer, Relay } from '../../../common/event.js';
13
import { createSingleCallFunction } from '../../../common/functional.js';
14
import { DisposableStore, dispose, IDisposable, toDisposable } from '../../../common/lifecycle.js';
15
import { revive } from '../../../common/marshalling.js';
16
import * as strings from '../../../common/strings.js';
17
import { isFunction, isUndefinedOrNull } from '../../../common/types.js';
18
19
/**
20
* An `IChannel` is an abstraction over a collection of commands.
21
* You can `call` several commands on a channel, each taking at
22
* most one single argument. A `call` always returns a promise
23
* with at most one single return value.
24
*/
25
export interface IChannel {
26
call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
27
listen<T>(event: string, arg?: any): Event<T>;
28
}
29
30
/**
31
* An `IServerChannel` is the counter part to `IChannel`,
32
* on the server-side. You should implement this interface
33
* if you'd like to handle remote promises or events.
34
*/
35
export interface IServerChannel<TContext = string> {
36
call<T>(ctx: TContext, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
37
listen<T>(ctx: TContext, event: string, arg?: any): Event<T>;
38
}
39
40
const enum RequestType {
41
Promise = 100,
42
PromiseCancel = 101,
43
EventListen = 102,
44
EventDispose = 103
45
}
46
47
function requestTypeToStr(type: RequestType): string {
48
switch (type) {
49
case RequestType.Promise:
50
return 'req';
51
case RequestType.PromiseCancel:
52
return 'cancel';
53
case RequestType.EventListen:
54
return 'subscribe';
55
case RequestType.EventDispose:
56
return 'unsubscribe';
57
}
58
}
59
60
type IRawPromiseRequest = { type: RequestType.Promise; id: number; channelName: string; name: string; arg: any };
61
type IRawPromiseCancelRequest = { type: RequestType.PromiseCancel; id: number };
62
type IRawEventListenRequest = { type: RequestType.EventListen; id: number; channelName: string; name: string; arg: any };
63
type IRawEventDisposeRequest = { type: RequestType.EventDispose; id: number };
64
type IRawRequest = IRawPromiseRequest | IRawPromiseCancelRequest | IRawEventListenRequest | IRawEventDisposeRequest;
65
66
const enum ResponseType {
67
Initialize = 200,
68
PromiseSuccess = 201,
69
PromiseError = 202,
70
PromiseErrorObj = 203,
71
EventFire = 204
72
}
73
74
function responseTypeToStr(type: ResponseType): string {
75
switch (type) {
76
case ResponseType.Initialize:
77
return `init`;
78
case ResponseType.PromiseSuccess:
79
return `reply:`;
80
case ResponseType.PromiseError:
81
case ResponseType.PromiseErrorObj:
82
return `replyErr:`;
83
case ResponseType.EventFire:
84
return `event:`;
85
}
86
}
87
88
type IRawInitializeResponse = { type: ResponseType.Initialize };
89
type IRawPromiseSuccessResponse = { type: ResponseType.PromiseSuccess; id: number; data: any };
90
type IRawPromiseErrorResponse = { type: ResponseType.PromiseError; id: number; data: { message: string; name: string; stack: string[] | undefined } };
91
type IRawPromiseErrorObjResponse = { type: ResponseType.PromiseErrorObj; id: number; data: any };
92
type IRawEventFireResponse = { type: ResponseType.EventFire; id: number; data: any };
93
type IRawResponse = IRawInitializeResponse | IRawPromiseSuccessResponse | IRawPromiseErrorResponse | IRawPromiseErrorObjResponse | IRawEventFireResponse;
94
95
interface IHandler {
96
(response: IRawResponse): void;
97
}
98
99
export interface IMessagePassingProtocol {
100
send(buffer: VSBuffer): void;
101
readonly onMessage: Event<VSBuffer>;
102
/**
103
* Wait for the write buffer (if applicable) to become empty.
104
*/
105
drain?(): Promise<void>;
106
}
107
108
enum State {
109
Uninitialized,
110
Idle
111
}
112
113
/**
114
* An `IChannelServer` hosts a collection of channels. You are
115
* able to register channels onto it, provided a channel name.
116
*/
117
export interface IChannelServer<TContext = string> {
118
registerChannel(channelName: string, channel: IServerChannel<TContext>): void;
119
}
120
121
/**
122
* An `IChannelClient` has access to a collection of channels. You
123
* are able to get those channels, given their channel name.
124
*/
125
export interface IChannelClient {
126
getChannel<T extends IChannel>(channelName: string): T;
127
}
128
129
export interface Client<TContext> {
130
readonly ctx: TContext;
131
}
132
133
export interface IConnectionHub<TContext> {
134
readonly connections: Connection<TContext>[];
135
readonly onDidAddConnection: Event<Connection<TContext>>;
136
readonly onDidRemoveConnection: Event<Connection<TContext>>;
137
}
138
139
/**
140
* An `IClientRouter` is responsible for routing calls to specific
141
* channels, in scenarios in which there are multiple possible
142
* channels (each from a separate client) to pick from.
143
*/
144
export interface IClientRouter<TContext = string> {
145
routeCall(hub: IConnectionHub<TContext>, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<Client<TContext>>;
146
routeEvent(hub: IConnectionHub<TContext>, event: string, arg?: any): Promise<Client<TContext>>;
147
}
148
149
/**
150
* Similar to the `IChannelClient`, you can get channels from this
151
* collection of channels. The difference being that in the
152
* `IRoutingChannelClient`, there are multiple clients providing
153
* the same channel. You'll need to pass in an `IClientRouter` in
154
* order to pick the right one.
155
*/
156
export interface IRoutingChannelClient<TContext = string> {
157
getChannel<T extends IChannel>(channelName: string, router?: IClientRouter<TContext>): T;
158
}
159
160
interface IReader {
161
read(bytes: number): VSBuffer;
162
}
163
164
interface IWriter {
165
write(buffer: VSBuffer): void;
166
}
167
168
169
/**
170
* @see https://en.wikipedia.org/wiki/Variable-length_quantity
171
*/
172
function readIntVQL(reader: IReader) {
173
let value = 0;
174
for (let n = 0; ; n += 7) {
175
const next = reader.read(1);
176
value |= (next.buffer[0] & 0b01111111) << n;
177
if (!(next.buffer[0] & 0b10000000)) {
178
return value;
179
}
180
}
181
}
182
183
const vqlZero = createOneByteBuffer(0);
184
185
/**
186
* @see https://en.wikipedia.org/wiki/Variable-length_quantity
187
*/
188
function writeInt32VQL(writer: IWriter, value: number) {
189
if (value === 0) {
190
writer.write(vqlZero);
191
return;
192
}
193
194
let len = 0;
195
for (let v2 = value; v2 !== 0; v2 = v2 >>> 7) {
196
len++;
197
}
198
199
const scratch = VSBuffer.alloc(len);
200
for (let i = 0; value !== 0; i++) {
201
scratch.buffer[i] = value & 0b01111111;
202
value = value >>> 7;
203
if (value > 0) {
204
scratch.buffer[i] |= 0b10000000;
205
}
206
}
207
208
writer.write(scratch);
209
}
210
211
export class BufferReader implements IReader {
212
213
private pos = 0;
214
215
constructor(private buffer: VSBuffer) { }
216
217
read(bytes: number): VSBuffer {
218
const result = this.buffer.slice(this.pos, this.pos + bytes);
219
this.pos += result.byteLength;
220
return result;
221
}
222
}
223
224
export class BufferWriter implements IWriter {
225
226
private buffers: VSBuffer[] = [];
227
228
get buffer(): VSBuffer {
229
return VSBuffer.concat(this.buffers);
230
}
231
232
write(buffer: VSBuffer): void {
233
this.buffers.push(buffer);
234
}
235
}
236
237
enum DataType {
238
Undefined = 0,
239
String = 1,
240
Buffer = 2,
241
VSBuffer = 3,
242
Array = 4,
243
Object = 5,
244
Int = 6
245
}
246
247
function createOneByteBuffer(value: number): VSBuffer {
248
const result = VSBuffer.alloc(1);
249
result.writeUInt8(value, 0);
250
return result;
251
}
252
253
const BufferPresets = {
254
Undefined: createOneByteBuffer(DataType.Undefined),
255
String: createOneByteBuffer(DataType.String),
256
Buffer: createOneByteBuffer(DataType.Buffer),
257
VSBuffer: createOneByteBuffer(DataType.VSBuffer),
258
Array: createOneByteBuffer(DataType.Array),
259
Object: createOneByteBuffer(DataType.Object),
260
Uint: createOneByteBuffer(DataType.Int),
261
};
262
263
export function serialize(writer: IWriter, data: any): void {
264
if (typeof data === 'undefined') {
265
writer.write(BufferPresets.Undefined);
266
} else if (typeof data === 'string') {
267
const buffer = VSBuffer.fromString(data);
268
writer.write(BufferPresets.String);
269
writeInt32VQL(writer, buffer.byteLength);
270
writer.write(buffer);
271
} else if (VSBuffer.isNativeBuffer(data)) {
272
const buffer = VSBuffer.wrap(data);
273
writer.write(BufferPresets.Buffer);
274
writeInt32VQL(writer, buffer.byteLength);
275
writer.write(buffer);
276
} else if (data instanceof VSBuffer) {
277
writer.write(BufferPresets.VSBuffer);
278
writeInt32VQL(writer, data.byteLength);
279
writer.write(data);
280
} else if (Array.isArray(data)) {
281
writer.write(BufferPresets.Array);
282
writeInt32VQL(writer, data.length);
283
284
for (const el of data) {
285
serialize(writer, el);
286
}
287
} else if (typeof data === 'number' && (data | 0) === data) {
288
// write a vql if it's a number that we can do bitwise operations on
289
writer.write(BufferPresets.Uint);
290
writeInt32VQL(writer, data);
291
} else {
292
const buffer = VSBuffer.fromString(JSON.stringify(data));
293
writer.write(BufferPresets.Object);
294
writeInt32VQL(writer, buffer.byteLength);
295
writer.write(buffer);
296
}
297
}
298
299
export function deserialize(reader: IReader): any {
300
const type = reader.read(1).readUInt8(0);
301
302
switch (type) {
303
case DataType.Undefined: return undefined;
304
case DataType.String: return reader.read(readIntVQL(reader)).toString();
305
case DataType.Buffer: return reader.read(readIntVQL(reader)).buffer;
306
case DataType.VSBuffer: return reader.read(readIntVQL(reader));
307
case DataType.Array: {
308
const length = readIntVQL(reader);
309
const result: any[] = [];
310
311
for (let i = 0; i < length; i++) {
312
result.push(deserialize(reader));
313
}
314
315
return result;
316
}
317
case DataType.Object: return JSON.parse(reader.read(readIntVQL(reader)).toString());
318
case DataType.Int: return readIntVQL(reader);
319
}
320
}
321
322
interface PendingRequest {
323
request: IRawPromiseRequest | IRawEventListenRequest;
324
timeoutTimer: Timeout;
325
}
326
327
export class ChannelServer<TContext = string> implements IChannelServer<TContext>, IDisposable {
328
329
private channels = new Map<string, IServerChannel<TContext>>();
330
private activeRequests = new Map<number, IDisposable>();
331
private protocolListener: IDisposable | null;
332
333
// Requests might come in for channels which are not yet registered.
334
// They will timeout after `timeoutDelay`.
335
private pendingRequests = new Map<string, PendingRequest[]>();
336
337
constructor(private protocol: IMessagePassingProtocol, private ctx: TContext, private logger: IIPCLogger | null = null, private timeoutDelay = 1000) {
338
this.protocolListener = this.protocol.onMessage(msg => this.onRawMessage(msg));
339
this.sendResponse({ type: ResponseType.Initialize });
340
}
341
342
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
343
this.channels.set(channelName, channel);
344
345
// https://github.com/microsoft/vscode/issues/72531
346
setTimeout(() => this.flushPendingRequests(channelName), 0);
347
}
348
349
private sendResponse(response: IRawResponse): void {
350
switch (response.type) {
351
case ResponseType.Initialize: {
352
const msgLength = this.send([response.type]);
353
this.logger?.logOutgoing(msgLength, 0, RequestInitiator.OtherSide, responseTypeToStr(response.type));
354
return;
355
}
356
357
case ResponseType.PromiseSuccess:
358
case ResponseType.PromiseError:
359
case ResponseType.EventFire:
360
case ResponseType.PromiseErrorObj: {
361
const msgLength = this.send([response.type, response.id], response.data);
362
this.logger?.logOutgoing(msgLength, response.id, RequestInitiator.OtherSide, responseTypeToStr(response.type), response.data);
363
return;
364
}
365
}
366
}
367
368
private send(header: unknown, body: any = undefined): number {
369
const writer = new BufferWriter();
370
serialize(writer, header);
371
serialize(writer, body);
372
return this.sendBuffer(writer.buffer);
373
}
374
375
private sendBuffer(message: VSBuffer): number {
376
try {
377
this.protocol.send(message);
378
return message.byteLength;
379
} catch (err) {
380
// noop
381
return 0;
382
}
383
}
384
385
private onRawMessage(message: VSBuffer): void {
386
const reader = new BufferReader(message);
387
const header = deserialize(reader);
388
const body = deserialize(reader);
389
const type = header[0] as RequestType;
390
391
switch (type) {
392
case RequestType.Promise:
393
this.logger?.logIncoming(message.byteLength, header[1], RequestInitiator.OtherSide, `${requestTypeToStr(type)}: ${header[2]}.${header[3]}`, body);
394
return this.onPromise({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
395
case RequestType.EventListen:
396
this.logger?.logIncoming(message.byteLength, header[1], RequestInitiator.OtherSide, `${requestTypeToStr(type)}: ${header[2]}.${header[3]}`, body);
397
return this.onEventListen({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
398
case RequestType.PromiseCancel:
399
this.logger?.logIncoming(message.byteLength, header[1], RequestInitiator.OtherSide, `${requestTypeToStr(type)}`);
400
return this.disposeActiveRequest({ type, id: header[1] });
401
case RequestType.EventDispose:
402
this.logger?.logIncoming(message.byteLength, header[1], RequestInitiator.OtherSide, `${requestTypeToStr(type)}`);
403
return this.disposeActiveRequest({ type, id: header[1] });
404
}
405
}
406
407
private onPromise(request: IRawPromiseRequest): void {
408
const channel = this.channels.get(request.channelName);
409
410
if (!channel) {
411
this.collectPendingRequest(request);
412
return;
413
}
414
415
const cancellationTokenSource = new CancellationTokenSource();
416
let promise: Promise<any>;
417
418
try {
419
promise = channel.call(this.ctx, request.name, request.arg, cancellationTokenSource.token);
420
} catch (err) {
421
promise = Promise.reject(err);
422
}
423
424
const id = request.id;
425
426
promise.then(data => {
427
this.sendResponse({ id, data, type: ResponseType.PromiseSuccess });
428
}, err => {
429
if (err instanceof Error) {
430
this.sendResponse({
431
id, data: {
432
message: err.message,
433
name: err.name,
434
stack: err.stack ? err.stack.split('\n') : undefined
435
}, type: ResponseType.PromiseError
436
});
437
} else {
438
this.sendResponse({ id, data: err, type: ResponseType.PromiseErrorObj });
439
}
440
}).finally(() => {
441
disposable.dispose();
442
this.activeRequests.delete(request.id);
443
});
444
445
const disposable = toDisposable(() => cancellationTokenSource.cancel());
446
this.activeRequests.set(request.id, disposable);
447
}
448
449
private onEventListen(request: IRawEventListenRequest): void {
450
const channel = this.channels.get(request.channelName);
451
452
if (!channel) {
453
this.collectPendingRequest(request);
454
return;
455
}
456
457
const id = request.id;
458
const event = channel.listen(this.ctx, request.name, request.arg);
459
const disposable = event(data => this.sendResponse({ id, data, type: ResponseType.EventFire }));
460
461
this.activeRequests.set(request.id, disposable);
462
}
463
464
private disposeActiveRequest(request: IRawRequest): void {
465
const disposable = this.activeRequests.get(request.id);
466
467
if (disposable) {
468
disposable.dispose();
469
this.activeRequests.delete(request.id);
470
}
471
}
472
473
private collectPendingRequest(request: IRawPromiseRequest | IRawEventListenRequest): void {
474
let pendingRequests = this.pendingRequests.get(request.channelName);
475
476
if (!pendingRequests) {
477
pendingRequests = [];
478
this.pendingRequests.set(request.channelName, pendingRequests);
479
}
480
481
const timer = setTimeout(() => {
482
console.error(`Unknown channel: ${request.channelName}`);
483
484
if (request.type === RequestType.Promise) {
485
this.sendResponse({
486
id: request.id,
487
data: { name: 'Unknown channel', message: `Channel name '${request.channelName}' timed out after ${this.timeoutDelay}ms`, stack: undefined },
488
type: ResponseType.PromiseError
489
});
490
}
491
}, this.timeoutDelay);
492
493
pendingRequests.push({ request, timeoutTimer: timer });
494
}
495
496
private flushPendingRequests(channelName: string): void {
497
const requests = this.pendingRequests.get(channelName);
498
499
if (requests) {
500
for (const request of requests) {
501
clearTimeout(request.timeoutTimer);
502
503
switch (request.request.type) {
504
case RequestType.Promise: this.onPromise(request.request); break;
505
case RequestType.EventListen: this.onEventListen(request.request); break;
506
}
507
}
508
509
this.pendingRequests.delete(channelName);
510
}
511
}
512
513
public dispose(): void {
514
if (this.protocolListener) {
515
this.protocolListener.dispose();
516
this.protocolListener = null;
517
}
518
dispose(this.activeRequests.values());
519
this.activeRequests.clear();
520
}
521
}
522
523
export const enum RequestInitiator {
524
LocalSide = 0,
525
OtherSide = 1
526
}
527
528
export interface IIPCLogger {
529
logIncoming(msgLength: number, requestId: number, initiator: RequestInitiator, str: string, data?: any): void;
530
logOutgoing(msgLength: number, requestId: number, initiator: RequestInitiator, str: string, data?: any): void;
531
}
532
533
export class ChannelClient implements IChannelClient, IDisposable {
534
535
private isDisposed = false;
536
private state: State = State.Uninitialized;
537
private activeRequests = new Set<IDisposable>();
538
private handlers = new Map<number, IHandler>();
539
private lastRequestId = 0;
540
private protocolListener: IDisposable | null;
541
private logger: IIPCLogger | null;
542
543
private readonly _onDidInitialize = new Emitter<void>();
544
readonly onDidInitialize = this._onDidInitialize.event;
545
546
constructor(private protocol: IMessagePassingProtocol, logger: IIPCLogger | null = null) {
547
this.protocolListener = this.protocol.onMessage(msg => this.onBuffer(msg));
548
this.logger = logger;
549
}
550
551
getChannel<T extends IChannel>(channelName: string): T {
552
const that = this;
553
554
// eslint-disable-next-line local/code-no-dangerous-type-assertions
555
return {
556
call(command: string, arg?: any, cancellationToken?: CancellationToken) {
557
if (that.isDisposed) {
558
return Promise.reject(new CancellationError());
559
}
560
return that.requestPromise(channelName, command, arg, cancellationToken);
561
},
562
listen(event: string, arg: any) {
563
if (that.isDisposed) {
564
return Event.None;
565
}
566
return that.requestEvent(channelName, event, arg);
567
}
568
} as T;
569
}
570
571
private requestPromise(channelName: string, name: string, arg?: any, cancellationToken = CancellationToken.None): Promise<unknown> {
572
const id = this.lastRequestId++;
573
const type = RequestType.Promise;
574
const request: IRawRequest = { id, type, channelName, name, arg };
575
576
if (cancellationToken.isCancellationRequested) {
577
return Promise.reject(new CancellationError());
578
}
579
580
let disposable: IDisposable;
581
let disposableWithRequestCancel: IDisposable;
582
583
const result = new Promise((c, e) => {
584
if (cancellationToken.isCancellationRequested) {
585
return e(new CancellationError());
586
}
587
588
const doRequest = () => {
589
const handler: IHandler = response => {
590
switch (response.type) {
591
case ResponseType.PromiseSuccess:
592
this.handlers.delete(id);
593
c(response.data);
594
break;
595
596
case ResponseType.PromiseError: {
597
this.handlers.delete(id);
598
const error = new Error(response.data.message);
599
error.stack = Array.isArray(response.data.stack) ? response.data.stack.join('\n') : response.data.stack;
600
error.name = response.data.name;
601
e(error);
602
break;
603
}
604
case ResponseType.PromiseErrorObj:
605
this.handlers.delete(id);
606
e(response.data);
607
break;
608
}
609
};
610
611
this.handlers.set(id, handler);
612
this.sendRequest(request);
613
};
614
615
let uninitializedPromise: CancelablePromise<void> | null = null;
616
if (this.state === State.Idle) {
617
doRequest();
618
} else {
619
uninitializedPromise = createCancelablePromise(_ => this.whenInitialized());
620
uninitializedPromise.then(() => {
621
uninitializedPromise = null;
622
doRequest();
623
});
624
}
625
626
const cancel = () => {
627
if (uninitializedPromise) {
628
uninitializedPromise.cancel();
629
uninitializedPromise = null;
630
} else {
631
this.sendRequest({ id, type: RequestType.PromiseCancel });
632
}
633
634
e(new CancellationError());
635
};
636
637
disposable = cancellationToken.onCancellationRequested(cancel);
638
disposableWithRequestCancel = {
639
dispose: createSingleCallFunction(() => {
640
cancel();
641
disposable.dispose();
642
})
643
};
644
645
this.activeRequests.add(disposableWithRequestCancel);
646
});
647
648
return result.finally(() => {
649
disposable?.dispose(); // Seen as undefined in tests.
650
this.activeRequests.delete(disposableWithRequestCancel);
651
});
652
}
653
654
private requestEvent(channelName: string, name: string, arg?: any): Event<any> {
655
const id = this.lastRequestId++;
656
const type = RequestType.EventListen;
657
const request: IRawRequest = { id, type, channelName, name, arg };
658
659
let uninitializedPromise: CancelablePromise<void> | null = null;
660
661
const emitter = new Emitter<any>({
662
onWillAddFirstListener: () => {
663
const doRequest = () => {
664
this.activeRequests.add(emitter);
665
this.sendRequest(request);
666
};
667
if (this.state === State.Idle) {
668
doRequest();
669
} else {
670
uninitializedPromise = createCancelablePromise(_ => this.whenInitialized());
671
uninitializedPromise.then(() => {
672
uninitializedPromise = null;
673
doRequest();
674
});
675
}
676
},
677
onDidRemoveLastListener: () => {
678
if (uninitializedPromise) {
679
uninitializedPromise.cancel();
680
uninitializedPromise = null;
681
} else {
682
this.activeRequests.delete(emitter);
683
this.sendRequest({ id, type: RequestType.EventDispose });
684
}
685
this.handlers.delete(id);
686
}
687
});
688
689
const handler: IHandler = (res: IRawResponse) => emitter.fire((res as IRawEventFireResponse).data);
690
this.handlers.set(id, handler);
691
692
return emitter.event;
693
}
694
695
private sendRequest(request: IRawRequest): void {
696
switch (request.type) {
697
case RequestType.Promise:
698
case RequestType.EventListen: {
699
const msgLength = this.send([request.type, request.id, request.channelName, request.name], request.arg);
700
this.logger?.logOutgoing(msgLength, request.id, RequestInitiator.LocalSide, `${requestTypeToStr(request.type)}: ${request.channelName}.${request.name}`, request.arg);
701
return;
702
}
703
704
case RequestType.PromiseCancel:
705
case RequestType.EventDispose: {
706
const msgLength = this.send([request.type, request.id]);
707
this.logger?.logOutgoing(msgLength, request.id, RequestInitiator.LocalSide, requestTypeToStr(request.type));
708
return;
709
}
710
}
711
}
712
713
private send(header: unknown, body: any = undefined): number {
714
const writer = new BufferWriter();
715
serialize(writer, header);
716
serialize(writer, body);
717
return this.sendBuffer(writer.buffer);
718
}
719
720
private sendBuffer(message: VSBuffer): number {
721
try {
722
this.protocol.send(message);
723
return message.byteLength;
724
} catch (err) {
725
// noop
726
return 0;
727
}
728
}
729
730
private onBuffer(message: VSBuffer): void {
731
const reader = new BufferReader(message);
732
const header = deserialize(reader);
733
const body = deserialize(reader);
734
const type: ResponseType = header[0];
735
736
switch (type) {
737
case ResponseType.Initialize:
738
this.logger?.logIncoming(message.byteLength, 0, RequestInitiator.LocalSide, responseTypeToStr(type));
739
return this.onResponse({ type: header[0] });
740
741
case ResponseType.PromiseSuccess:
742
case ResponseType.PromiseError:
743
case ResponseType.EventFire:
744
case ResponseType.PromiseErrorObj:
745
this.logger?.logIncoming(message.byteLength, header[1], RequestInitiator.LocalSide, responseTypeToStr(type), body);
746
return this.onResponse({ type: header[0], id: header[1], data: body });
747
}
748
}
749
750
private onResponse(response: IRawResponse): void {
751
if (response.type === ResponseType.Initialize) {
752
this.state = State.Idle;
753
this._onDidInitialize.fire();
754
return;
755
}
756
757
const handler = this.handlers.get(response.id);
758
759
handler?.(response);
760
}
761
762
@memoize
763
get onDidInitializePromise(): Promise<void> {
764
return Event.toPromise(this.onDidInitialize);
765
}
766
767
private whenInitialized(): Promise<void> {
768
if (this.state === State.Idle) {
769
return Promise.resolve();
770
} else {
771
return this.onDidInitializePromise;
772
}
773
}
774
775
dispose(): void {
776
this.isDisposed = true;
777
if (this.protocolListener) {
778
this.protocolListener.dispose();
779
this.protocolListener = null;
780
}
781
dispose(this.activeRequests.values());
782
this.activeRequests.clear();
783
}
784
}
785
786
export interface ClientConnectionEvent {
787
protocol: IMessagePassingProtocol;
788
readonly onDidClientDisconnect: Event<void>;
789
}
790
791
interface Connection<TContext> extends Client<TContext> {
792
readonly channelServer: ChannelServer<TContext>;
793
readonly channelClient: ChannelClient;
794
}
795
796
/**
797
* An `IPCServer` is both a channel server and a routing channel
798
* client.
799
*
800
* As the owner of a protocol, you should extend both this
801
* and the `IPCClient` classes to get IPC implementations
802
* for your protocol.
803
*/
804
export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable {
805
806
private channels = new Map<string, IServerChannel<TContext>>();
807
private _connections = new Set<Connection<TContext>>();
808
809
private readonly _onDidAddConnection = new Emitter<Connection<TContext>>();
810
readonly onDidAddConnection: Event<Connection<TContext>> = this._onDidAddConnection.event;
811
812
private readonly _onDidRemoveConnection = new Emitter<Connection<TContext>>();
813
readonly onDidRemoveConnection: Event<Connection<TContext>> = this._onDidRemoveConnection.event;
814
815
private readonly disposables = new DisposableStore();
816
817
get connections(): Connection<TContext>[] {
818
const result: Connection<TContext>[] = [];
819
this._connections.forEach(ctx => result.push(ctx));
820
return result;
821
}
822
823
constructor(onDidClientConnect: Event<ClientConnectionEvent>, ipcLogger?: IIPCLogger | null, timeoutDelay?: number) {
824
this.disposables.add(onDidClientConnect(({ protocol, onDidClientDisconnect }) => {
825
const onFirstMessage = Event.once(protocol.onMessage);
826
827
this.disposables.add(onFirstMessage(msg => {
828
const reader = new BufferReader(msg);
829
const ctx = deserialize(reader) as TContext;
830
831
const channelServer = new ChannelServer(protocol, ctx, ipcLogger, timeoutDelay);
832
const channelClient = new ChannelClient(protocol, ipcLogger);
833
834
this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel));
835
836
const connection: Connection<TContext> = { channelServer, channelClient, ctx };
837
this._connections.add(connection);
838
this._onDidAddConnection.fire(connection);
839
840
this.disposables.add(onDidClientDisconnect(() => {
841
channelServer.dispose();
842
channelClient.dispose();
843
this._connections.delete(connection);
844
this._onDidRemoveConnection.fire(connection);
845
}));
846
}));
847
}));
848
}
849
850
/**
851
* Get a channel from a remote client. When passed a router,
852
* one can specify which client it wants to call and listen to/from.
853
* Otherwise, when calling without a router, a random client will
854
* be selected and when listening without a router, every client
855
* will be listened to.
856
*/
857
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
858
getChannel<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean): T;
859
getChannel<T extends IChannel>(channelName: string, routerOrClientFilter: IClientRouter<TContext> | ((client: Client<TContext>) => boolean)): T {
860
const that = this;
861
862
// eslint-disable-next-line local/code-no-dangerous-type-assertions
863
return {
864
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
865
let connectionPromise: Promise<Client<TContext>>;
866
867
if (isFunction(routerOrClientFilter)) {
868
// when no router is provided, we go random client picking
869
const connection = getRandomElement(that.connections.filter(routerOrClientFilter));
870
871
connectionPromise = connection
872
// if we found a client, let's call on it
873
? Promise.resolve(connection)
874
// else, let's wait for a client to come along
875
: Event.toPromise(Event.filter(that.onDidAddConnection, routerOrClientFilter));
876
} else {
877
connectionPromise = routerOrClientFilter.routeCall(that, command, arg);
878
}
879
880
const channelPromise = connectionPromise
881
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
882
883
return getDelayedChannel(channelPromise)
884
.call(command, arg, cancellationToken);
885
},
886
listen(event: string, arg: any): Event<T> {
887
if (isFunction(routerOrClientFilter)) {
888
return that.getMulticastEvent(channelName, routerOrClientFilter, event, arg);
889
}
890
891
const channelPromise = routerOrClientFilter.routeEvent(that, event, arg)
892
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
893
894
return getDelayedChannel(channelPromise)
895
.listen(event, arg);
896
}
897
} as T;
898
}
899
900
private getMulticastEvent<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean, eventName: string, arg: any): Event<T> {
901
const that = this;
902
let disposables: DisposableStore | undefined;
903
904
// Create an emitter which hooks up to all clients
905
// as soon as first listener is added. It also
906
// disconnects from all clients as soon as the last listener
907
// is removed.
908
const emitter = new Emitter<T>({
909
onWillAddFirstListener: () => {
910
disposables = new DisposableStore();
911
912
// The event multiplexer is useful since the active
913
// client list is dynamic. We need to hook up and disconnection
914
// to/from clients as they come and go.
915
const eventMultiplexer = new EventMultiplexer<T>();
916
const map = new Map<Connection<TContext>, IDisposable>();
917
918
const onDidAddConnection = (connection: Connection<TContext>) => {
919
const channel = connection.channelClient.getChannel(channelName);
920
const event = channel.listen<T>(eventName, arg);
921
const disposable = eventMultiplexer.add(event);
922
923
map.set(connection, disposable);
924
};
925
926
const onDidRemoveConnection = (connection: Connection<TContext>) => {
927
const disposable = map.get(connection);
928
929
if (!disposable) {
930
return;
931
}
932
933
disposable.dispose();
934
map.delete(connection);
935
};
936
937
that.connections.filter(clientFilter).forEach(onDidAddConnection);
938
Event.filter(that.onDidAddConnection, clientFilter)(onDidAddConnection, undefined, disposables);
939
that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables);
940
eventMultiplexer.event(emitter.fire, emitter, disposables);
941
942
disposables.add(eventMultiplexer);
943
},
944
onDidRemoveLastListener: () => {
945
disposables?.dispose();
946
disposables = undefined;
947
}
948
});
949
that.disposables.add(emitter);
950
951
return emitter.event;
952
}
953
954
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
955
this.channels.set(channelName, channel);
956
957
for (const connection of this._connections) {
958
connection.channelServer.registerChannel(channelName, channel);
959
}
960
}
961
962
dispose(): void {
963
this.disposables.dispose();
964
965
for (const connection of this._connections) {
966
connection.channelClient.dispose();
967
connection.channelServer.dispose();
968
}
969
970
this._connections.clear();
971
this.channels.clear();
972
this._onDidAddConnection.dispose();
973
this._onDidRemoveConnection.dispose();
974
}
975
}
976
977
/**
978
* An `IPCClient` is both a channel client and a channel server.
979
*
980
* As the owner of a protocol, you should extend both this
981
* and the `IPCServer` classes to get IPC implementations
982
* for your protocol.
983
*/
984
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable {
985
986
private channelClient: ChannelClient;
987
private channelServer: ChannelServer<TContext>;
988
989
constructor(protocol: IMessagePassingProtocol, ctx: TContext, ipcLogger: IIPCLogger | null = null) {
990
const writer = new BufferWriter();
991
serialize(writer, ctx);
992
protocol.send(writer.buffer);
993
994
this.channelClient = new ChannelClient(protocol, ipcLogger);
995
this.channelServer = new ChannelServer(protocol, ctx, ipcLogger);
996
}
997
998
getChannel<T extends IChannel>(channelName: string): T {
999
return this.channelClient.getChannel(channelName);
1000
}
1001
1002
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
1003
this.channelServer.registerChannel(channelName, channel);
1004
}
1005
1006
dispose(): void {
1007
this.channelClient.dispose();
1008
this.channelServer.dispose();
1009
}
1010
}
1011
1012
export function getDelayedChannel<T extends IChannel>(promise: Promise<T>): T {
1013
// eslint-disable-next-line local/code-no-dangerous-type-assertions
1014
return {
1015
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
1016
return promise.then(c => c.call<T>(command, arg, cancellationToken));
1017
},
1018
1019
listen<T>(event: string, arg?: any): Event<T> {
1020
const relay = new Relay<any>();
1021
promise.then(c => relay.input = c.listen(event, arg));
1022
return relay.event;
1023
}
1024
} as T;
1025
}
1026
1027
export function getNextTickChannel<T extends IChannel>(channel: T): T {
1028
let didTick = false;
1029
1030
// eslint-disable-next-line local/code-no-dangerous-type-assertions
1031
return {
1032
call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
1033
if (didTick) {
1034
return channel.call(command, arg, cancellationToken);
1035
}
1036
1037
return timeout(0)
1038
.then(() => didTick = true)
1039
.then(() => channel.call<T>(command, arg, cancellationToken));
1040
},
1041
listen<T>(event: string, arg?: any): Event<T> {
1042
if (didTick) {
1043
return channel.listen<T>(event, arg);
1044
}
1045
1046
const relay = new Relay<T>();
1047
1048
timeout(0)
1049
.then(() => didTick = true)
1050
.then(() => relay.input = channel.listen<T>(event, arg));
1051
1052
return relay.event;
1053
}
1054
} as T;
1055
}
1056
1057
export class StaticRouter<TContext = string> implements IClientRouter<TContext> {
1058
1059
constructor(private fn: (ctx: TContext) => boolean | Promise<boolean>) { }
1060
1061
routeCall(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
1062
return this.route(hub);
1063
}
1064
1065
routeEvent(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
1066
return this.route(hub);
1067
}
1068
1069
private async route(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
1070
for (const connection of hub.connections) {
1071
if (await Promise.resolve(this.fn(connection.ctx))) {
1072
return Promise.resolve(connection);
1073
}
1074
}
1075
1076
await Event.toPromise(hub.onDidAddConnection);
1077
return await this.route(hub);
1078
}
1079
}
1080
1081
/**
1082
* Use ProxyChannels to automatically wrapping and unwrapping
1083
* services to/from IPC channels, instead of manually wrapping
1084
* each service method and event.
1085
*
1086
* Restrictions:
1087
* - If marshalling is enabled, only `URI` and `RegExp` is converted
1088
* automatically for you
1089
* - Events must follow the naming convention `onUpperCase`
1090
* - `CancellationToken` is currently not supported
1091
* - If a context is provided, you can use `AddFirstParameterToFunctions`
1092
* utility to signal this in the receiving side type
1093
*/
1094
export namespace ProxyChannel {
1095
1096
export interface IProxyOptions {
1097
1098
/**
1099
* Disables automatic marshalling of `URI`.
1100
* If marshalling is disabled, `UriComponents`
1101
* must be used instead.
1102
*/
1103
disableMarshalling?: boolean;
1104
}
1105
1106
export interface ICreateServiceChannelOptions extends IProxyOptions { }
1107
1108
export function fromService<TContext>(service: unknown, disposables: DisposableStore, options?: ICreateServiceChannelOptions): IServerChannel<TContext> {
1109
const handler = service as { [key: string]: unknown };
1110
const disableMarshalling = options?.disableMarshalling;
1111
1112
// Buffer any event that should be supported by
1113
// iterating over all property keys and finding them
1114
// However, this will not work for services that
1115
// are lazy and use a Proxy within. For that we
1116
// still need to check later (see below).
1117
const mapEventNameToEvent = new Map<string, Event<unknown>>();
1118
for (const key in handler) {
1119
if (propertyIsEvent(key)) {
1120
mapEventNameToEvent.set(key, Event.buffer(handler[key] as Event<unknown>, true, undefined, disposables));
1121
}
1122
}
1123
1124
return new class implements IServerChannel {
1125
1126
listen<T>(_: unknown, event: string, arg: any): Event<T> {
1127
const eventImpl = mapEventNameToEvent.get(event);
1128
if (eventImpl) {
1129
return eventImpl as Event<T>;
1130
}
1131
1132
const target = handler[event];
1133
if (typeof target === 'function') {
1134
if (propertyIsDynamicEvent(event)) {
1135
return target.call(handler, arg);
1136
}
1137
1138
if (propertyIsEvent(event)) {
1139
mapEventNameToEvent.set(event, Event.buffer(handler[event] as Event<unknown>, true, undefined, disposables));
1140
1141
return mapEventNameToEvent.get(event) as Event<T>;
1142
}
1143
}
1144
1145
throw new ErrorNoTelemetry(`Event not found: ${event}`);
1146
}
1147
1148
call(_: unknown, command: string, args?: any[]): Promise<any> {
1149
const target = handler[command];
1150
if (typeof target === 'function') {
1151
1152
// Revive unless marshalling disabled
1153
if (!disableMarshalling && Array.isArray(args)) {
1154
for (let i = 0; i < args.length; i++) {
1155
args[i] = revive(args[i]);
1156
}
1157
}
1158
1159
let res = target.apply(handler, args);
1160
if (!(res instanceof Promise)) {
1161
res = Promise.resolve(res);
1162
}
1163
return res;
1164
}
1165
1166
throw new ErrorNoTelemetry(`Method not found: ${command}`);
1167
}
1168
};
1169
}
1170
1171
export interface ICreateProxyServiceOptions extends IProxyOptions {
1172
1173
/**
1174
* If provided, will add the value of `context`
1175
* to each method call to the target.
1176
*/
1177
context?: unknown;
1178
1179
/**
1180
* If provided, will not proxy any of the properties
1181
* that are part of the Map but rather return that value.
1182
*/
1183
properties?: Map<string, unknown>;
1184
}
1185
1186
export function toService<T extends object>(channel: IChannel, options?: ICreateProxyServiceOptions): T {
1187
const disableMarshalling = options?.disableMarshalling;
1188
1189
return new Proxy({}, {
1190
get(_target: T, propKey: PropertyKey) {
1191
if (typeof propKey === 'string') {
1192
1193
// Check for predefined values
1194
if (options?.properties?.has(propKey)) {
1195
return options.properties.get(propKey);
1196
}
1197
1198
// Dynamic Event
1199
if (propertyIsDynamicEvent(propKey)) {
1200
return function (arg: unknown) {
1201
return channel.listen(propKey, arg);
1202
};
1203
}
1204
1205
// Event
1206
if (propertyIsEvent(propKey)) {
1207
return channel.listen(propKey);
1208
}
1209
1210
// Function
1211
return async function (...args: any[]) {
1212
1213
// Add context if any
1214
let methodArgs: any[];
1215
if (options && !isUndefinedOrNull(options.context)) {
1216
methodArgs = [options.context, ...args];
1217
} else {
1218
methodArgs = args;
1219
}
1220
1221
const result = await channel.call(propKey, methodArgs);
1222
1223
// Revive unless marshalling disabled
1224
if (!disableMarshalling) {
1225
return revive(result);
1226
}
1227
1228
return result;
1229
};
1230
}
1231
1232
throw new ErrorNoTelemetry(`Property not found: ${String(propKey)}`);
1233
}
1234
}) as T;
1235
}
1236
1237
function propertyIsEvent(name: string): boolean {
1238
// Assume a property is an event if it has a form of "onSomething"
1239
return name[0] === 'o' && name[1] === 'n' && strings.isUpperAsciiLetter(name.charCodeAt(2));
1240
}
1241
1242
function propertyIsDynamicEvent(name: string): boolean {
1243
// Assume a property is a dynamic event (a method that returns an event) if it has a form of "onDynamicSomething"
1244
return /^onDynamic/.test(name) && strings.isUpperAsciiLetter(name.charCodeAt(9));
1245
}
1246
}
1247
1248
const colorTables = [
1249
['#2977B1', '#FC802D', '#34A13A', '#D3282F', '#9366BA'],
1250
['#8B564C', '#E177C0', '#7F7F7F', '#BBBE3D', '#2EBECD']
1251
];
1252
1253
function prettyWithoutArrays(data: unknown): any {
1254
if (Array.isArray(data)) {
1255
return data;
1256
}
1257
if (data && typeof data === 'object' && typeof data.toString === 'function') {
1258
const result = data.toString();
1259
if (result !== '[object Object]') {
1260
return result;
1261
}
1262
}
1263
return data;
1264
}
1265
1266
function pretty(data: unknown): any {
1267
if (Array.isArray(data)) {
1268
return data.map(prettyWithoutArrays);
1269
}
1270
return prettyWithoutArrays(data);
1271
}
1272
1273
function logWithColors(direction: string, totalLength: number, msgLength: number, req: number, initiator: RequestInitiator, str: string, data: any): void {
1274
data = pretty(data);
1275
1276
const colorTable = colorTables[initiator];
1277
const color = colorTable[req % colorTable.length];
1278
let args = [`%c[${direction}]%c[${String(totalLength).padStart(7, ' ')}]%c[len: ${String(msgLength).padStart(5, ' ')}]%c${String(req).padStart(5, ' ')} - ${str}`, 'color: darkgreen', 'color: grey', 'color: grey', `color: ${color}`];
1279
if (/\($/.test(str)) {
1280
args = args.concat(data);
1281
args.push(')');
1282
} else {
1283
args.push(data);
1284
}
1285
console.log.apply(console, args as [string, ...string[]]);
1286
}
1287
1288
export class IPCLogger implements IIPCLogger {
1289
private _totalIncoming = 0;
1290
private _totalOutgoing = 0;
1291
1292
constructor(
1293
private readonly _outgoingPrefix: string,
1294
private readonly _incomingPrefix: string,
1295
) { }
1296
1297
public logOutgoing(msgLength: number, requestId: number, initiator: RequestInitiator, str: string, data?: any): void {
1298
this._totalOutgoing += msgLength;
1299
logWithColors(this._outgoingPrefix, this._totalOutgoing, msgLength, requestId, initiator, str, data);
1300
}
1301
1302
public logIncoming(msgLength: number, requestId: number, initiator: RequestInitiator, str: string, data?: any): void {
1303
this._totalIncoming += msgLength;
1304
logWithColors(this._incomingPrefix, this._totalIncoming, msgLength, requestId, initiator, str, data);
1305
}
1306
}
1307
1308