Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/parts/ipc/common/ipc.net.ts
4780 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { VSBuffer } from '../../../common/buffer.js';
7
import { Emitter, Event } from '../../../common/event.js';
8
import { Disposable, DisposableStore, IDisposable } from '../../../common/lifecycle.js';
9
import { IIPCLogger, IMessagePassingProtocol, IPCClient } from './ipc.js';
10
11
export const enum SocketDiagnosticsEventType {
12
Created = 'created',
13
Read = 'read',
14
Write = 'write',
15
Open = 'open',
16
Error = 'error',
17
Close = 'close',
18
19
BrowserWebSocketBlobReceived = 'browserWebSocketBlobReceived',
20
21
NodeEndReceived = 'nodeEndReceived',
22
NodeEndSent = 'nodeEndSent',
23
NodeDrainBegin = 'nodeDrainBegin',
24
NodeDrainEnd = 'nodeDrainEnd',
25
26
zlibInflateError = 'zlibInflateError',
27
zlibInflateData = 'zlibInflateData',
28
zlibInflateInitialWrite = 'zlibInflateInitialWrite',
29
zlibInflateInitialFlushFired = 'zlibInflateInitialFlushFired',
30
zlibInflateWrite = 'zlibInflateWrite',
31
zlibInflateFlushFired = 'zlibInflateFlushFired',
32
zlibDeflateError = 'zlibDeflateError',
33
zlibDeflateData = 'zlibDeflateData',
34
zlibDeflateWrite = 'zlibDeflateWrite',
35
zlibDeflateFlushFired = 'zlibDeflateFlushFired',
36
37
WebSocketNodeSocketWrite = 'webSocketNodeSocketWrite',
38
WebSocketNodeSocketPeekedHeader = 'webSocketNodeSocketPeekedHeader',
39
WebSocketNodeSocketReadHeader = 'webSocketNodeSocketReadHeader',
40
WebSocketNodeSocketReadData = 'webSocketNodeSocketReadData',
41
WebSocketNodeSocketUnmaskedData = 'webSocketNodeSocketUnmaskedData',
42
WebSocketNodeSocketDrainBegin = 'webSocketNodeSocketDrainBegin',
43
WebSocketNodeSocketDrainEnd = 'webSocketNodeSocketDrainEnd',
44
45
ProtocolHeaderRead = 'protocolHeaderRead',
46
ProtocolMessageRead = 'protocolMessageRead',
47
ProtocolHeaderWrite = 'protocolHeaderWrite',
48
ProtocolMessageWrite = 'protocolMessageWrite',
49
ProtocolWrite = 'protocolWrite',
50
}
51
52
export namespace SocketDiagnostics {
53
54
export const enableDiagnostics = false;
55
56
export interface IRecord {
57
timestamp: number;
58
id: string;
59
label: string;
60
type: SocketDiagnosticsEventType;
61
buff?: VSBuffer;
62
data?: any;
63
}
64
65
export const records: IRecord[] = [];
66
const socketIds = new WeakMap<any, string>();
67
let lastUsedSocketId = 0;
68
69
function getSocketId(nativeObject: unknown, label: string): string {
70
if (!socketIds.has(nativeObject)) {
71
const id = String(++lastUsedSocketId);
72
socketIds.set(nativeObject, id);
73
}
74
return socketIds.get(nativeObject)!;
75
}
76
77
export function traceSocketEvent(nativeObject: unknown, socketDebugLabel: string, type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
78
if (!enableDiagnostics) {
79
return;
80
}
81
const id = getSocketId(nativeObject, socketDebugLabel);
82
83
if (data instanceof VSBuffer || data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
84
const copiedData = VSBuffer.alloc(data.byteLength);
85
copiedData.set(data);
86
records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, buff: copiedData });
87
} else {
88
// data is a custom object
89
records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, data: data });
90
}
91
}
92
}
93
94
export const enum SocketCloseEventType {
95
NodeSocketCloseEvent = 0,
96
WebSocketCloseEvent = 1
97
}
98
99
export interface NodeSocketCloseEvent {
100
/**
101
* The type of the event
102
*/
103
readonly type: SocketCloseEventType.NodeSocketCloseEvent;
104
/**
105
* `true` if the socket had a transmission error.
106
*/
107
readonly hadError: boolean;
108
/**
109
* Underlying error.
110
*/
111
readonly error: Error | undefined;
112
}
113
114
export interface WebSocketCloseEvent {
115
/**
116
* The type of the event
117
*/
118
readonly type: SocketCloseEventType.WebSocketCloseEvent;
119
/**
120
* Returns the WebSocket connection close code provided by the server.
121
*/
122
readonly code: number;
123
/**
124
* Returns the WebSocket connection close reason provided by the server.
125
*/
126
readonly reason: string;
127
/**
128
* Returns true if the connection closed cleanly; false otherwise.
129
*/
130
readonly wasClean: boolean;
131
/**
132
* Underlying event.
133
*/
134
readonly event: any | undefined;
135
}
136
137
export type SocketCloseEvent = NodeSocketCloseEvent | WebSocketCloseEvent | undefined;
138
139
export interface SocketTimeoutEvent {
140
readonly unacknowledgedMsgCount: number;
141
readonly timeSinceOldestUnacknowledgedMsg: number;
142
readonly timeSinceLastReceivedSomeData: number;
143
}
144
145
export interface ISocket extends IDisposable {
146
onData(listener: (e: VSBuffer) => void): IDisposable;
147
onClose(listener: (e: SocketCloseEvent) => void): IDisposable;
148
onEnd(listener: () => void): IDisposable;
149
write(buffer: VSBuffer): void;
150
end(): void;
151
drain(): Promise<void>;
152
153
traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
154
}
155
156
let emptyBuffer: VSBuffer | null = null;
157
function getEmptyBuffer(): VSBuffer {
158
if (!emptyBuffer) {
159
emptyBuffer = VSBuffer.alloc(0);
160
}
161
return emptyBuffer;
162
}
163
164
export class ChunkStream {
165
166
private _chunks: VSBuffer[];
167
private _totalLength: number;
168
169
public get byteLength() {
170
return this._totalLength;
171
}
172
173
constructor() {
174
this._chunks = [];
175
this._totalLength = 0;
176
}
177
178
public acceptChunk(buff: VSBuffer) {
179
this._chunks.push(buff);
180
this._totalLength += buff.byteLength;
181
}
182
183
public read(byteCount: number): VSBuffer {
184
return this._read(byteCount, true);
185
}
186
187
public peek(byteCount: number): VSBuffer {
188
return this._read(byteCount, false);
189
}
190
191
private _read(byteCount: number, advance: boolean): VSBuffer {
192
193
if (byteCount === 0) {
194
return getEmptyBuffer();
195
}
196
197
if (byteCount > this._totalLength) {
198
throw new Error(`Cannot read so many bytes!`);
199
}
200
201
if (this._chunks[0].byteLength === byteCount) {
202
// super fast path, precisely first chunk must be returned
203
const result = this._chunks[0];
204
if (advance) {
205
this._chunks.shift();
206
this._totalLength -= byteCount;
207
}
208
return result;
209
}
210
211
if (this._chunks[0].byteLength > byteCount) {
212
// fast path, the reading is entirely within the first chunk
213
const result = this._chunks[0].slice(0, byteCount);
214
if (advance) {
215
this._chunks[0] = this._chunks[0].slice(byteCount);
216
this._totalLength -= byteCount;
217
}
218
return result;
219
}
220
221
const result = VSBuffer.alloc(byteCount);
222
let resultOffset = 0;
223
let chunkIndex = 0;
224
while (byteCount > 0) {
225
const chunk = this._chunks[chunkIndex];
226
if (chunk.byteLength > byteCount) {
227
// this chunk will survive
228
const chunkPart = chunk.slice(0, byteCount);
229
result.set(chunkPart, resultOffset);
230
resultOffset += byteCount;
231
232
if (advance) {
233
this._chunks[chunkIndex] = chunk.slice(byteCount);
234
this._totalLength -= byteCount;
235
}
236
237
byteCount -= byteCount;
238
} else {
239
// this chunk will be entirely read
240
result.set(chunk, resultOffset);
241
resultOffset += chunk.byteLength;
242
243
if (advance) {
244
this._chunks.shift();
245
this._totalLength -= chunk.byteLength;
246
} else {
247
chunkIndex++;
248
}
249
250
byteCount -= chunk.byteLength;
251
}
252
}
253
return result;
254
}
255
}
256
257
const enum ProtocolMessageType {
258
None = 0,
259
Regular = 1,
260
Control = 2,
261
Ack = 3,
262
Disconnect = 5,
263
ReplayRequest = 6,
264
Pause = 7,
265
Resume = 8,
266
KeepAlive = 9
267
}
268
269
function protocolMessageTypeToString(messageType: ProtocolMessageType) {
270
switch (messageType) {
271
case ProtocolMessageType.None: return 'None';
272
case ProtocolMessageType.Regular: return 'Regular';
273
case ProtocolMessageType.Control: return 'Control';
274
case ProtocolMessageType.Ack: return 'Ack';
275
case ProtocolMessageType.Disconnect: return 'Disconnect';
276
case ProtocolMessageType.ReplayRequest: return 'ReplayRequest';
277
case ProtocolMessageType.Pause: return 'PauseWriting';
278
case ProtocolMessageType.Resume: return 'ResumeWriting';
279
case ProtocolMessageType.KeepAlive: return 'KeepAlive';
280
}
281
}
282
283
export const enum ProtocolConstants {
284
HeaderLength = 13,
285
/**
286
* Send an Acknowledge message at most 2 seconds later...
287
*/
288
AcknowledgeTime = 2000, // 2 seconds
289
/**
290
* If there is a sent message that has been unacknowledged for 20 seconds,
291
* and we didn't see any incoming server data in the past 20 seconds,
292
* then consider the connection has timed out.
293
*/
294
TimeoutTime = 20000, // 20 seconds
295
/**
296
* If there is no reconnection within this time-frame, consider the connection permanently closed...
297
*/
298
ReconnectionGraceTime = 3 * 60 * 60 * 1000, // 3hrs
299
/**
300
* Maximal grace time between the first and the last reconnection...
301
*/
302
ReconnectionShortGraceTime = 5 * 60 * 1000, // 5min
303
/**
304
* Send a message every 5 seconds to avoid that the connection is closed by the OS.
305
*/
306
KeepAliveSendTime = 5000, // 5 seconds
307
}
308
309
class ProtocolMessage {
310
311
public writtenTime: number;
312
313
constructor(
314
public readonly type: ProtocolMessageType,
315
public readonly id: number,
316
public readonly ack: number,
317
public readonly data: VSBuffer
318
) {
319
this.writtenTime = 0;
320
}
321
322
public get size(): number {
323
return this.data.byteLength;
324
}
325
}
326
327
class ProtocolReader extends Disposable {
328
329
private readonly _socket: ISocket;
330
private _isDisposed: boolean;
331
private readonly _incomingData: ChunkStream;
332
public lastReadTime: number;
333
334
private readonly _onMessage = this._register(new Emitter<ProtocolMessage>());
335
public readonly onMessage: Event<ProtocolMessage> = this._onMessage.event;
336
337
private readonly _state = {
338
readHead: true,
339
readLen: ProtocolConstants.HeaderLength,
340
messageType: ProtocolMessageType.None,
341
id: 0,
342
ack: 0
343
};
344
345
constructor(socket: ISocket) {
346
super();
347
this._socket = socket;
348
this._isDisposed = false;
349
this._incomingData = new ChunkStream();
350
this._register(this._socket.onData(data => this.acceptChunk(data)));
351
this.lastReadTime = Date.now();
352
}
353
354
public acceptChunk(data: VSBuffer | null): void {
355
if (!data || data.byteLength === 0) {
356
return;
357
}
358
359
this.lastReadTime = Date.now();
360
361
this._incomingData.acceptChunk(data);
362
363
while (this._incomingData.byteLength >= this._state.readLen) {
364
365
const buff = this._incomingData.read(this._state.readLen);
366
367
if (this._state.readHead) {
368
// buff is the header
369
370
// save new state => next time will read the body
371
this._state.readHead = false;
372
this._state.readLen = buff.readUInt32BE(9);
373
this._state.messageType = buff.readUInt8(0);
374
this._state.id = buff.readUInt32BE(1);
375
this._state.ack = buff.readUInt32BE(5);
376
377
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderRead, { messageType: protocolMessageTypeToString(this._state.messageType), id: this._state.id, ack: this._state.ack, messageSize: this._state.readLen });
378
379
} else {
380
// buff is the body
381
const messageType = this._state.messageType;
382
const id = this._state.id;
383
const ack = this._state.ack;
384
385
// save new state => next time will read the header
386
this._state.readHead = true;
387
this._state.readLen = ProtocolConstants.HeaderLength;
388
this._state.messageType = ProtocolMessageType.None;
389
this._state.id = 0;
390
this._state.ack = 0;
391
392
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageRead, buff);
393
394
this._onMessage.fire(new ProtocolMessage(messageType, id, ack, buff));
395
396
if (this._isDisposed) {
397
// check if an event listener lead to our disposal
398
break;
399
}
400
}
401
}
402
}
403
404
public readEntireBuffer(): VSBuffer {
405
return this._incomingData.read(this._incomingData.byteLength);
406
}
407
408
public override dispose(): void {
409
this._isDisposed = true;
410
super.dispose();
411
}
412
}
413
414
class ProtocolWriter {
415
416
private _isDisposed: boolean;
417
private _isPaused: boolean;
418
private readonly _socket: ISocket;
419
private _data: VSBuffer[];
420
private _totalLength: number;
421
public lastWriteTime: number;
422
423
constructor(socket: ISocket) {
424
this._isDisposed = false;
425
this._isPaused = false;
426
this._socket = socket;
427
this._data = [];
428
this._totalLength = 0;
429
this.lastWriteTime = 0;
430
}
431
432
public dispose(): void {
433
try {
434
this.flush();
435
} catch (err) {
436
// ignore error, since the socket could be already closed
437
}
438
this._isDisposed = true;
439
}
440
441
public drain(): Promise<void> {
442
this.flush();
443
return this._socket.drain();
444
}
445
446
public flush(): void {
447
// flush
448
this._writeNow();
449
}
450
451
public pause(): void {
452
this._isPaused = true;
453
}
454
455
public resume(): void {
456
this._isPaused = false;
457
this._scheduleWriting();
458
}
459
460
public write(msg: ProtocolMessage) {
461
if (this._isDisposed) {
462
// ignore: there could be left-over promises which complete and then
463
// decide to write a response, etc...
464
return;
465
}
466
msg.writtenTime = Date.now();
467
this.lastWriteTime = Date.now();
468
const header = VSBuffer.alloc(ProtocolConstants.HeaderLength);
469
header.writeUInt8(msg.type, 0);
470
header.writeUInt32BE(msg.id, 1);
471
header.writeUInt32BE(msg.ack, 5);
472
header.writeUInt32BE(msg.data.byteLength, 9);
473
474
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderWrite, { messageType: protocolMessageTypeToString(msg.type), id: msg.id, ack: msg.ack, messageSize: msg.data.byteLength });
475
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageWrite, msg.data);
476
477
this._writeSoon(header, msg.data);
478
}
479
480
private _bufferAdd(head: VSBuffer, body: VSBuffer): boolean {
481
const wasEmpty = this._totalLength === 0;
482
this._data.push(head, body);
483
this._totalLength += head.byteLength + body.byteLength;
484
return wasEmpty;
485
}
486
487
private _bufferTake(): VSBuffer {
488
const ret = VSBuffer.concat(this._data, this._totalLength);
489
this._data.length = 0;
490
this._totalLength = 0;
491
return ret;
492
}
493
494
private _writeSoon(header: VSBuffer, data: VSBuffer): void {
495
if (this._bufferAdd(header, data)) {
496
this._scheduleWriting();
497
}
498
}
499
500
private _writeNowTimeout: Timeout | null = null;
501
private _scheduleWriting(): void {
502
if (this._writeNowTimeout) {
503
return;
504
}
505
this._writeNowTimeout = setTimeout(() => {
506
this._writeNowTimeout = null;
507
this._writeNow();
508
});
509
}
510
511
private _writeNow(): void {
512
if (this._totalLength === 0) {
513
return;
514
}
515
if (this._isPaused) {
516
return;
517
}
518
const data = this._bufferTake();
519
this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolWrite, { byteLength: data.byteLength });
520
this._socket.write(data);
521
}
522
}
523
524
/**
525
* A message has the following format:
526
* ```
527
* /-------------------------------|------\
528
* | HEADER | |
529
* |-------------------------------| DATA |
530
* | TYPE | ID | ACK | DATA_LENGTH | |
531
* \-------------------------------|------/
532
* ```
533
* The header is 9 bytes and consists of:
534
* - TYPE is 1 byte (ProtocolMessageType) - the message type
535
* - ID is 4 bytes (u32be) - the message id (can be 0 to indicate to be ignored)
536
* - ACK is 4 bytes (u32be) - the acknowledged message id (can be 0 to indicate to be ignored)
537
* - DATA_LENGTH is 4 bytes (u32be) - the length in bytes of DATA
538
*
539
* Only Regular messages are counted, other messages are not counted, nor acknowledged.
540
*/
541
export class Protocol extends Disposable implements IMessagePassingProtocol {
542
543
private _socket: ISocket;
544
private _socketWriter: ProtocolWriter;
545
private _socketReader: ProtocolReader;
546
547
private readonly _onMessage = new Emitter<VSBuffer>();
548
readonly onMessage: Event<VSBuffer> = this._onMessage.event;
549
550
private readonly _onDidDispose = new Emitter<void>();
551
readonly onDidDispose: Event<void> = this._onDidDispose.event;
552
553
constructor(socket: ISocket) {
554
super();
555
this._socket = socket;
556
this._socketWriter = this._register(new ProtocolWriter(this._socket));
557
this._socketReader = this._register(new ProtocolReader(this._socket));
558
559
this._register(this._socketReader.onMessage((msg) => {
560
if (msg.type === ProtocolMessageType.Regular) {
561
this._onMessage.fire(msg.data);
562
}
563
}));
564
565
this._register(this._socket.onClose(() => this._onDidDispose.fire()));
566
}
567
568
drain(): Promise<void> {
569
return this._socketWriter.drain();
570
}
571
572
getSocket(): ISocket {
573
return this._socket;
574
}
575
576
sendDisconnect(): void {
577
// Nothing to do...
578
}
579
580
send(buffer: VSBuffer): void {
581
this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.Regular, 0, 0, buffer));
582
}
583
}
584
585
export class Client<TContext = string> extends IPCClient<TContext> {
586
587
static fromSocket<TContext = string>(socket: ISocket, id: TContext): Client<TContext> {
588
return new Client(new Protocol(socket), id);
589
}
590
591
get onDidDispose(): Event<void> { return this.protocol.onDidDispose; }
592
593
constructor(private protocol: Protocol | PersistentProtocol, id: TContext, ipcLogger: IIPCLogger | null = null) {
594
super(protocol, id, ipcLogger);
595
}
596
597
override dispose(): void {
598
super.dispose();
599
const socket = this.protocol.getSocket();
600
// should be sent gracefully with a .flush(), but try to send it out as a
601
// last resort here if nothing else:
602
this.protocol.sendDisconnect();
603
this.protocol.dispose();
604
socket.end();
605
}
606
}
607
608
/**
609
* Will ensure no messages are lost if there are no event listeners.
610
*/
611
export class BufferedEmitter<T> {
612
private _emitter: Emitter<T>;
613
public readonly event: Event<T>;
614
615
private _hasListeners = false;
616
private _isDeliveringMessages = false;
617
private _bufferedMessages: T[] = [];
618
619
constructor() {
620
this._emitter = new Emitter<T>({
621
onWillAddFirstListener: () => {
622
this._hasListeners = true;
623
// it is important to deliver these messages after this call, but before
624
// other messages have a chance to be received (to guarantee in order delivery)
625
// that's why we're using here queueMicrotask and not other types of timeouts
626
queueMicrotask(() => this._deliverMessages());
627
},
628
onDidRemoveLastListener: () => {
629
this._hasListeners = false;
630
}
631
});
632
633
this.event = this._emitter.event;
634
}
635
636
private _deliverMessages(): void {
637
if (this._isDeliveringMessages) {
638
return;
639
}
640
this._isDeliveringMessages = true;
641
while (this._hasListeners && this._bufferedMessages.length > 0) {
642
this._emitter.fire(this._bufferedMessages.shift()!);
643
}
644
this._isDeliveringMessages = false;
645
}
646
647
public fire(event: T): void {
648
if (this._hasListeners) {
649
if (this._bufferedMessages.length > 0) {
650
this._bufferedMessages.push(event);
651
} else {
652
this._emitter.fire(event);
653
}
654
} else {
655
this._bufferedMessages.push(event);
656
}
657
}
658
659
public flushBuffer(): void {
660
this._bufferedMessages = [];
661
}
662
}
663
664
class QueueElement<T> {
665
public readonly data: T;
666
public next: QueueElement<T> | null;
667
668
constructor(data: T) {
669
this.data = data;
670
this.next = null;
671
}
672
}
673
674
class Queue<T> {
675
676
private _first: QueueElement<T> | null;
677
private _last: QueueElement<T> | null;
678
679
constructor() {
680
this._first = null;
681
this._last = null;
682
}
683
684
public length(): number {
685
let result = 0;
686
let current = this._first;
687
while (current) {
688
current = current.next;
689
result++;
690
}
691
return result;
692
}
693
694
public peek(): T | null {
695
if (!this._first) {
696
return null;
697
}
698
return this._first.data;
699
}
700
701
public toArray(): T[] {
702
const result: T[] = [];
703
let resultLen = 0;
704
let it = this._first;
705
while (it) {
706
result[resultLen++] = it.data;
707
it = it.next;
708
}
709
return result;
710
}
711
712
public pop(): void {
713
if (!this._first) {
714
return;
715
}
716
if (this._first === this._last) {
717
this._first = null;
718
this._last = null;
719
return;
720
}
721
this._first = this._first.next;
722
}
723
724
public push(item: T): void {
725
const element = new QueueElement(item);
726
if (!this._first) {
727
this._first = element;
728
this._last = element;
729
return;
730
}
731
this._last!.next = element;
732
this._last = element;
733
}
734
}
735
736
class LoadEstimator {
737
738
private static _HISTORY_LENGTH = 10;
739
private static _INSTANCE: LoadEstimator | null = null;
740
public static getInstance(): LoadEstimator {
741
if (!LoadEstimator._INSTANCE) {
742
LoadEstimator._INSTANCE = new LoadEstimator();
743
}
744
return LoadEstimator._INSTANCE;
745
}
746
747
private lastRuns: number[];
748
749
constructor() {
750
this.lastRuns = [];
751
const now = Date.now();
752
for (let i = 0; i < LoadEstimator._HISTORY_LENGTH; i++) {
753
this.lastRuns[i] = now - 1000 * i;
754
}
755
setInterval(() => {
756
for (let i = LoadEstimator._HISTORY_LENGTH; i >= 1; i--) {
757
this.lastRuns[i] = this.lastRuns[i - 1];
758
}
759
this.lastRuns[0] = Date.now();
760
}, 1000);
761
}
762
763
/**
764
* returns an estimative number, from 0 (low load) to 1 (high load)
765
*/
766
private load(): number {
767
const now = Date.now();
768
const historyLimit = (1 + LoadEstimator._HISTORY_LENGTH) * 1000;
769
let score = 0;
770
for (let i = 0; i < LoadEstimator._HISTORY_LENGTH; i++) {
771
if (now - this.lastRuns[i] <= historyLimit) {
772
score++;
773
}
774
}
775
return 1 - score / LoadEstimator._HISTORY_LENGTH;
776
}
777
778
public hasHighLoad(): boolean {
779
return this.load() >= 0.5;
780
}
781
}
782
783
export interface ILoadEstimator {
784
hasHighLoad(): boolean;
785
}
786
787
export interface PersistentProtocolOptions {
788
/**
789
* The socket to use.
790
*/
791
socket: ISocket;
792
/**
793
* The initial chunk of data that has already been received from the socket.
794
*/
795
initialChunk?: VSBuffer | null;
796
/**
797
* The CPU load estimator to use.
798
*/
799
loadEstimator?: ILoadEstimator;
800
/**
801
* Whether to send keep alive messages. Defaults to true.
802
*/
803
sendKeepAlive?: boolean;
804
}
805
806
/**
807
* Same as Protocol, but will actually track messages and acks.
808
* Moreover, it will ensure no messages are lost if there are no event listeners.
809
*/
810
export class PersistentProtocol implements IMessagePassingProtocol {
811
812
private _isReconnecting: boolean;
813
private _didSendDisconnect?: boolean;
814
815
private _outgoingUnackMsg: Queue<ProtocolMessage>;
816
private _outgoingMsgId: number;
817
private _outgoingAckId: number;
818
private _outgoingAckTimeout: Timeout | null;
819
820
private _incomingMsgId: number;
821
private _incomingAckId: number;
822
private _incomingMsgLastTime: number;
823
private _incomingAckTimeout: Timeout | null;
824
825
private _keepAliveInterval: Timeout | null;
826
827
private _lastReplayRequestTime: number;
828
private _lastSocketTimeoutTime: number;
829
830
private _socket: ISocket;
831
private _socketWriter: ProtocolWriter;
832
private _socketReader: ProtocolReader;
833
// eslint-disable-next-line local/code-no-potentially-unsafe-disposables
834
private _socketDisposables: DisposableStore;
835
836
private readonly _loadEstimator: ILoadEstimator;
837
private readonly _shouldSendKeepAlive: boolean;
838
839
private readonly _onControlMessage = new BufferedEmitter<VSBuffer>();
840
readonly onControlMessage: Event<VSBuffer> = this._onControlMessage.event;
841
842
private readonly _onMessage = new BufferedEmitter<VSBuffer>();
843
readonly onMessage: Event<VSBuffer> = this._onMessage.event;
844
845
private readonly _onDidDispose = new BufferedEmitter<void>();
846
readonly onDidDispose: Event<void> = this._onDidDispose.event;
847
848
private readonly _onSocketClose = new BufferedEmitter<SocketCloseEvent>();
849
readonly onSocketClose: Event<SocketCloseEvent> = this._onSocketClose.event;
850
851
private readonly _onSocketTimeout = new BufferedEmitter<SocketTimeoutEvent>();
852
readonly onSocketTimeout: Event<SocketTimeoutEvent> = this._onSocketTimeout.event;
853
854
public get unacknowledgedCount(): number {
855
return this._outgoingMsgId - this._outgoingAckId;
856
}
857
858
constructor(opts: PersistentProtocolOptions) {
859
this._loadEstimator = opts.loadEstimator ?? LoadEstimator.getInstance();
860
this._shouldSendKeepAlive = opts.sendKeepAlive ?? true;
861
this._isReconnecting = false;
862
this._outgoingUnackMsg = new Queue<ProtocolMessage>();
863
this._outgoingMsgId = 0;
864
this._outgoingAckId = 0;
865
this._outgoingAckTimeout = null;
866
867
this._incomingMsgId = 0;
868
this._incomingAckId = 0;
869
this._incomingMsgLastTime = 0;
870
this._incomingAckTimeout = null;
871
872
this._lastReplayRequestTime = 0;
873
this._lastSocketTimeoutTime = Date.now();
874
875
this._socketDisposables = new DisposableStore();
876
this._socket = opts.socket;
877
this._socketWriter = this._socketDisposables.add(new ProtocolWriter(this._socket));
878
this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
879
this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
880
this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));
881
882
if (opts.initialChunk) {
883
this._socketReader.acceptChunk(opts.initialChunk);
884
}
885
886
if (this._shouldSendKeepAlive) {
887
this._keepAliveInterval = setInterval(() => {
888
this._sendKeepAlive();
889
}, ProtocolConstants.KeepAliveSendTime);
890
} else {
891
this._keepAliveInterval = null;
892
}
893
}
894
895
dispose(): void {
896
if (this._outgoingAckTimeout) {
897
clearTimeout(this._outgoingAckTimeout);
898
this._outgoingAckTimeout = null;
899
}
900
if (this._incomingAckTimeout) {
901
clearTimeout(this._incomingAckTimeout);
902
this._incomingAckTimeout = null;
903
}
904
if (this._keepAliveInterval) {
905
clearInterval(this._keepAliveInterval);
906
this._keepAliveInterval = null;
907
}
908
this._socketDisposables.dispose();
909
}
910
911
drain(): Promise<void> {
912
return this._socketWriter.drain();
913
}
914
915
sendDisconnect(): void {
916
if (!this._didSendDisconnect) {
917
this._didSendDisconnect = true;
918
const msg = new ProtocolMessage(ProtocolMessageType.Disconnect, 0, 0, getEmptyBuffer());
919
this._socketWriter.write(msg);
920
this._socketWriter.flush();
921
}
922
}
923
924
sendPause(): void {
925
const msg = new ProtocolMessage(ProtocolMessageType.Pause, 0, 0, getEmptyBuffer());
926
this._socketWriter.write(msg);
927
}
928
929
sendResume(): void {
930
const msg = new ProtocolMessage(ProtocolMessageType.Resume, 0, 0, getEmptyBuffer());
931
this._socketWriter.write(msg);
932
}
933
934
pauseSocketWriting() {
935
this._socketWriter.pause();
936
}
937
938
public getSocket(): ISocket {
939
return this._socket;
940
}
941
942
public getMillisSinceLastIncomingData(): number {
943
return Date.now() - this._socketReader.lastReadTime;
944
}
945
946
public beginAcceptReconnection(socket: ISocket, initialDataChunk: VSBuffer | null): void {
947
this._isReconnecting = true;
948
949
this._socketDisposables.dispose();
950
this._socketDisposables = new DisposableStore();
951
this._onControlMessage.flushBuffer();
952
this._onSocketClose.flushBuffer();
953
this._onSocketTimeout.flushBuffer();
954
this._socket.dispose();
955
956
this._lastReplayRequestTime = 0;
957
this._lastSocketTimeoutTime = Date.now();
958
959
this._socket = socket;
960
this._socketWriter = this._socketDisposables.add(new ProtocolWriter(this._socket));
961
this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
962
this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
963
this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));
964
965
this._socketReader.acceptChunk(initialDataChunk);
966
}
967
968
public endAcceptReconnection(): void {
969
this._isReconnecting = false;
970
971
// After a reconnection, let the other party know (again) which messages have been received.
972
// (perhaps the other party didn't receive a previous ACK)
973
this._incomingAckId = this._incomingMsgId;
974
const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, getEmptyBuffer());
975
this._socketWriter.write(msg);
976
977
// Send again all unacknowledged messages
978
const toSend = this._outgoingUnackMsg.toArray();
979
for (let i = 0, len = toSend.length; i < len; i++) {
980
this._socketWriter.write(toSend[i]);
981
}
982
this._recvAckCheck();
983
}
984
985
public acceptDisconnect(): void {
986
this._onDidDispose.fire();
987
}
988
989
private _receiveMessage(msg: ProtocolMessage): void {
990
if (msg.ack > this._outgoingAckId) {
991
this._outgoingAckId = msg.ack;
992
do {
993
const first = this._outgoingUnackMsg.peek();
994
if (first && first.id <= msg.ack) {
995
// this message has been confirmed, remove it
996
this._outgoingUnackMsg.pop();
997
} else {
998
break;
999
}
1000
} while (true);
1001
}
1002
1003
switch (msg.type) {
1004
case ProtocolMessageType.None: {
1005
// N/A
1006
break;
1007
}
1008
case ProtocolMessageType.Regular: {
1009
if (msg.id > this._incomingMsgId) {
1010
if (msg.id !== this._incomingMsgId + 1) {
1011
// in case we missed some messages we ask the other party to resend them
1012
const now = Date.now();
1013
if (now - this._lastReplayRequestTime > 10000) {
1014
// send a replay request at most once every 10s
1015
this._lastReplayRequestTime = now;
1016
this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.ReplayRequest, 0, 0, getEmptyBuffer()));
1017
}
1018
} else {
1019
this._incomingMsgId = msg.id;
1020
this._incomingMsgLastTime = Date.now();
1021
this._sendAckCheck();
1022
this._onMessage.fire(msg.data);
1023
}
1024
}
1025
break;
1026
}
1027
case ProtocolMessageType.Control: {
1028
this._onControlMessage.fire(msg.data);
1029
break;
1030
}
1031
case ProtocolMessageType.Ack: {
1032
// nothing to do, .ack is handled above already
1033
break;
1034
}
1035
case ProtocolMessageType.Disconnect: {
1036
this._onDidDispose.fire();
1037
break;
1038
}
1039
case ProtocolMessageType.ReplayRequest: {
1040
// Send again all unacknowledged messages
1041
const toSend = this._outgoingUnackMsg.toArray();
1042
for (let i = 0, len = toSend.length; i < len; i++) {
1043
this._socketWriter.write(toSend[i]);
1044
}
1045
this._recvAckCheck();
1046
break;
1047
}
1048
case ProtocolMessageType.Pause: {
1049
this._socketWriter.pause();
1050
break;
1051
}
1052
case ProtocolMessageType.Resume: {
1053
this._socketWriter.resume();
1054
break;
1055
}
1056
case ProtocolMessageType.KeepAlive: {
1057
// nothing to do
1058
break;
1059
}
1060
}
1061
}
1062
1063
readEntireBuffer(): VSBuffer {
1064
return this._socketReader.readEntireBuffer();
1065
}
1066
1067
flush(): void {
1068
this._socketWriter.flush();
1069
}
1070
1071
send(buffer: VSBuffer): void {
1072
const myId = ++this._outgoingMsgId;
1073
this._incomingAckId = this._incomingMsgId;
1074
const msg = new ProtocolMessage(ProtocolMessageType.Regular, myId, this._incomingAckId, buffer);
1075
this._outgoingUnackMsg.push(msg);
1076
if (!this._isReconnecting) {
1077
this._socketWriter.write(msg);
1078
this._recvAckCheck();
1079
}
1080
}
1081
1082
/**
1083
* Send a message which will not be part of the regular acknowledge flow.
1084
* Use this for early control messages which are repeated in case of reconnection.
1085
*/
1086
sendControl(buffer: VSBuffer): void {
1087
const msg = new ProtocolMessage(ProtocolMessageType.Control, 0, 0, buffer);
1088
this._socketWriter.write(msg);
1089
}
1090
1091
private _sendAckCheck(): void {
1092
if (this._incomingMsgId <= this._incomingAckId) {
1093
// nothink to acknowledge
1094
return;
1095
}
1096
1097
if (this._incomingAckTimeout) {
1098
// there will be a check in the near future
1099
return;
1100
}
1101
1102
const timeSinceLastIncomingMsg = Date.now() - this._incomingMsgLastTime;
1103
if (timeSinceLastIncomingMsg >= ProtocolConstants.AcknowledgeTime) {
1104
// sufficient time has passed since this message has been received,
1105
// and no message from our side needed to be sent in the meantime,
1106
// so we will send a message containing only an ack.
1107
this._sendAck();
1108
return;
1109
}
1110
1111
this._incomingAckTimeout = setTimeout(() => {
1112
this._incomingAckTimeout = null;
1113
this._sendAckCheck();
1114
}, ProtocolConstants.AcknowledgeTime - timeSinceLastIncomingMsg + 5);
1115
}
1116
1117
private _recvAckCheck(): void {
1118
if (this._outgoingMsgId <= this._outgoingAckId) {
1119
// everything has been acknowledged
1120
return;
1121
}
1122
1123
if (this._outgoingAckTimeout) {
1124
// there will be a check in the near future
1125
return;
1126
}
1127
1128
if (this._isReconnecting) {
1129
// do not cause a timeout during reconnection,
1130
// because messages will not be actually written until `endAcceptReconnection`
1131
return;
1132
}
1133
1134
const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!;
1135
const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime;
1136
const timeSinceLastReceivedSomeData = Date.now() - this._socketReader.lastReadTime;
1137
const timeSinceLastTimeout = Date.now() - this._lastSocketTimeoutTime;
1138
1139
if (
1140
timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.TimeoutTime
1141
&& timeSinceLastReceivedSomeData >= ProtocolConstants.TimeoutTime
1142
&& timeSinceLastTimeout >= ProtocolConstants.TimeoutTime
1143
) {
1144
// It's been a long time since our sent message was acknowledged
1145
// and a long time since we received some data
1146
1147
// But this might be caused by the event loop being busy and failing to read messages
1148
if (!this._loadEstimator.hasHighLoad()) {
1149
// Trash the socket
1150
this._lastSocketTimeoutTime = Date.now();
1151
this._onSocketTimeout.fire({
1152
unacknowledgedMsgCount: this._outgoingUnackMsg.length(),
1153
timeSinceOldestUnacknowledgedMsg,
1154
timeSinceLastReceivedSomeData
1155
});
1156
return;
1157
}
1158
}
1159
1160
const minimumTimeUntilTimeout = Math.max(
1161
ProtocolConstants.TimeoutTime - timeSinceOldestUnacknowledgedMsg,
1162
ProtocolConstants.TimeoutTime - timeSinceLastReceivedSomeData,
1163
ProtocolConstants.TimeoutTime - timeSinceLastTimeout,
1164
500
1165
);
1166
1167
this._outgoingAckTimeout = setTimeout(() => {
1168
this._outgoingAckTimeout = null;
1169
this._recvAckCheck();
1170
}, minimumTimeUntilTimeout);
1171
}
1172
1173
private _sendAck(): void {
1174
if (this._incomingMsgId <= this._incomingAckId) {
1175
// nothink to acknowledge
1176
return;
1177
}
1178
1179
this._incomingAckId = this._incomingMsgId;
1180
const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, getEmptyBuffer());
1181
this._socketWriter.write(msg);
1182
}
1183
1184
private _sendKeepAlive(): void {
1185
this._incomingAckId = this._incomingMsgId;
1186
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, this._incomingAckId, getEmptyBuffer());
1187
this._socketWriter.write(msg);
1188
}
1189
}
1190
1191