Path: blob/main/src/vs/base/parts/ipc/common/ipc.net.ts
4780 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { VSBuffer } from '../../../common/buffer.js';
import { Emitter, Event } from '../../../common/event.js';
import { Disposable, DisposableStore, IDisposable } from '../../../common/lifecycle.js';
import { IIPCLogger, IMessagePassingProtocol, IPCClient } from './ipc.js';

/**
 * The kinds of events that can be recorded through `SocketDiagnostics.traceSocketEvent`.
 */
export const enum SocketDiagnosticsEventType {
	Created = 'created',
	Read = 'read',
	Write = 'write',
	Open = 'open',
	Error = 'error',
	Close = 'close',

	BrowserWebSocketBlobReceived = 'browserWebSocketBlobReceived',

	NodeEndReceived = 'nodeEndReceived',
	NodeEndSent = 'nodeEndSent',
	NodeDrainBegin = 'nodeDrainBegin',
	NodeDrainEnd = 'nodeDrainEnd',

	zlibInflateError = 'zlibInflateError',
	zlibInflateData = 'zlibInflateData',
	zlibInflateInitialWrite = 'zlibInflateInitialWrite',
	zlibInflateInitialFlushFired = 'zlibInflateInitialFlushFired',
	zlibInflateWrite = 'zlibInflateWrite',
	zlibInflateFlushFired = 'zlibInflateFlushFired',
	zlibDeflateError = 'zlibDeflateError',
	zlibDeflateData = 'zlibDeflateData',
	zlibDeflateWrite = 'zlibDeflateWrite',
	zlibDeflateFlushFired = 'zlibDeflateFlushFired',

	WebSocketNodeSocketWrite = 'webSocketNodeSocketWrite',
	WebSocketNodeSocketPeekedHeader = 'webSocketNodeSocketPeekedHeader',
	WebSocketNodeSocketReadHeader = 'webSocketNodeSocketReadHeader',
	WebSocketNodeSocketReadData = 'webSocketNodeSocketReadData',
	WebSocketNodeSocketUnmaskedData = 'webSocketNodeSocketUnmaskedData',
	WebSocketNodeSocketDrainBegin = 'webSocketNodeSocketDrainBegin',
	WebSocketNodeSocketDrainEnd = 'webSocketNodeSocketDrainEnd',

	ProtocolHeaderRead = 'protocolHeaderRead',
	ProtocolMessageRead = 'protocolMessageRead',
	ProtocolHeaderWrite = 'protocolHeaderWrite',
	ProtocolMessageWrite = 'protocolMessageWrite',
	ProtocolWrite = 'protocolWrite',
}

/**
 * In-memory recording of socket events. Recording is a no-op unless
 * `enableDiagnostics` is switched to `true`.
 */
export namespace SocketDiagnostics {

	export const enableDiagnostics = false;

	export interface IRecord {
		timestamp: number;
		id: string;
		label: string;
		type: SocketDiagnosticsEventType;
		buff?: VSBuffer;
		data?: any;
	}

	export const records: IRecord[] = [];
	const socketIds = new WeakMap<any, string>();
	let lastUsedSocketId = 0;

	/**
	 * Returns the id associated with `nativeObject`, assigning the next
	 * sequential id the first time an object is seen.
	 */
	function getSocketId(nativeObject: unknown, label: string): string {
		if (!socketIds.has(nativeObject)) {
			const id = String(++lastUsedSocketId);
			socketIds.set(nativeObject, id);
		}
		return socketIds.get(nativeObject)!;
	}

	/**
	 * Appends a record to `records`. Binary payloads are copied into a fresh
	 * `VSBuffer` so that later mutations of `data` do not alter the record;
	 * any other payload is stored by reference in `IRecord.data`.
	 */
	export function traceSocketEvent(nativeObject: unknown, socketDebugLabel: string, type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
		if (!enableDiagnostics) {
			return;
		}
		const id = getSocketId(nativeObject, socketDebugLabel);

		if (data instanceof VSBuffer || data instanceof Uint8Array || data instanceof ArrayBuffer || ArrayBuffer.isView(data)) {
			const copiedData = VSBuffer.alloc(data.byteLength);
			copiedData.set(data);
			records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, buff: copiedData });
		} else {
			// data is a custom object
			records.push({ timestamp: Date.now(), id, label: socketDebugLabel, type, data: data });
		}
	}
}

export const enum SocketCloseEventType {
	NodeSocketCloseEvent = 0,
	WebSocketCloseEvent = 1
}

export interface NodeSocketCloseEvent {
	/**
	 * The type of the event
	 */
	readonly type: SocketCloseEventType.NodeSocketCloseEvent;
	/**
	 * `true` if the socket had a transmission error.
	 */
	readonly hadError: boolean;
	/**
	 * Underlying error.
	 */
	readonly error: Error | undefined;
}

export interface WebSocketCloseEvent {
	/**
	 * The type of the event
	 */
	readonly type: SocketCloseEventType.WebSocketCloseEvent;
	/**
	 * Returns the WebSocket connection close code provided by the server.
	 */
	readonly code: number;
	/**
	 * Returns the WebSocket connection close reason provided by the server.
	 */
	readonly reason: string;
	/**
	 * Returns true if the connection closed cleanly; false otherwise.
	 */
	readonly wasClean: boolean;
	/**
	 * Underlying event.
	 */
	readonly event: any | undefined;
}

export type SocketCloseEvent = NodeSocketCloseEvent | WebSocketCloseEvent | undefined;

export interface SocketTimeoutEvent {
	readonly unacknowledgedMsgCount: number;
	readonly timeSinceOldestUnacknowledgedMsg: number;
	readonly timeSinceLastReceivedSomeData: number;
}

export interface ISocket extends IDisposable {
	onData(listener: (e: VSBuffer) => void): IDisposable;
	onClose(listener: (e: SocketCloseEvent) => void): IDisposable;
	onEnd(listener: () => void): IDisposable;
	write(buffer: VSBuffer): void;
	end(): void;
	drain(): Promise<void>;

	traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void;
}

let emptyBuffer: VSBuffer | null = null;

/**
 * Returns a shared zero-length buffer, allocated on first use.
 */
function getEmptyBuffer(): VSBuffer {
	if (!emptyBuffer) {
		emptyBuffer = VSBuffer.alloc(0);
	}
	return emptyBuffer;
}

/**
 * Accumulates buffers and allows reading (or peeking) an exact number of
 * bytes, regardless of how the bytes are split across the accepted chunks.
 */
export class ChunkStream {

	private _chunks: VSBuffer[];
	private _totalLength: number;

	/** Number of bytes currently available for reading. */
	public get byteLength() {
		return this._totalLength;
	}

	constructor() {
		this._chunks = [];
		this._totalLength = 0;
	}

	/** Appends `buff` to the end of the stream. */
	public acceptChunk(buff: VSBuffer) {
		this._chunks.push(buff);
		this._totalLength += buff.byteLength;
	}

	/**
	 * Removes and returns the next `byteCount` bytes.
	 * Throws if fewer than `byteCount` bytes are available.
	 */
	public read(byteCount: number): VSBuffer {
		return this._read(byteCount, true);
	}

	/**
	 * Returns the next `byteCount` bytes without consuming them.
	 * Throws if fewer than `byteCount` bytes are available.
	 */
	public peek(byteCount: number): VSBuffer {
		return this._read(byteCount, false);
	}

	private _read(byteCount: number, advance: boolean): VSBuffer {

		if (byteCount === 0) {
			return getEmptyBuffer();
		}

		if (byteCount > this._totalLength) {
			throw new Error(`Cannot read so many bytes!`);
		}

		if (this._chunks[0].byteLength === byteCount) {
			// super fast path, precisely first chunk must be returned
			// (the chunk itself is returned, not a copy)
			const result = this._chunks[0];
			if (advance) {
				this._chunks.shift();
				this._totalLength -= byteCount;
			}
			return result;
		}

		if (this._chunks[0].byteLength > byteCount) {
			// fast path, the reading is entirely within the first chunk
			const result = this._chunks[0].slice(0, byteCount);
			if (advance) {
				this._chunks[0] = this._chunks[0].slice(byteCount);
				this._totalLength -= byteCount;
			}
			return result;
		}

		// slow path, the result spans multiple chunks and must be assembled
		const result = VSBuffer.alloc(byteCount);
		let resultOffset = 0;
		// only moves forward when peeking; when advancing, consumed chunks are
		// shifted out so the next chunk is always at index 0
		let chunkIndex = 0;
		while (byteCount > 0) {
			const chunk = this._chunks[chunkIndex];
			if (chunk.byteLength > byteCount) {
				// this chunk will survive
				const chunkPart = chunk.slice(0, byteCount);
				result.set(chunkPart, resultOffset);
				resultOffset += byteCount;

				if (advance) {
					this._chunks[chunkIndex] = chunk.slice(byteCount);
					this._totalLength -= byteCount;
				}

				byteCount = 0;
			} else {
				// this chunk will be entirely read
				result.set(chunk, resultOffset);
				resultOffset += chunk.byteLength;

				if (advance) {
					this._chunks.shift();
					this._totalLength -= chunk.byteLength;
				} else {
					chunkIndex++;
				}

				byteCount -= chunk.byteLength;
			}
		}
		return result;
	}
}

const enum ProtocolMessageType {
	None = 0,
	Regular = 1,
	Control = 2,
	Ack = 3,
	Disconnect = 5,
	ReplayRequest = 6,
	Pause = 7,
	Resume = 8,
	KeepAlive = 9
}

/**
 * Returns a human readable name for `messageType` (used in diagnostics).
 * The type byte is read off the wire, so values outside of the enum are
 * possible at runtime; those are reported as `'Unknown'`.
 */
function protocolMessageTypeToString(messageType: ProtocolMessageType): string {
	switch (messageType) {
		case ProtocolMessageType.None: return 'None';
		case ProtocolMessageType.Regular: return 'Regular';
		case ProtocolMessageType.Control: return 'Control';
		case ProtocolMessageType.Ack: return 'Ack';
		case ProtocolMessageType.Disconnect: return 'Disconnect';
		case ProtocolMessageType.ReplayRequest: return 'ReplayRequest';
		case ProtocolMessageType.Pause: return 'PauseWriting';
		case ProtocolMessageType.Resume: return 'ResumeWriting';
		case ProtocolMessageType.KeepAlive: return 'KeepAlive';
		default: return 'Unknown';
	}
}

export const enum ProtocolConstants {
	HeaderLength = 13,
	/**
	 * Send an Acknowledge message at most 2 seconds later...
	 */
	AcknowledgeTime = 2000, // 2 seconds
	/**
	 * If there is a sent message that has been unacknowledged for 20 seconds,
	 * and we didn't see any incoming server data in the past 20 seconds,
	 * then consider the connection has timed out.
	 */
	TimeoutTime = 20000, // 20 seconds
	/**
	 * If there is no reconnection within this time-frame, consider the connection permanently closed...
	 */
	ReconnectionGraceTime = 3 * 60 * 60 * 1000, // 3hrs
	/**
	 * Maximal grace time between the first and the last reconnection...
	 */
	ReconnectionShortGraceTime = 5 * 60 * 1000, // 5min
	/**
	 * Send a message every 5 seconds to avoid that the connection is closed by the OS.
	 */
	KeepAliveSendTime = 5000, // 5 seconds
}

/**
 * A single protocol message (header fields + body).
 * `writtenTime` is stamped by `ProtocolWriter.write` each time the message is written.
 */
class ProtocolMessage {

	public writtenTime: number;

	constructor(
		public readonly type: ProtocolMessageType,
		public readonly id: number,
		public readonly ack: number,
		public readonly data: VSBuffer
	) {
		this.writtenTime = 0;
	}

	public get size(): number {
		return this.data.byteLength;
	}
}

/**
 * Parses the bytes received from a socket into `ProtocolMessage`s.
 * It alternates between reading a fixed size header
 * (`ProtocolConstants.HeaderLength` bytes) and reading a body whose length
 * was announced by the preceding header.
 */
class ProtocolReader extends Disposable {

	private readonly _socket: ISocket;
	private _isDisposed: boolean;
	private readonly _incomingData: ChunkStream;
	/** Timestamp (`Date.now()`) of the last non-empty chunk that was accepted. */
	public lastReadTime: number;

	private readonly _onMessage = this._register(new Emitter<ProtocolMessage>());
	public readonly onMessage: Event<ProtocolMessage> = this._onMessage.event;

	private readonly _state = {
		readHead: true,
		readLen: ProtocolConstants.HeaderLength,
		messageType: ProtocolMessageType.None,
		id: 0,
		ack: 0
	};

	constructor(socket: ISocket) {
		super();
		this._socket = socket;
		this._isDisposed = false;
		this._incomingData = new ChunkStream();
		this._register(this._socket.onData(data => this.acceptChunk(data)));
		this.lastReadTime = Date.now();
	}

	/**
	 * Feeds `data` into the parser and fires `onMessage` for every message
	 * that becomes complete. `null` or empty chunks are ignored.
	 */
	public acceptChunk(data: VSBuffer | null): void {
		if (!data || data.byteLength === 0) {
			return;
		}

		this.lastReadTime = Date.now();

		this._incomingData.acceptChunk(data);

		while (this._incomingData.byteLength >= this._state.readLen) {

			const buff = this._incomingData.read(this._state.readLen);

			if (this._state.readHead) {
				// buff is the header

				// save new state => next time will read the body
				this._state.readHead = false;
				this._state.readLen = buff.readUInt32BE(9);
				this._state.messageType = buff.readUInt8(0);
				this._state.id = buff.readUInt32BE(1);
				this._state.ack = buff.readUInt32BE(5);

				this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderRead, { messageType: protocolMessageTypeToString(this._state.messageType), id: this._state.id, ack: this._state.ack, messageSize: this._state.readLen });

			} else {
				// buff is the body
				const messageType = this._state.messageType;
				const id = this._state.id;
				const ack = this._state.ack;

				// save new state => next time will read the header
				this._state.readHead = true;
				this._state.readLen = ProtocolConstants.HeaderLength;
				this._state.messageType = ProtocolMessageType.None;
				this._state.id = 0;
				this._state.ack = 0;

				this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageRead, buff);

				this._onMessage.fire(new ProtocolMessage(messageType, id, ack, buff));

				if (this._isDisposed) {
					// check if an event listener lead to our disposal
					break;
				}
			}
		}
	}

	/** Removes and returns all bytes that were received but not yet parsed. */
	public readEntireBuffer(): VSBuffer {
		return this._incomingData.read(this._incomingData.byteLength);
	}

	public override dispose(): void {
		this._isDisposed = true;
		super.dispose();
	}
}

/**
 * Serializes `ProtocolMessage`s and writes them to a socket.
 * Messages are collected in a buffer and written together from a
 * `setTimeout` callback (or right away via `flush`), unless writing is paused.
 */
class ProtocolWriter {

	private _isDisposed: boolean;
	private _isPaused: boolean;
	private readonly _socket: ISocket;
	private _data: VSBuffer[];
	private _totalLength: number;
	/** Timestamp (`Date.now()`) of the last call to `write` that was not ignored. */
	public lastWriteTime: number;

	constructor(socket: ISocket) {
		this._isDisposed = false;
		this._isPaused = false;
		this._socket = socket;
		this._data = [];
		this._totalLength = 0;
		this.lastWriteTime = 0;
	}

	public dispose(): void {
		try {
			this.flush();
		} catch (err) {
			// ignore error, since the socket could be already closed
		}
		this._isDisposed = true;
	}

	/** Flushes buffered data and then waits for the socket to drain. */
	public drain(): Promise<void> {
		this.flush();
		return this._socket.drain();
	}

	/** Writes any buffered data to the socket now (no-op while paused). */
	public flush(): void {
		// flush
		this._writeNow();
	}

	public pause(): void {
		this._isPaused = true;
	}

	public resume(): void {
		this._isPaused = false;
		this._scheduleWriting();
	}

	/**
	 * Stamps `msg.writtenTime`, serializes the header and queues header + body
	 * for writing. Ignored after disposal.
	 */
	public write(msg: ProtocolMessage) {
		if (this._isDisposed) {
			// ignore: there could be left-over promises which complete and then
			// decide to write a response, etc...
			return;
		}
		msg.writtenTime = Date.now();
		this.lastWriteTime = Date.now();
		const header = VSBuffer.alloc(ProtocolConstants.HeaderLength);
		header.writeUInt8(msg.type, 0);
		header.writeUInt32BE(msg.id, 1);
		header.writeUInt32BE(msg.ack, 5);
		header.writeUInt32BE(msg.data.byteLength, 9);

		this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolHeaderWrite, { messageType: protocolMessageTypeToString(msg.type), id: msg.id, ack: msg.ack, messageSize: msg.data.byteLength });
		this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolMessageWrite, msg.data);

		this._writeSoon(header, msg.data);
	}

	/** Queues `head` and `body`; returns whether the buffer was empty before. */
	private _bufferAdd(head: VSBuffer, body: VSBuffer): boolean {
		const wasEmpty = this._totalLength === 0;
		this._data.push(head, body);
		this._totalLength += head.byteLength + body.byteLength;
		return wasEmpty;
	}

	/** Returns everything queued as a single buffer and empties the queue. */
	private _bufferTake(): VSBuffer {
		const ret = VSBuffer.concat(this._data, this._totalLength);
		this._data.length = 0;
		this._totalLength = 0;
		return ret;
	}

	private _writeSoon(header: VSBuffer, data: VSBuffer): void {
		// only the transition from empty to non-empty needs to schedule a write
		if (this._bufferAdd(header, data)) {
			this._scheduleWriting();
		}
	}

	private _writeNowTimeout: Timeout | null = null;
	private _scheduleWriting(): void {
		if (this._writeNowTimeout) {
			return;
		}
		this._writeNowTimeout = setTimeout(() => {
			this._writeNowTimeout = null;
			this._writeNow();
		});
	}

	private _writeNow(): void {
		if (this._totalLength === 0) {
			return;
		}
		if (this._isPaused) {
			return;
		}
		const data = this._bufferTake();
		this._socket.traceSocketEvent(SocketDiagnosticsEventType.ProtocolWrite, { byteLength: data.byteLength });
		this._socket.write(data);
	}
}

/**
 * A message has the following format:
 * ```
 *     /-------------------------------|------\
 *     |             HEADER            |      |
 *     |-------------------------------| DATA |
 *     | TYPE | ID | ACK | DATA_LENGTH |      |
 *     \-------------------------------|------/
 * ```
 * The header is 13 bytes (`ProtocolConstants.HeaderLength`) and consists of:
 *  - TYPE is 1 byte (ProtocolMessageType) - the message type
 *  - ID is 4 bytes (u32be) - the message id (can be 0 to indicate to be ignored)
 *  - ACK is 4 bytes (u32be) - the acknowledged message id (can be 0 to indicate to be ignored)
 *  - DATA_LENGTH is 4 bytes (u32be) - the length in bytes of DATA
 *
 * Only Regular messages are counted, other messages are not counted, nor acknowledged.
 */
export class Protocol extends Disposable implements IMessagePassingProtocol {

	private _socket: ISocket;
	private _socketWriter: ProtocolWriter;
	private _socketReader: ProtocolReader;

	private readonly _onMessage = new Emitter<VSBuffer>();
	readonly onMessage: Event<VSBuffer> = this._onMessage.event;

	private readonly _onDidDispose = new Emitter<void>();
	readonly onDidDispose: Event<void> = this._onDidDispose.event;

	constructor(socket: ISocket) {
		super();
		this._socket = socket;
		this._socketWriter = this._register(new ProtocolWriter(this._socket));
		this._socketReader = this._register(new ProtocolReader(this._socket));

		this._register(this._socketReader.onMessage((msg) => {
			// only Regular messages are surfaced, all other types are dropped
			if (msg.type === ProtocolMessageType.Regular) {
				this._onMessage.fire(msg.data);
			}
		}));

		this._register(this._socket.onClose(() => this._onDidDispose.fire()));
	}

	drain(): Promise<void> {
		return this._socketWriter.drain();
	}

	getSocket(): ISocket {
		return this._socket;
	}

	sendDisconnect(): void {
		// Nothing to do...
	}

	/** Sends `buffer` as a Regular message (with id 0 and ack 0). */
	send(buffer: VSBuffer): void {
		this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.Regular, 0, 0, buffer));
	}
}

/**
 * An `IPCClient` that communicates over a `Protocol` or a `PersistentProtocol`.
 */
export class Client<TContext = string> extends IPCClient<TContext> {

	static fromSocket<TContext = string>(socket: ISocket, id: TContext): Client<TContext> {
		return new Client(new Protocol(socket), id);
	}

	get onDidDispose(): Event<void> { return this.protocol.onDidDispose; }

	constructor(private protocol: Protocol | PersistentProtocol, id: TContext, ipcLogger: IIPCLogger | null = null) {
		super(protocol, id, ipcLogger);
	}

	override dispose(): void {
		super.dispose();
		// grab the socket first, it is ended only after the protocol is disposed
		const socket = this.protocol.getSocket();
		// should be sent gracefully with a .flush(), but try to send it out as a
		// last resort here if nothing else:
		this.protocol.sendDisconnect();
		this.protocol.dispose();
		socket.end();
	}
}

/**
 * Will ensure no messages are lost if there are no event listeners.
 */
export class BufferedEmitter<T> {
	private _emitter: Emitter<T>;
	public readonly event: Event<T>;

	private _hasListeners = false;
	private _isDeliveringMessages = false;
	private _bufferedMessages: T[] = [];

	constructor() {
		this._emitter = new Emitter<T>({
			onWillAddFirstListener: () => {
				this._hasListeners = true;
				// it is important to deliver these messages after this call, but before
				// other messages have a chance to be received (to guarantee in order delivery)
				// that's why we're using here queueMicrotask and not other types of timeouts
				queueMicrotask(() => this._deliverMessages());
			},
			onDidRemoveLastListener: () => {
				this._hasListeners = false;
			}
		});

		this.event = this._emitter.event;
	}

	/**
	 * Fires the buffered messages in order for as long as there are listeners.
	 * Re-entrant calls (e.g. triggered by a listener) return immediately.
	 */
	private _deliverMessages(): void {
		if (this._isDeliveringMessages) {
			return;
		}
		this._isDeliveringMessages = true;
		try {
			while (this._hasListeners && this._bufferedMessages.length > 0) {
				this._emitter.fire(this._bufferedMessages.shift()!);
			}
		} finally {
			// always reset the guard, otherwise a throwing `fire` would
			// permanently block the delivery of buffered messages
			this._isDeliveringMessages = false;
		}
	}

	/**
	 * Fires `event` right away if there are listeners and nothing is buffered;
	 * otherwise appends it to the buffer to preserve ordering.
	 */
	public fire(event: T): void {
		if (this._hasListeners) {
			if (this._bufferedMessages.length > 0) {
				this._bufferedMessages.push(event);
			} else {
				this._emitter.fire(event);
			}
		} else {
			this._bufferedMessages.push(event);
		}
	}

	/** Discards all buffered messages without delivering them. */
	public flushBuffer(): void {
		this._bufferedMessages = [];
	}
}

class QueueElement<T> {
	public readonly data: T;
	public next: QueueElement<T> | null;

	constructor(data: T) {
		this.data = data;
		this.next = null;
	}
}

/**
 * A FIFO queue implemented as a singly linked list.
 */
class Queue<T> {

	private _first: QueueElement<T> | null;
	private _last: QueueElement<T> | null;

	constructor() {
		this._first = null;
		this._last = null;
	}

	/** Counts the elements by walking the list. */
	public length(): number {
		let result = 0;
		let current = this._first;
		while (current) {
			current = current.next;
			result++;
		}
		return result;
	}

	/** Returns the oldest element without removing it, or `null` when empty. */
	public peek(): T | null {
		if (!this._first) {
			return null;
		}
		return this._first.data;
	}

	/** Returns all elements, oldest first. */
	public toArray(): T[] {
		const result: T[] = [];
		let resultLen = 0;
		let it = this._first;
		while (it) {
			result[resultLen++] = it.data;
			it = it.next;
		}
		return result;
	}

	/** Removes the oldest element (no-op when empty). */
	public pop(): void {
		if (!this._first) {
			return;
		}
		if (this._first === this._last) {
			this._first = null;
			this._last = null;
			return;
		}
		this._first = this._first.next;
	}

	/** Appends `item` at the end of the queue. */
	public push(item: T): void {
		const element = new QueueElement(item);
		if (!this._first) {
			this._first = element;
			this._last = element;
			return;
		}
		this._last!.next = element;
		this._last = element;
	}
}

/**
 * Records a timestamp once per second from a `setInterval` callback and
 * derives a load estimate from how recent the recorded timestamps are.
 */
class LoadEstimator {

	private static _HISTORY_LENGTH = 10;
	private static _INSTANCE: LoadEstimator | null = null;
	public static getInstance(): LoadEstimator {
		if (!LoadEstimator._INSTANCE) {
			LoadEstimator._INSTANCE = new LoadEstimator();
		}
		return LoadEstimator._INSTANCE;
	}

	/** The most recent run timestamps, newest first. */
	private lastRuns: number[];

	constructor() {
		this.lastRuns = [];
		const now = Date.now();
		for (let i = 0; i < LoadEstimator._HISTORY_LENGTH; i++) {
			this.lastRuns[i] = now - 1000 * i;
		}
		setInterval(() => {
			// shift the history by one slot, staying within the
			// `_HISTORY_LENGTH` entries that `load()` looks at
			for (let i = LoadEstimator._HISTORY_LENGTH - 1; i >= 1; i--) {
				this.lastRuns[i] = this.lastRuns[i - 1];
			}
			this.lastRuns[0] = Date.now();
		}, 1000);
	}

	/**
	 * returns an estimative number, from 0 (low load) to 1 (high load)
	 */
	private load(): number {
		const now = Date.now();
		const historyLimit = (1 + LoadEstimator._HISTORY_LENGTH) * 1000;
		let score = 0;
		for (let i = 0; i < LoadEstimator._HISTORY_LENGTH; i++) {
			if (now - this.lastRuns[i] <= historyLimit) {
				score++;
			}
		}
		return 1 - score / LoadEstimator._HISTORY_LENGTH;
	}

	public hasHighLoad(): boolean {
		return this.load() >= 0.5;
	}
}

export interface ILoadEstimator {
	hasHighLoad(): boolean;
}

export interface PersistentProtocolOptions {
	/**
	 * The socket to use.
	 */
	socket: ISocket;
	/**
	 * The initial chunk of data that has already been received from the socket.
	 */
	initialChunk?: VSBuffer | null;
	/**
	 * The CPU load estimator to use.
	 */
	loadEstimator?: ILoadEstimator;
	/**
	 * Whether to send keep alive messages. Defaults to true.
	 */
	sendKeepAlive?: boolean;
}

/**
 * Same as Protocol, but will actually track messages and acks.
 * Moreover, it will ensure no messages are lost if there are no event listeners.
 */
export class PersistentProtocol implements IMessagePassingProtocol {

	private _isReconnecting: boolean;
	private _didSendDisconnect?: boolean;

	private _outgoingUnackMsg: Queue<ProtocolMessage>;
	private _outgoingMsgId: number;
	private _outgoingAckId: number;
	private _outgoingAckTimeout: Timeout | null;

	private _incomingMsgId: number;
	private _incomingAckId: number;
	private _incomingMsgLastTime: number;
	private _incomingAckTimeout: Timeout | null;

	private _keepAliveInterval: Timeout | null;

	private _lastReplayRequestTime: number;
	private _lastSocketTimeoutTime: number;

	private _socket: ISocket;
	private _socketWriter: ProtocolWriter;
	private _socketReader: ProtocolReader;
	// eslint-disable-next-line local/code-no-potentially-unsafe-disposables
	private _socketDisposables: DisposableStore;

	private readonly _loadEstimator: ILoadEstimator;
	private readonly _shouldSendKeepAlive: boolean;

	private readonly _onControlMessage = new BufferedEmitter<VSBuffer>();
	readonly onControlMessage: Event<VSBuffer> = this._onControlMessage.event;

	private readonly _onMessage = new BufferedEmitter<VSBuffer>();
	readonly onMessage: Event<VSBuffer> = this._onMessage.event;

	private readonly _onDidDispose = new BufferedEmitter<void>();
	readonly onDidDispose: Event<void> = this._onDidDispose.event;

	private readonly _onSocketClose = new BufferedEmitter<SocketCloseEvent>();
	readonly onSocketClose: Event<SocketCloseEvent> = this._onSocketClose.event;

	private readonly _onSocketTimeout = new BufferedEmitter<SocketTimeoutEvent>();
	readonly onSocketTimeout: Event<SocketTimeoutEvent> = this._onSocketTimeout.event;

	/** The number of sent Regular messages that were not yet acknowledged. */
	public get unacknowledgedCount(): number {
		return this._outgoingMsgId - this._outgoingAckId;
	}

	constructor(opts: PersistentProtocolOptions) {
		this._loadEstimator = opts.loadEstimator ?? LoadEstimator.getInstance();
		this._shouldSendKeepAlive = opts.sendKeepAlive ?? true;
		this._isReconnecting = false;
		this._outgoingUnackMsg = new Queue<ProtocolMessage>();
		this._outgoingMsgId = 0;
		this._outgoingAckId = 0;
		this._outgoingAckTimeout = null;

		this._incomingMsgId = 0;
		this._incomingAckId = 0;
		this._incomingMsgLastTime = 0;
		this._incomingAckTimeout = null;

		this._lastReplayRequestTime = 0;
		this._lastSocketTimeoutTime = Date.now();

		this._socketDisposables = new DisposableStore();
		this._socket = opts.socket;
		this._socketWriter = this._socketDisposables.add(new ProtocolWriter(this._socket));
		this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
		this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
		this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));

		if (opts.initialChunk) {
			this._socketReader.acceptChunk(opts.initialChunk);
		}

		if (this._shouldSendKeepAlive) {
			this._keepAliveInterval = setInterval(() => {
				this._sendKeepAlive();
			}, ProtocolConstants.KeepAliveSendTime);
		} else {
			this._keepAliveInterval = null;
		}
	}

	/** Cancels all pending timers and disposes the socket reader, writer and listeners. */
	dispose(): void {
		if (this._outgoingAckTimeout) {
			clearTimeout(this._outgoingAckTimeout);
			this._outgoingAckTimeout = null;
		}
		if (this._incomingAckTimeout) {
			clearTimeout(this._incomingAckTimeout);
			this._incomingAckTimeout = null;
		}
		if (this._keepAliveInterval) {
			clearInterval(this._keepAliveInterval);
			this._keepAliveInterval = null;
		}
		this._socketDisposables.dispose();
	}

	drain(): Promise<void> {
		return this._socketWriter.drain();
	}

	/** Writes and flushes a Disconnect message; only the first call has an effect. */
	sendDisconnect(): void {
		if (!this._didSendDisconnect) {
			this._didSendDisconnect = true;
			const msg = new ProtocolMessage(ProtocolMessageType.Disconnect, 0, 0, getEmptyBuffer());
			this._socketWriter.write(msg);
			this._socketWriter.flush();
		}
	}

	sendPause(): void {
		const msg = new ProtocolMessage(ProtocolMessageType.Pause, 0, 0, getEmptyBuffer());
		this._socketWriter.write(msg);
	}

	sendResume(): void {
		const msg = new ProtocolMessage(ProtocolMessageType.Resume, 0, 0, getEmptyBuffer());
		this._socketWriter.write(msg);
	}

	pauseSocketWriting() {
		this._socketWriter.pause();
	}

	public getSocket(): ISocket {
		return this._socket;
	}

	public getMillisSinceLastIncomingData(): number {
		return Date.now() - this._socketReader.lastReadTime;
	}

	/**
	 * Replaces the current socket with `socket`: the old reader, writer and
	 * socket are disposed, buffered control / socket close / socket timeout
	 * events are discarded, and a new reader and writer are created.
	 * Until `endAcceptReconnection` is called, `send` only queues messages.
	 */
	public beginAcceptReconnection(socket: ISocket, initialDataChunk: VSBuffer | null): void {
		this._isReconnecting = true;

		this._socketDisposables.dispose();
		this._socketDisposables = new DisposableStore();
		this._onControlMessage.flushBuffer();
		this._onSocketClose.flushBuffer();
		this._onSocketTimeout.flushBuffer();
		this._socket.dispose();

		this._lastReplayRequestTime = 0;
		this._lastSocketTimeoutTime = Date.now();

		this._socket = socket;
		this._socketWriter = this._socketDisposables.add(new ProtocolWriter(this._socket));
		this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
		this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
		this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));

		this._socketReader.acceptChunk(initialDataChunk);
	}

	public endAcceptReconnection(): void {
		this._isReconnecting = false;

		// After a reconnection, let the other party know (again) which messages have been received.
		// (perhaps the other party didn't receive a previous ACK)
		this._incomingAckId = this._incomingMsgId;
		const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, getEmptyBuffer());
		this._socketWriter.write(msg);

		// Send again all unacknowledged messages
		const toSend = this._outgoingUnackMsg.toArray();
		for (let i = 0, len = toSend.length; i < len; i++) {
			this._socketWriter.write(toSend[i]);
		}
		this._recvAckCheck();
	}

	public acceptDisconnect(): void {
		this._onDidDispose.fire();
	}

	/**
	 * Handles a message parsed by the reader: first processes the `ack` field
	 * (carried by every message type), then dispatches on the message type.
	 */
	private _receiveMessage(msg: ProtocolMessage): void {
		if (msg.ack > this._outgoingAckId) {
			this._outgoingAckId = msg.ack;
			do {
				const first = this._outgoingUnackMsg.peek();
				if (first && first.id <= msg.ack) {
					// this message has been confirmed, remove it
					this._outgoingUnackMsg.pop();
				} else {
					break;
				}
			} while (true);
		}

		switch (msg.type) {
			case ProtocolMessageType.None: {
				// N/A
				break;
			}
			case ProtocolMessageType.Regular: {
				// messages with an id that was already seen are ignored
				if (msg.id > this._incomingMsgId) {
					if (msg.id !== this._incomingMsgId + 1) {
						// in case we missed some messages we ask the other party to resend them
						const now = Date.now();
						if (now - this._lastReplayRequestTime > 10000) {
							// send a replay request at most once every 10s
							this._lastReplayRequestTime = now;
							this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.ReplayRequest, 0, 0, getEmptyBuffer()));
						}
					} else {
						this._incomingMsgId = msg.id;
						this._incomingMsgLastTime = Date.now();
						this._sendAckCheck();
						this._onMessage.fire(msg.data);
					}
				}
				break;
			}
			case ProtocolMessageType.Control: {
				this._onControlMessage.fire(msg.data);
				break;
			}
			case ProtocolMessageType.Ack: {
				// nothing to do, .ack is handled above already
				break;
			}
			case ProtocolMessageType.Disconnect: {
				this._onDidDispose.fire();
				break;
			}
			case ProtocolMessageType.ReplayRequest: {
				// Send again all unacknowledged messages
				const toSend = this._outgoingUnackMsg.toArray();
				for (let i = 0, len = toSend.length; i < len; i++) {
					this._socketWriter.write(toSend[i]);
				}
				this._recvAckCheck();
				break;
			}
			case ProtocolMessageType.Pause: {
				this._socketWriter.pause();
				break;
			}
			case ProtocolMessageType.Resume: {
				this._socketWriter.resume();
				break;
			}
			case ProtocolMessageType.KeepAlive: {
				// nothing to do
				break;
			}
		}
	}

	readEntireBuffer(): VSBuffer {
		return this._socketReader.readEntireBuffer();
	}

	flush(): void {
		this._socketWriter.flush();
	}

	/**
	 * Sends `buffer` as a Regular message with the next outgoing id. The
	 * message also acknowledges everything received so far and is kept in the
	 * unacknowledged queue. While reconnecting, the message is only queued.
	 */
	send(buffer: VSBuffer): void {
		const myId = ++this._outgoingMsgId;
		this._incomingAckId = this._incomingMsgId;
		const msg = new ProtocolMessage(ProtocolMessageType.Regular, myId, this._incomingAckId, buffer);
		this._outgoingUnackMsg.push(msg);
		if (!this._isReconnecting) {
			this._socketWriter.write(msg);
			this._recvAckCheck();
		}
	}

	/**
	 * Send a message which will not be part of the regular acknowledge flow.
	 * Use this for early control messages which are repeated in case of reconnection.
	 */
	sendControl(buffer: VSBuffer): void {
		const msg = new ProtocolMessage(ProtocolMessageType.Control, 0, 0, buffer);
		this._socketWriter.write(msg);
	}

	/**
	 * Sends an Ack message if there are received messages that have been
	 * unacknowledged for at least `ProtocolConstants.AcknowledgeTime`;
	 * otherwise schedules a re-check.
	 */
	private _sendAckCheck(): void {
		if (this._incomingMsgId <= this._incomingAckId) {
			// nothing to acknowledge
			return;
		}

		if (this._incomingAckTimeout) {
			// there will be a check in the near future
			return;
		}

		const timeSinceLastIncomingMsg = Date.now() - this._incomingMsgLastTime;
		if (timeSinceLastIncomingMsg >= ProtocolConstants.AcknowledgeTime) {
			// sufficient time has passed since this message has been received,
			// and no message from our side needed to be sent in the meantime,
			// so we will send a message containing only an ack.
			this._sendAck();
			return;
		}

		this._incomingAckTimeout = setTimeout(() => {
			this._incomingAckTimeout = null;
			this._sendAckCheck();
		}, ProtocolConstants.AcknowledgeTime - timeSinceLastIncomingMsg + 5);
	}

	/**
	 * Checks whether the oldest unacknowledged message has been waiting for
	 * too long and fires `onSocketTimeout` if so; otherwise schedules a re-check.
	 */
	private _recvAckCheck(): void {
		if (this._outgoingMsgId <= this._outgoingAckId) {
			// everything has been acknowledged
			return;
		}

		if (this._outgoingAckTimeout) {
			// there will be a check in the near future
			return;
		}

		if (this._isReconnecting) {
			// do not cause a timeout during reconnection,
			// because messages will not be actually written until `endAcceptReconnection`
			return;
		}

		const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!;
		const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime;
		const timeSinceLastReceivedSomeData = Date.now() - this._socketReader.lastReadTime;
		const timeSinceLastTimeout = Date.now() - this._lastSocketTimeoutTime;

		if (
			timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.TimeoutTime
			&& timeSinceLastReceivedSomeData >= ProtocolConstants.TimeoutTime
			&& timeSinceLastTimeout >= ProtocolConstants.TimeoutTime
		) {
			// It's been a long time since our sent message was acknowledged
			// and a long time since we received some data

			// But this might be caused by the event loop being busy and failing to read messages
			if (!this._loadEstimator.hasHighLoad()) {
				// Trash the socket
				this._lastSocketTimeoutTime = Date.now();
				this._onSocketTimeout.fire({
					unacknowledgedMsgCount: this._outgoingUnackMsg.length(),
					timeSinceOldestUnacknowledgedMsg,
					timeSinceLastReceivedSomeData
				});
				return;
			}
		}

		// check again once all three conditions could hold, but wait at least 500ms
		const minimumTimeUntilTimeout = Math.max(
			ProtocolConstants.TimeoutTime - timeSinceOldestUnacknowledgedMsg,
			ProtocolConstants.TimeoutTime - timeSinceLastReceivedSomeData,
			ProtocolConstants.TimeoutTime - timeSinceLastTimeout,
			500
		);

		this._outgoingAckTimeout = setTimeout(() => {
			this._outgoingAckTimeout = null;
			this._recvAckCheck();
		}, minimumTimeUntilTimeout);
	}

	private _sendAck(): void {
		if (this._incomingMsgId <= this._incomingAckId) {
			// nothing to acknowledge
			return;
		}

		this._incomingAckId = this._incomingMsgId;
		const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, getEmptyBuffer());
		this._socketWriter.write(msg);
	}

	private _sendKeepAlive(): void {
		// a keep alive message also acknowledges everything received so far
		this._incomingAckId = this._incomingMsgId;
		const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, this._incomingAckId, getEmptyBuffer());
		this._socketWriter.write(msg);
	}
}