Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/parts/ipc/node/ipc.net.ts
4780 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { createHash } from 'crypto';
7
import type * as http from 'http';
8
import { Server as NetServer, Socket, createConnection, createServer } from 'net';
9
import { tmpdir } from 'os';
10
import { DeflateRaw, InflateRaw, ZlibOptions, createDeflateRaw, createInflateRaw } from 'zlib';
11
import { VSBuffer } from '../../../common/buffer.js';
12
import { onUnexpectedError } from '../../../common/errors.js';
13
import { Emitter, Event } from '../../../common/event.js';
14
import { Disposable, IDisposable } from '../../../common/lifecycle.js';
15
import { join } from '../../../common/path.js';
16
import { Platform, platform } from '../../../common/platform.js';
17
import { generateUuid } from '../../../common/uuid.js';
18
import { ClientConnectionEvent, IPCServer } from '../common/ipc.js';
19
import { ChunkStream, Client, ISocket, Protocol, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from '../common/ipc.net.js';
20
21
/**
 * Performs the server side of the WebSocket opening handshake on `socket`
 * and wraps the socket in an `ISocket` implementation.
 *
 * The request is rejected with `400 Bad Request` (and `undefined` is
 * returned) when it is not a WebSocket upgrade or when it carries no
 * `Sec-WebSocket-Key` header, since no valid `Sec-WebSocket-Accept` value can
 * be computed without it.
 *
 * @param req The incoming HTTP upgrade request.
 * @param socket The raw socket the request arrived on.
 * @param options.debugLabel Label passed to the created `NodeSocket`.
 * @param options.skipWebSocketFrames When `true`, return the plain
 * `NodeSocket` without WebSocket framing (and never negotiate compression).
 * @param options.disableWebSocketCompression When `true`, never negotiate a
 * compression extension.
 * @param options.enableMessageSplitting Forwarded to `WebSocketNodeSocket`.
 * @returns The wrapped socket, or `undefined` if the request was rejected.
 */
export function upgradeToISocket(req: http.IncomingMessage, socket: Socket, {
	debugLabel,
	skipWebSocketFrames = false,
	disableWebSocketCompression = false,
	enableMessageSplitting = true,
}: {
	debugLabel: string;
	skipWebSocketFrames?: boolean;
	disableWebSocketCompression?: boolean;
	enableMessageSplitting?: boolean;
}): NodeSocket | WebSocketNodeSocket | undefined {
	if (req.headers.upgrade === undefined || req.headers.upgrade.toLowerCase() !== 'websocket') {
		socket.end('HTTP/1.1 400 Bad Request\r\n\r\n');
		return;
	}

	// https://tools.ietf.org/html/rfc6455#section-4
	const requestNonce = req.headers['sec-websocket-key'];
	if (typeof requestNonce !== 'string' || requestNonce.length === 0) {
		// Without a key we would hash the literal text "undefined" and answer
		// with an accept value no conforming client can validate.
		socket.end('HTTP/1.1 400 Bad Request\r\n\r\n');
		return;
	}
	const hash = createHash('sha1');// CodeQL [SM04514] SHA1 must be used here to respect the WebSocket protocol specification
	hash.update(requestNonce + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
	const responseNonce = hash.digest('base64');

	const responseHeaders = [
		`HTTP/1.1 101 Switching Protocols`,
		`Upgrade: websocket`,
		`Connection: Upgrade`,
		`Sec-WebSocket-Accept: ${responseNonce}`
	];

	// See https://tools.ietf.org/html/rfc7692#page-12
	let permessageDeflate = false;
	if (!skipWebSocketFrames && !disableWebSocketCompression && req.headers['sec-websocket-extensions']) {
		const websocketExtensionOptions = Array.isArray(req.headers['sec-websocket-extensions']) ? req.headers['sec-websocket-extensions'] : [req.headers['sec-websocket-extensions']];
		for (const websocketExtensionOption of websocketExtensionOptions) {
			if (/\b((server_max_window_bits)|(server_no_context_takeover)|(client_no_context_takeover))\b/.test(websocketExtensionOption)) {
				// sorry, the server does not support zlib parameter tweaks
				continue;
			}
			if (/\b(permessage-deflate)\b/.test(websocketExtensionOption)) {
				permessageDeflate = true;
				responseHeaders.push(`Sec-WebSocket-Extensions: permessage-deflate`);
				break;
			}
			if (/\b(x-webkit-deflate-frame)\b/.test(websocketExtensionOption)) {
				permessageDeflate = true;
				responseHeaders.push(`Sec-WebSocket-Extensions: x-webkit-deflate-frame`);
				break;
			}
		}
	}

	socket.write(responseHeaders.join('\r\n') + '\r\n\r\n');

	// Never timeout this socket due to inactivity!
	socket.setTimeout(0);
	// Disable Nagle's algorithm
	socket.setNoDelay(true);
	// Finally!

	if (skipWebSocketFrames) {
		return new NodeSocket(socket, debugLabel);
	} else {
		return new WebSocketNodeSocket(new NodeSocket(socket, debugLabel), permessageDeflate, null, true, enableMessageSplitting);
	}
}
86
87
/**
 * Upper bound, in milliseconds, on how long to wait for the 'close' event
 * once the socket stream has ended. On unix domain sockets the close event
 * does not fire reliably, apparently because of a Node.js bug.
 *
 * @see https://github.com/microsoft/vscode/issues/211462#issuecomment-2155471996
 */
const socketEndTimeoutMs = 30 * 1000;
95
96
/**
 * `ISocket` implementation backed by a Node.js `net.Socket`.
 *
 * The instance installs its own 'error', 'close' and 'end' listeners on the
 * wrapped socket. Once 'end' or 'close' has been observed, further calls to
 * {@link write} are silently dropped. After 'end', the socket is forcibly
 * destroyed if 'close' does not follow within `socketEndTimeoutMs`.
 */
export class NodeSocket implements ISocket {

	public readonly debugLabel: string;
	public readonly socket: Socket;
	private readonly _errorListener: (err: NodeJS.ErrnoException) => void;
	private readonly _closeListener: (hadError: boolean) => void;
	private readonly _endListener: () => void;
	private _canWrite = true;
	/** Pending "force destroy after end" timer, if one is armed. */
	private _endTimeoutHandle: ReturnType<typeof setTimeout> | undefined;

	/** Forwards a diagnostics event for this socket to `SocketDiagnostics`. */
	public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | unknown): void {
		SocketDiagnostics.traceSocketEvent(this.socket, this.debugLabel, type, data);
	}

	/**
	 * @param socket The Node.js socket to wrap.
	 * @param debugLabel Label attached to every diagnostics event.
	 */
	constructor(socket: Socket, debugLabel = '') {
		this.debugLabel = debugLabel;
		this.socket = socket;
		this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'NodeSocket' });
		this._errorListener = (err: NodeJS.ErrnoException) => {
			this.traceSocketEvent(SocketDiagnosticsEventType.Error, { code: err?.code, message: err?.message });
			if (err) {
				this._reportUnlessBrokenPipe(err);
			}
		};
		this.socket.on('error', this._errorListener);

		this._closeListener = (hadError: boolean) => {
			this.traceSocketEvent(SocketDiagnosticsEventType.Close, { hadError });
			this._canWrite = false;
			this._clearEndTimeout();
		};
		this.socket.on('close', this._closeListener);

		this._endListener = () => {
			this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndReceived);
			this._canWrite = false;
			this._endTimeoutHandle = setTimeout(() => socket.destroy(), socketEndTimeoutMs);
		};
		this.socket.on('end', this._endListener);
	}

	/**
	 * Removes the listeners installed by the constructor, cancels a pending
	 * end-timeout and destroys the underlying socket.
	 */
	public dispose(): void {
		// The 'close' listener is detached below, so it can no longer cancel the
		// timer armed by 'end'; cancel it here so it does not outlive the socket.
		this._clearEndTimeout();
		this.socket.off('error', this._errorListener);
		this.socket.off('close', this._closeListener);
		this.socket.off('end', this._endListener);
		this.socket.destroy();
	}

	/** Subscribes to incoming data; each chunk is wrapped in a `VSBuffer`. */
	public onData(_listener: (e: VSBuffer) => void): IDisposable {
		const listener = (buff: Buffer) => {
			this.traceSocketEvent(SocketDiagnosticsEventType.Read, buff);
			_listener(VSBuffer.wrap(buff));
		};
		this.socket.on('data', listener);
		return {
			dispose: () => this.socket.off('data', listener)
		};
	}

	/** Subscribes to the socket's 'close' event, adapted to a `SocketCloseEvent`. */
	public onClose(listener: (e: SocketCloseEvent) => void): IDisposable {
		const adapter = (hadError: boolean) => {
			listener({
				type: SocketCloseEventType.NodeSocketCloseEvent,
				hadError: hadError,
				error: undefined
			});
		};
		this.socket.on('close', adapter);
		return {
			dispose: () => this.socket.off('close', adapter)
		};
	}

	/** Subscribes to the socket's 'end' event. */
	public onEnd(listener: () => void): IDisposable {
		const adapter = () => {
			listener();
		};
		this.socket.on('end', adapter);
		return {
			dispose: () => this.socket.off('end', adapter)
		};
	}

	/**
	 * Writes `buffer` to the socket. The call is a no-op when the socket is
	 * destroyed or has already ended/closed. Write errors never propagate to
	 * the caller: EPIPE is ignored, everything else goes to `onUnexpectedError`.
	 */
	public write(buffer: VSBuffer): void {
		// return early if socket has been destroyed in the meantime
		if (this.socket.destroyed || !this._canWrite) {
			return;
		}

		// we ignore the returned value from `write` because we would have to cache the data
		// anyways and nodejs is already doing that for us:
		// > https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback
		// > However, the false return value is only advisory and the writable stream will unconditionally
		// > accept and buffer chunk even if it has not been allowed to drain.
		try {
			this.traceSocketEvent(SocketDiagnosticsEventType.Write, buffer);
			this.socket.write(buffer.buffer, (err: NodeJS.ErrnoException | null | undefined) => {
				if (err) {
					this._reportUnlessBrokenPipe(err);
				}
			});
		} catch (err) {
			this._reportUnlessBrokenPipe(err);
		}
	}

	/** Half-closes the socket (sends FIN). */
	public end(): void {
		this.traceSocketEvent(SocketDiagnosticsEventType.NodeEndSent);
		this.socket.end();
	}

	/**
	 * Resolves once the socket has nothing buffered for writing, or as soon as
	 * the socket emits 'close', 'end', 'error', 'timeout' or 'drain'. The
	 * returned promise never rejects.
	 */
	public drain(): Promise<void> {
		this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainBegin);
		return new Promise<void>((resolve, reject) => {
			if (this.socket.bufferSize === 0) {
				this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd);
				resolve();
				return;
			}
			const finished = () => {
				this.socket.off('close', finished);
				this.socket.off('end', finished);
				this.socket.off('error', finished);
				this.socket.off('timeout', finished);
				this.socket.off('drain', finished);
				this.traceSocketEvent(SocketDiagnosticsEventType.NodeDrainEnd);
				resolve();
			};
			this.socket.on('close', finished);
			this.socket.on('end', finished);
			this.socket.on('error', finished);
			this.socket.on('timeout', finished);
			this.socket.on('drain', finished);
		});
	}

	/** Cancels the timer armed by the 'end' listener, if any. */
	private _clearEndTimeout(): void {
		if (this._endTimeoutHandle) {
			clearTimeout(this._endTimeoutHandle);
			this._endTimeoutHandle = undefined;
		}
	}

	/**
	 * Routes a socket error to `onUnexpectedError`, except for EPIPE.
	 *
	 * An EPIPE exception at the wrong time can lead to a renderer process crash
	 * so ignore the error since the socket will fire the close event soon anyways:
	 * > https://nodejs.org/api/errors.html#errors_common_system_errors
	 * > EPIPE (Broken pipe): A write on a pipe, socket, or FIFO for which there is no
	 * > process to read the data. Commonly encountered at the net and http layers,
	 * > indicative that the remote side of the stream being written to has been closed.
	 */
	private _reportUnlessBrokenPipe(err: unknown): void {
		if ((err as NodeJS.ErrnoException | null | undefined)?.code === 'EPIPE') {
			return;
		}
		onUnexpectedError(err);
	}
}
261
262
const enum Constants {
	/** Number of header bytes every WebSocket frame starts with. */
	MinHeaderByteSize = 2,
	/**
	 * Large outgoing buffers are cut into pieces of at most this many bytes
	 * (256KB), each sent as its own websocket message. Otherwise the sender
	 * would have to finish compressing the whole buffer before anything
	 * reaches the underlying socket, and the receiver would have to wait for
	 * the whole message before it can start processing any of it.
	 */
	MaxWebSocketMessageLength = 1 << 18 // 256 KB
}
273
274
/**
 * Phases of the incremental WebSocket frame parser.
 */
const enum ReadState {
	/** Waiting for the two fixed header bytes so the full header size can be determined. */
	PeekHeader = 1,
	/** Waiting for the complete header (extended length and mask included). */
	ReadHeader,
	/** Waiting for the frame payload. */
	ReadBody,
	/** NOTE(review): not referenced in the part of the file reviewed here. */
	Fin
}
280
281
/**
 * Sink for socket diagnostics events.
 */
interface ISocketTracer {
	/**
	 * Records one diagnostics event.
	 *
	 * @param type The kind of event being reported.
	 * @param data Optional payload describing the event.
	 */
	traceSocketEvent(
		type: SocketDiagnosticsEventType,
		data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | unknown
	): void;
}
284
285
/**
 * Per-frame settings for an outgoing WebSocket frame.
 */
interface FrameOptions {
	/** WebSocket opcode; only the low four bits end up in the frame header. */
	opcode: number;
	/** Whether the payload is to be sent compressed (RSV1 bit set). */
	compressed: boolean;
}
289
290
/**
 * `ISocket` that speaks WebSocket frames on top of a {@link NodeSocket}.
 *
 * Outgoing buffers are framed (and optionally deflated) before they are
 * handed to the underlying socket; incoming bytes are parsed frame by frame,
 * unmasked, optionally inflated and then surfaced through {@link onData}.
 *
 * See https://tools.ietf.org/html/rfc6455#section-5.2
 */
export class WebSocketNodeSocket extends Disposable implements ISocket, ISocketTracer {

	public readonly socket: NodeSocket;
	private readonly _flowManager: WebSocketFlowManager;
	private readonly _incomingData: ChunkStream;
	private readonly _onData = this._register(new Emitter<VSBuffer>());
	private readonly _onClose = this._register(new Emitter<SocketCloseEvent>());
	private readonly _maxSocketMessageLength: number;
	private _isEnded = false;

	/** Incremental frame parser state; `readLen` is the byte count the current phase needs. */
	private readonly _state = {
		state: ReadState.PeekHeader,
		readLen: Constants.MinHeaderByteSize,
		fin: 0,
		compressed: false,
		firstFrameOfMessage: true,
		mask: 0,
		opcode: 0
	};

	/** Whether compression was set up for this socket. */
	public get permessageDeflate(): boolean {
		return this._flowManager.permessageDeflate;
	}

	/** All bytes handed to zlib inflate so far (empty unless recording was requested). */
	public get recordedInflateBytes(): VSBuffer {
		return this._flowManager.recordedInflateBytes;
	}

	/** Forwards a diagnostics event to the underlying socket's tracer. */
	public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | unknown): void {
		this.socket.traceSocketEvent(type, data);
	}

	/**
	 * Create a socket which can communicate using WebSocket frames.
	 *
	 * **NOTE**: When using the permessage-deflate WebSocket extension, if parts of inflating was done
	 *  in a different zlib instance, we need to pass all those bytes into zlib, otherwise the inflate
	 *  might hit an inflated portion referencing a distance too far back.
	 *
	 * @param socket The underlying socket
	 * @param permessageDeflate Use the permessage-deflate WebSocket extension
	 * @param inflateBytes "Seed" zlib inflate with these bytes.
	 * @param recordInflateBytes Record all bytes sent to inflate
	 * @param enableMessageSplitting Split large writes into messages of at most
	 * `Constants.MaxWebSocketMessageLength` bytes (default `true`).
	 */
	constructor(socket: NodeSocket, permessageDeflate: boolean, inflateBytes: VSBuffer | null, recordInflateBytes: boolean, enableMessageSplitting = true) {
		super();
		this.socket = socket;
		this._maxSocketMessageLength = enableMessageSplitting ? Constants.MaxWebSocketMessageLength : Infinity;
		this.traceSocketEvent(SocketDiagnosticsEventType.Created, { type: 'WebSocketNodeSocket', permessageDeflate, inflateBytesLength: inflateBytes?.byteLength || 0, recordInflateBytes });
		this._flowManager = this._register(new WebSocketFlowManager(
			this,
			permessageDeflate,
			inflateBytes,
			recordInflateBytes,
			this._onData,
			(data, options) => this._write(data, options)
		));
		this._register(this._flowManager.onError((err) => {
			// zlib errors are fatal, since we have no idea how to recover
			console.error(err);
			onUnexpectedError(err);
			this._onClose.fire({
				type: SocketCloseEventType.NodeSocketCloseEvent,
				hadError: true,
				error: err
			});
		}));
		this._incomingData = new ChunkStream();
		this._register(this.socket.onData(data => this._acceptChunk(data)));
		this._register(this.socket.onClose(async (e) => {
			// Delay surfacing the close event until the async inflating is done
			// and all data has been emitted
			if (this._flowManager.isProcessingReadQueue()) {
				await Event.toPromise(this._flowManager.onDidFinishProcessingReadQueue);
			}
			this._onClose.fire(e);
		}));
	}

	/**
	 * Disposes this socket and the underlying one. If writes are still being
	 * processed, disposal is deferred until the write queue has drained.
	 */
	public override dispose(): void {
		if (this._flowManager.isProcessingWriteQueue()) {
			// Wait for any outstanding writes to finish before disposing
			this._register(this._flowManager.onDidFinishProcessingWriteQueue(() => {
				this.dispose();
			}));
		} else {
			this.socket.dispose();
			super.dispose();
		}
	}

	/** Subscribes to decoded message payloads. */
	public onData(listener: (e: VSBuffer) => void): IDisposable {
		return this._onData.event(listener);
	}

	/** Subscribes to close events (socket close or fatal zlib error). */
	public onClose(listener: (e: SocketCloseEvent) => void): IDisposable {
		return this._onClose.event(listener);
	}

	/** Subscribes to the underlying socket's 'end' event. */
	public onEnd(listener: () => void): IDisposable {
		return this.socket.onEnd(listener);
	}

	/**
	 * Queues `buffer` for sending as one or more binary WebSocket messages.
	 * An empty buffer results in no message being sent.
	 */
	public write(buffer: VSBuffer): void {
		// If we write many logical messages (let's say 1000 messages of 100KB) during a single process tick, we do
		// this thing where we install a process.nextTick timer and group all of them together and we then issue a
		// single WebSocketNodeSocket.write with a 100MB buffer.
		//
		// The first problem is that the actual writing to the underlying node socket will only happen after all of
		// the 100MB have been deflated (due to waiting on zlib flush). The second problem is on the reading side,
		// where we will get a single WebSocketNodeSocket.onData event fired when all the 100MB have arrived,
		// delaying processing the 1000 received messages until all have arrived, instead of processing them as each
		// one arrives.
		//
		// We therefore split the buffer into chunks, and issue a write for each chunk.

		let start = 0;
		while (start < buffer.byteLength) {
			this._flowManager.writeMessage(buffer.slice(start, Math.min(start + this._maxSocketMessageLength, buffer.byteLength)), { compressed: true, opcode: 0x02 /* Binary frame */ });
			start += this._maxSocketMessageLength;
		}
	}

	/**
	 * Wraps `buffer` in a single unmasked, final (FIN=1) frame and writes it
	 * to the underlying socket. Does nothing once {@link end} was called.
	 */
	private _write(buffer: VSBuffer, { compressed, opcode }: FrameOptions): void {
		if (this._isEnded) {
			// Avoid ERR_STREAM_WRITE_AFTER_END
			return;
		}

		this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketWrite, buffer);
		// Payload lengths < 126 fit into the second header byte; otherwise a
		// 16-bit or 64-bit extended length follows.
		let headerLen = Constants.MinHeaderByteSize;
		if (buffer.byteLength < 126) {
			headerLen += 0;
		} else if (buffer.byteLength < 2 ** 16) {
			headerLen += 2;
		} else {
			headerLen += 8;
		}
		const header = VSBuffer.alloc(headerLen);

		// The RSV1 bit indicates a compressed frame
		const compressedFlag = compressed ? 0b01000000 : 0;
		const opcodeFlag = opcode & 0b00001111;
		header.writeUInt8(0b10000000 | compressedFlag | opcodeFlag, 0);
		if (buffer.byteLength < 126) {
			header.writeUInt8(buffer.byteLength, 1);
		} else if (buffer.byteLength < 2 ** 16) {
			header.writeUInt8(126, 1);
			let offset = 1;
			header.writeUInt8((buffer.byteLength >>> 8) & 0b11111111, ++offset);
			header.writeUInt8((buffer.byteLength >>> 0) & 0b11111111, ++offset);
		} else {
			header.writeUInt8(127, 1);
			let offset = 1;
			// Upper 32 bits of the 64-bit length are always written as zero.
			header.writeUInt8(0, ++offset);
			header.writeUInt8(0, ++offset);
			header.writeUInt8(0, ++offset);
			header.writeUInt8(0, ++offset);
			header.writeUInt8((buffer.byteLength >>> 24) & 0b11111111, ++offset);
			header.writeUInt8((buffer.byteLength >>> 16) & 0b11111111, ++offset);
			header.writeUInt8((buffer.byteLength >>> 8) & 0b11111111, ++offset);
			header.writeUInt8((buffer.byteLength >>> 0) & 0b11111111, ++offset);
		}

		this.socket.write(VSBuffer.concat([header, buffer]));
	}

	/** Marks the socket as ended (later frame writes are dropped) and ends the underlying socket. */
	public end(): void {
		this._isEnded = true;
		this.socket.end();
	}

	/**
	 * Feeds raw bytes from the underlying socket into the frame parser and
	 * dispatches every frame that became complete.
	 */
	private _acceptChunk(data: VSBuffer): void {
		if (data.byteLength === 0) {
			return;
		}

		this._incomingData.acceptChunk(data);

		while (this._incomingData.byteLength >= this._state.readLen) {

			if (this._state.state === ReadState.PeekHeader) {
				// peek to see if we can read the entire header
				const peekHeader = this._incomingData.peek(this._state.readLen);
				const firstByte = peekHeader.readUInt8(0);
				const finBit = (firstByte & 0b10000000) >>> 7;
				const rsv1Bit = (firstByte & 0b01000000) >>> 6;
				const opcode = (firstByte & 0b00001111);
				// Opcodes 0x8-0xF are control frames (close, ping, pong, ...)
				const isControlFrame = (opcode & 0b00001000) !== 0;

				const secondByte = peekHeader.readUInt8(1);
				const hasMask = (secondByte & 0b10000000) >>> 7;
				const len = (secondByte & 0b01111111);

				this._state.state = ReadState.ReadHeader;
				this._state.readLen = Constants.MinHeaderByteSize + (hasMask ? 4 : 0) + (len === 126 ? 2 : 0) + (len === 127 ? 8 : 0);
				this._state.fin = finBit;
				// Control frames may be injected between the fragments of a data
				// message (RFC 6455, section 5.4). They must not disturb the
				// fragmentation/compression bookkeeping of that message, otherwise
				// the following continuation frame would be treated as the first
				// frame of a new (uncompressed) message.
				if (!isControlFrame) {
					if (this._state.firstFrameOfMessage) {
						// if the frame is compressed, the RSV1 bit is set only for the first frame of the message
						this._state.compressed = Boolean(rsv1Bit);
					}
					this._state.firstFrameOfMessage = Boolean(finBit);
				}
				this._state.mask = 0;
				this._state.opcode = opcode;

				this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { headerSize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, opcode: this._state.opcode });

			} else if (this._state.state === ReadState.ReadHeader) {
				// read entire header
				const header = this._incomingData.read(this._state.readLen);
				const secondByte = header.readUInt8(1);
				const hasMask = (secondByte & 0b10000000) >>> 7;
				let len = (secondByte & 0b01111111);

				let offset = 1;
				if (len === 126) {
					len = (
						header.readUInt8(++offset) * 2 ** 8
						+ header.readUInt8(++offset)
					);
				} else if (len === 127) {
					// The upper 32 bits of the 64-bit length are read but ignored.
					len = (
						header.readUInt8(++offset) * 0
						+ header.readUInt8(++offset) * 0
						+ header.readUInt8(++offset) * 0
						+ header.readUInt8(++offset) * 0
						+ header.readUInt8(++offset) * 2 ** 24
						+ header.readUInt8(++offset) * 2 ** 16
						+ header.readUInt8(++offset) * 2 ** 8
						+ header.readUInt8(++offset)
					);
				}

				let mask = 0;
				if (hasMask) {
					mask = (
						header.readUInt8(++offset) * 2 ** 24
						+ header.readUInt8(++offset) * 2 ** 16
						+ header.readUInt8(++offset) * 2 ** 8
						+ header.readUInt8(++offset)
					);
				}

				this._state.state = ReadState.ReadBody;
				this._state.readLen = len;
				this._state.mask = mask;

				this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketPeekedHeader, { bodySize: this._state.readLen, compressed: this._state.compressed, fin: this._state.fin, mask: this._state.mask, opcode: this._state.opcode });

			} else if (this._state.state === ReadState.ReadBody) {
				// read body

				const body = this._incomingData.read(this._state.readLen);
				this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketReadData, body);

				unmask(body, this._state.mask);
				this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketUnmaskedData, body);

				this._state.state = ReadState.PeekHeader;
				this._state.readLen = Constants.MinHeaderByteSize;
				this._state.mask = 0;

				if (this._state.opcode <= 0x02 /* Continuation frame or Text frame or binary frame */) {
					this._flowManager.acceptFrame(body, this._state.compressed, !!this._state.fin);
				} else if (this._state.opcode === 0x09 /* Ping frame */) {
					// Ping frames could be send by some browsers e.g. Firefox
					this._flowManager.writeMessage(body, { compressed: false, opcode: 0x0A /* Pong frame */ });
				}
			}
		}
	}

	/**
	 * Resolves once all queued messages have been handed to the underlying
	 * socket and that socket has drained.
	 */
	public async drain(): Promise<void> {
		this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainBegin);
		if (this._flowManager.isProcessingWriteQueue()) {
			await Event.toPromise(this._flowManager.onDidFinishProcessingWriteQueue);
		}
		await this.socket.drain();
		this.traceSocketEvent(SocketDiagnosticsEventType.WebSocketNodeSocketDrainEnd);
	}
}
573
574
/**
 * Serializes the (possibly asynchronous) compression work of a WebSocket:
 * outgoing messages and incoming frames are each processed strictly in the
 * order in which they were queued.
 */
class WebSocketFlowManager extends Disposable {

	private readonly _onError = this._register(new Emitter<Error>());
	public readonly onError = this._onError.event;

	private readonly _zlibInflateStream: ZlibInflateStream | null;
	private readonly _zlibDeflateStream: ZlibDeflateStream | null;
	private readonly _writeQueue: { data: VSBuffer; options: FrameOptions }[] = [];
	private readonly _readQueue: { data: VSBuffer; isCompressed: boolean; isLastFrameOfMessage: boolean }[] = [];

	private readonly _onDidFinishProcessingReadQueue = this._register(new Emitter<void>());
	public readonly onDidFinishProcessingReadQueue = this._onDidFinishProcessingReadQueue.event;

	private readonly _onDidFinishProcessingWriteQueue = this._register(new Emitter<void>());
	public readonly onDidFinishProcessingWriteQueue = this._onDidFinishProcessingWriteQueue.event;

	/** `true` when both an inflate and a deflate stream exist. */
	public get permessageDeflate(): boolean {
		return Boolean(this._zlibInflateStream && this._zlibDeflateStream);
	}

	/** Bytes recorded by the inflate stream, or an empty buffer when there is none. */
	public get recordedInflateBytes(): VSBuffer {
		return this._zlibInflateStream
			? this._zlibInflateStream.recordedInflateBytes
			: VSBuffer.alloc(0);
	}

	/**
	 * @param _tracer Receives diagnostics events of the zlib streams.
	 * @param permessageDeflate Whether to create the zlib streams at all.
	 * @param inflateBytes Bytes used to seed the inflate stream, if any.
	 * @param recordInflateBytes Whether the inflate stream records its input.
	 * @param _onData Emitter on which decoded payloads are fired.
	 * @param _writeFn Called with each outgoing payload once it is ready.
	 */
	constructor(
		private readonly _tracer: ISocketTracer,
		permessageDeflate: boolean,
		inflateBytes: VSBuffer | null,
		recordInflateBytes: boolean,
		private readonly _onData: Emitter<VSBuffer>,
		private readonly _writeFn: (data: VSBuffer, options: FrameOptions) => void
	) {
		super();
		if (!permessageDeflate) {
			this._zlibInflateStream = null;
			this._zlibDeflateStream = null;
			return;
		}
		// See https://tools.ietf.org/html/rfc7692#page-16
		// To simplify our logic, we don't negotiate the window size
		// and simply dedicate (2^15) / 32kb per web socket
		this._zlibInflateStream = this._register(new ZlibInflateStream(this._tracer, recordInflateBytes, inflateBytes, { windowBits: 15 }));
		this._zlibDeflateStream = this._register(new ZlibDeflateStream(this._tracer, { windowBits: 15 }));
		this._register(this._zlibInflateStream.onError((err) => this._onError.fire(err)));
		this._register(this._zlibDeflateStream.onError((err) => this._onError.fire(err)));
	}

	/** Queues an outgoing message and starts processing if idle. */
	public writeMessage(data: VSBuffer, options: FrameOptions): void {
		this._writeQueue.push({ data, options });
		this._processWriteQueue();
	}

	private _isProcessingWriteQueue = false;

	/**
	 * Works off the write queue one message at a time. Re-entrant calls while
	 * a run is in progress return immediately; the running loop picks up
	 * whatever they queued.
	 */
	private async _processWriteQueue(): Promise<void> {
		if (this._isProcessingWriteQueue) {
			return;
		}
		this._isProcessingWriteQueue = true;
		for (let pending = this._writeQueue.shift(); pending !== undefined; pending = this._writeQueue.shift()) {
			const deflater = this._zlibDeflateStream;
			if (deflater && pending.options.compressed) {
				const deflated = await this._deflateMessage(deflater, pending.data);
				this._writeFn(deflated, pending.options);
			} else {
				// No deflate stream or compression not requested: send as-is
				this._writeFn(pending.data, { ...pending.options, compressed: false });
			}
		}
		this._isProcessingWriteQueue = false;
		this._onDidFinishProcessingWriteQueue.fire();
	}

	/** Whether a write-queue run is currently in progress. */
	public isProcessingWriteQueue(): boolean {
		return this._isProcessingWriteQueue;
	}

	/**
	 * Deflates one message. Subsequent calls should wait for the previous
	 * `_deflateMessage` call to complete.
	 */
	private _deflateMessage(zlibDeflateStream: ZlibDeflateStream, buffer: VSBuffer): Promise<VSBuffer> {
		return new Promise<VSBuffer>(resolve => {
			zlibDeflateStream.write(buffer);
			zlibDeflateStream.flush(deflated => resolve(deflated));
		});
	}

	/** Queues an incoming frame and starts processing if idle. */
	public acceptFrame(data: VSBuffer, isCompressed: boolean, isLastFrameOfMessage: boolean): void {
		this._readQueue.push({ data, isCompressed, isLastFrameOfMessage });
		this._processReadQueue();
	}

	private _isProcessingReadQueue = false;

	/**
	 * Works off the read queue one frame at a time and fires the resulting
	 * payloads on the data emitter. Re-entrant calls while a run is in
	 * progress return immediately.
	 */
	private async _processReadQueue(): Promise<void> {
		if (this._isProcessingReadQueue) {
			return;
		}
		this._isProcessingReadQueue = true;
		for (let frame = this._readQueue.shift(); frame !== undefined; frame = this._readQueue.shift()) {
			const inflater = this._zlibInflateStream;
			if (inflater && frame.isCompressed) {
				// See https://datatracker.ietf.org/doc/html/rfc7692#section-9.2
				// Even if permessageDeflate is negotiated, it is possible
				// that the other side might decide to send uncompressed messages
				// So only decompress messages that have the RSV 1 bit set
				const inflated = await this._inflateFrame(inflater, frame.data, frame.isLastFrameOfMessage);
				this._onData.fire(inflated);
			} else {
				this._onData.fire(frame.data);
			}
		}
		this._isProcessingReadQueue = false;
		this._onDidFinishProcessingReadQueue.fire();
	}

	/** Whether a read-queue run is currently in progress. */
	public isProcessingReadQueue(): boolean {
		return this._isProcessingReadQueue;
	}

	/**
	 * Inflates one frame. Subsequent calls should wait for the previous
	 * `_inflateFrame` call to complete.
	 */
	private _inflateFrame(zlibInflateStream: ZlibInflateStream, buffer: VSBuffer, isLastFrameOfMessage: boolean): Promise<VSBuffer> {
		return new Promise<VSBuffer>(resolve => {
			// See https://tools.ietf.org/html/rfc7692#section-7.2.2
			zlibInflateStream.write(buffer);
			if (isLastFrameOfMessage) {
				// Append the 4-byte tail after the last frame of a message
				zlibInflateStream.write(VSBuffer.fromByteArray([0x00, 0x00, 0xff, 0xff]));
			}
			zlibInflateStream.flush(inflated => resolve(inflated));
		});
	}
}
708
709
/**
 * Thin wrapper around a raw zlib inflate stream that collects the inflated
 * output between flushes and can optionally record every byte written to it.
 */
class ZlibInflateStream extends Disposable {

	private readonly _onError = this._register(new Emitter<Error>());
	public readonly onError = this._onError.event;

	private readonly _zlibInflate: InflateRaw;
	private readonly _recordedInflateBytes: VSBuffer[] = [];
	private readonly _pendingInflateData: VSBuffer[] = [];

	/** Concatenation of all recorded input, or an empty buffer when recording is off. */
	public get recordedInflateBytes(): VSBuffer {
		return this._recordInflateBytes
			? VSBuffer.concat(this._recordedInflateBytes)
			: VSBuffer.alloc(0);
	}

	/**
	 * @param _tracer Receives diagnostics events.
	 * @param _recordInflateBytes Whether to keep a copy of every buffer passed to {@link write}.
	 * @param inflateBytes Optional bytes fed to zlib up front; their output is discarded.
	 * @param options Options for the underlying zlib stream.
	 */
	constructor(
		private readonly _tracer: ISocketTracer,
		private readonly _recordInflateBytes: boolean,
		inflateBytes: VSBuffer | null,
		options: ZlibOptions
	) {
		super();
		const inflater = createInflateRaw(options);
		this._zlibInflate = inflater;
		inflater.on('error', (err: Error) => {
			this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateError, { message: err?.message, code: (err as NodeJS.ErrnoException)?.code });
			this._onError.fire(err);
		});
		inflater.on('data', (chunk: Buffer) => {
			this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateData, chunk);
			this._pendingInflateData.push(VSBuffer.wrap(chunk));
		});
		if (!inflateBytes) {
			return;
		}
		this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialWrite, inflateBytes.buffer);
		inflater.write(inflateBytes.buffer);
		inflater.flush(() => {
			this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateInitialFlushFired);
			// Output produced by the seed bytes is not surfaced
			this._pendingInflateData.length = 0;
		});
	}

	/** Feeds compressed bytes to zlib (recording a clone first if enabled). */
	public write(buffer: VSBuffer): void {
		if (this._recordInflateBytes) {
			this._recordedInflateBytes.push(buffer.clone());
		}
		this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateWrite, buffer);
		this._zlibInflate.write(buffer.buffer);
	}

	/**
	 * Flushes zlib and invokes `callback` with everything inflated since the
	 * previous flush.
	 */
	public flush(callback: (data: VSBuffer) => void): void {
		this._zlibInflate.flush(() => {
			this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibInflateFlushFired);
			const inflated = VSBuffer.concat(this._pendingInflateData);
			this._pendingInflateData.length = 0;
			callback(inflated);
		});
	}
}
768
769
/**
 * Thin wrapper around a raw zlib deflate stream that collects the deflated
 * output between flushes.
 */
class ZlibDeflateStream extends Disposable {

	private readonly _onError = this._register(new Emitter<Error>());
	public readonly onError = this._onError.event;

	private readonly _zlibDeflate: DeflateRaw;
	private readonly _pendingDeflateData: VSBuffer[] = [];

	/**
	 * @param _tracer Receives diagnostics events.
	 * @param options Options for the underlying zlib stream.
	 */
	constructor(
		private readonly _tracer: ISocketTracer,
		options: ZlibOptions
	) {
		super();

		// Honor the caller-supplied options (previously ignored in favor of a
		// hard-coded `{ windowBits: 15 }`), mirroring ZlibInflateStream.
		this._zlibDeflate = createDeflateRaw(options);
		this._zlibDeflate.on('error', (err: Error) => {
			this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateError, { message: err?.message, code: (err as NodeJS.ErrnoException)?.code });
			this._onError.fire(err);
		});
		this._zlibDeflate.on('data', (data: Buffer) => {
			this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateData, data);
			this._pendingDeflateData.push(VSBuffer.wrap(data));
		});
	}

	/** Feeds uncompressed bytes to zlib. */
	public write(buffer: VSBuffer): void {
		this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateWrite, buffer.buffer);
		this._zlibDeflate.write(<Buffer>buffer.buffer);
	}

	/**
	 * Sync-flushes zlib and invokes `callback` with everything deflated since
	 * the previous flush, minus the last four bytes of the flushed output.
	 */
	public flush(callback: (data: VSBuffer) => void): void {
		// See https://zlib.net/manual.html#Constants
		this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => {
			this._tracer.traceSocketEvent(SocketDiagnosticsEventType.zlibDeflateFlushFired);

			let data = VSBuffer.concat(this._pendingDeflateData);
			this._pendingDeflateData.length = 0;

			// See https://tools.ietf.org/html/rfc7692#section-7.2.1
			// Drop the 4-byte tail; the receiving side re-appends it
			data = data.slice(0, data.byteLength - 4);

			callback(data);
		});
	}
}
816
817
/**
 * XORs `buffer` in place with the repeating 4-byte `mask` (most significant
 * mask byte first). A mask of `0` leaves the buffer untouched.
 *
 * @param buffer The payload to unmask; modified in place.
 * @param mask The 32-bit masking key.
 */
function unmask(buffer: VSBuffer, mask: number): void {
	if (mask === 0) {
		return;
	}

	// Whole 32-bit words first
	const wordCount = buffer.byteLength >>> 2;
	for (let word = 0; word < wordCount; word++) {
		const at = word * 4;
		buffer.writeUInt32BE(buffer.readUInt32BE(at) ^ mask, at);
	}

	// Then the 0-3 trailing bytes, consuming the mask from its top byte down
	const tailStart = wordCount * 4;
	const tailLength = buffer.byteLength - tailStart;
	for (let k = 0; k < tailLength; k++) {
		const maskByte = (mask >>> (24 - 8 * k)) & 0b11111111;
		buffer.writeUInt8(buffer.readUInt8(tailStart + k) ^ maskByte, tailStart + k);
	}
}
841
842
// Capture the environment value at module load, before anything has a
// chance to overwrite it.
// Related to https://github.com/microsoft/vscode/issues/30624
export const XDG_RUNTIME_DIR = process.env['XDG_RUNTIME_DIR'];

// Per-platform length at (or above) which an IPC handle triggers a warning.
// Platforms without an entry are not checked.
const safeIpcPathLengths: Record<number, number> = {
	[Platform.Linux]: 107,
	[Platform.Mac]: 103
};
850
851
/**
 * Creates a new IPC handle whose name contains a freshly generated UUID.
 *
 * @returns A named pipe path on Windows, otherwise the path of a socket file.
 */
export function createRandomIPCHandle(): string {
	const uniqueId = generateUuid();

	if (process.platform === 'win32') {
		// Windows: use named pipe
		return `\\\\.\\pipe\\vscode-ipc-${uniqueId}-sock`;
	}

	// Mac & Unix: use a socket file.
	// Outside of macOS, XDG_RUNTIME_DIR is preferred when it is set;
	// in every other case the OS temp directory is used.
	let socketDirectory: string;
	if (process.platform === 'darwin' || !XDG_RUNTIME_DIR) {
		socketDirectory = tmpdir();
	} else {
		socketDirectory = XDG_RUNTIME_DIR;
	}

	const handle = join(socketDirectory, `vscode-ipc-${uniqueId}.sock`);

	// Warn (but do not fail) when the resulting path is too long
	validateIPCHandleLength(handle);

	return handle;
}
869
870
/**
 * Creates a deterministic IPC handle: the same `directoryPath`, `type` and
 * `version` always produce the same handle.
 *
 * @param directoryPath Directory the handle is scoped to. Its SHA-256 hash
 * (first 8 hex chars) is part of the pipe / XDG socket name; the path itself
 * is the socket's parent directory in the fallback case.
 * @param type Kind of socket; trimmed to 6 chars for socket file names.
 * @param version Version string; trimmed to 4 chars for socket file names.
 * @returns A named pipe path on Windows, otherwise the path of a socket file.
 */
export function createStaticIPCHandle(directoryPath: string, type: string, version: string): string {
	const scope = createHash('sha256').update(directoryPath).digest('hex');
	// `slice` replaces the deprecated `substr`; results are identical here.
	const scopeForSocket = scope.slice(0, 8);

	// Windows: use named pipe
	if (process.platform === 'win32') {
		return `\\\\.\\pipe\\${scopeForSocket}-${version}-${type}-sock`;
	}

	// Mac & Unix: Use socket file
	// Unix: Prefer XDG_RUNTIME_DIR over user data path, unless portable
	// Trim the version and type values for the socket to prevent too large
	// file names causing issues: https://unix.stackexchange.com/q/367008

	const versionForSocket = version.slice(0, 4);
	const typeForSocket = type.slice(0, 6);

	let result: string;
	if (process.platform !== 'darwin' && XDG_RUNTIME_DIR && !process.env['VSCODE_PORTABLE']) {
		result = join(XDG_RUNTIME_DIR, `vscode-${scopeForSocket}-${versionForSocket}-${typeForSocket}.sock`);
	} else {
		result = join(directoryPath, `${versionForSocket}-${typeForSocket}.sock`);
	}

	// Warn (but do not fail) when the resulting path is too long
	validateIPCHandleLength(result);

	return result;
}
899
900
/**
 * Logs a console warning when `handle` reaches the safe IPC path length
 * configured for the current platform. Platforms without a configured
 * length are not checked. Never throws.
 */
function validateIPCHandleLength(handle: string): void {
	const maxLength = safeIpcPathLengths[platform];
	const exceedsLimit = typeof maxLength === 'number' && handle.length >= maxLength;
	if (!exceedsLimit) {
		return;
	}

	// https://nodejs.org/api/net.html#net_identifying_paths_for_ipc_connections
	console.warn(`WARNING: IPC handle "${handle}" is longer than ${maxLength} chars, try a shorter --user-data-dir`);
}
907
908
/**
 * An IPC server on top of a Node.js `net.Server`. Each incoming socket
 * connection is surfaced to the base `IPCServer` as a client connection.
 */
export class Server extends IPCServer {

	/**
	 * Maps the `connection` events of `server` to client connection events:
	 * each socket is wrapped in a protocol, and the socket's first `close`
	 * event signals the client's disconnect.
	 */
	private static toClientConnectionEvent(server: NetServer): Event<ClientConnectionEvent> {
		const asClientConnection = (socket: Socket): ClientConnectionEvent => {
			const protocol = new Protocol(new NodeSocket(socket, 'ipc-server-connection'));
			const onDidClientDisconnect = Event.once(Event.fromNodeEventEmitter<void>(socket, 'close'));
			return { protocol, onDidClientDisconnect };
		};

		const onSocketConnected = Event.fromNodeEventEmitter<Socket>(server, 'connection');
		return Event.map(onSocketConnected, asClientConnection);
	}

	/** The underlying net server; `null` once this instance has been disposed. */
	private server: NetServer | null;

	constructor(server: NetServer) {
		super(Server.toClientConnectionEvent(server));
		this.server = server;
	}

	/**
	 * Disposes the IPC server and closes the underlying net server. The net
	 * server is closed at most once, even if `dispose` is called repeatedly.
	 */
	override dispose(): void {
		super.dispose();

		if (!this.server) {
			return;
		}
		this.server.close();
		this.server = null;
	}
}
934
935
/**
 * Starts a net server listening on the given port or named pipe / socket path.
 *
 * @returns A promise that resolves with the IPC `Server` once listening has
 * started, or rejects with the error the net server emits before that.
 */
export function serve(port: number): Promise<Server>;
export function serve(namedPipe: string): Promise<Server>;
export function serve(hook: number | string): Promise<Server> {
	return new Promise<Server>((resolve, reject) => {
		const netServer = createServer();

		const onListening = (): void => {
			// Startup succeeded: later errors must no longer reject this promise
			netServer.removeListener('error', reject);
			resolve(new Server(netServer));
		};

		netServer.on('error', reject);
		netServer.listen(hook, onListening);
	});
}
948
949
/**
 * Connects to an IPC server, either via host/port or via a named pipe /
 * socket path.
 *
 * @param clientId Identifier passed to the created `Client`; also appended to
 * the socket's debug label.
 * @returns A promise that resolves with the `Client` once the connection is
 * established, or rejects with the first socket error emitted before that.
 */
export function connect(options: { host: string; port: number }, clientId: string): Promise<Client>;
export function connect(namedPipe: string, clientId: string): Promise<Client>;
export function connect(hook: { host: string; port: number } | string, clientId: string): Promise<Client> {
	return new Promise<Client>((resolve, reject) => {
		const onConnected = (): void => {
			// Connected: later socket errors must no longer reject this promise
			socket.removeListener('error', reject);
			resolve(Client.fromSocket(new NodeSocket(socket, `ipc-client${clientId}`), clientId));
		};

		// Both branches make the same call; the `typeof` narrowing presumably
		// exists so each call matches a single `createConnection` overload.
		const socket: Socket = typeof hook === 'string'
			? createConnection(hook, onConnected)
			: createConnection(hook, onConnected);

		socket.once('error', reject);
	});
}
969
970