Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/parts/ipc/test/node/ipc.net.test.ts
4780 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import assert from 'assert';
7
import sinon from 'sinon';
8
import { EventEmitter } from 'events';
9
import { AddressInfo, connect, createServer, Server, Socket } from 'net';
10
import { tmpdir } from 'os';
11
import { Barrier, timeout } from '../../../../common/async.js';
12
import { VSBuffer } from '../../../../common/buffer.js';
13
import { Emitter, Event } from '../../../../common/event.js';
14
import { Disposable, DisposableStore, toDisposable } from '../../../../common/lifecycle.js';
15
import { ILoadEstimator, PersistentProtocol, Protocol, ProtocolConstants, SocketCloseEvent, SocketDiagnosticsEventType } from '../../common/ipc.net.js';
16
import { createRandomIPCHandle, createStaticIPCHandle, NodeSocket, WebSocketNodeSocket } from '../../node/ipc.net.js';
17
import { flakySuite } from '../../../../test/common/testUtils.js';
18
import { runWithFakedTimers } from '../../../../test/common/timeTravelScheduler.js';
19
import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../test/common/utils.js';
20
21
/**
 * Test helper that records every message emitted by a protocol and hands the
 * messages out one at a time, oldest first, through `waitForOne()`.
 */
class MessageStream extends Disposable {

	/** Resolver of the promise returned by the most recent `waitForOne()` call while it is still pending. */
	private _currentComplete: ((data: VSBuffer) => void) | null = null;

	/** Messages that arrived but have not been handed to a waiter yet. */
	private _messages: VSBuffer[] = [];

	/**
	 * @param x The protocol whose `onMessage` event is recorded; the listener is
	 * registered on this disposable.
	 */
	constructor(x: Protocol | PersistentProtocol) {
		super();
		this._register(x.onMessage(incoming => {
			this._messages.push(incoming);
			this._trigger();
		}));
	}

	/**
	 * Hands the oldest buffered message to the pending waiter.
	 * Does nothing unless there is both a waiter and a buffered message.
	 */
	private _trigger(): void {
		const resolveWaiter = this._currentComplete;
		if (resolveWaiter === null || this._messages.length === 0) {
			return;
		}

		const oldest = this._messages.shift()!;

		// Clear the waiter before resolving so that each waiter receives exactly one message.
		this._currentComplete = null;
		resolveWaiter(oldest);
	}

	/**
	 * Returns a promise for the next message (possibly one that is already buffered).
	 * Calling this again while an earlier promise is still pending replaces the
	 * stored resolver, so the earlier promise never settles.
	 */
	public waitForOne(): Promise<VSBuffer> {
		return new Promise<VSBuffer>(resolveWaiter => {
			this._currentComplete = resolveWaiter;
			this._trigger();
		});
	}
}
57
58
/**
 * One endpoint ('a' or 'b') of an `Ether`. It only implements what the tests
 * need from a socket: `write` forwards to the ether, `destroy` is a no-op and
 * incoming data is surfaced through the inherited `EventEmitter`.
 */
class EtherStream extends EventEmitter {

	/**
	 * @param _ether The ether that carries data written to this endpoint.
	 * @param _name Which side of the ether this endpoint represents.
	 */
	constructor(
		private readonly _ether: Ether,
		private readonly _name: 'a' | 'b'
	) {
		super();
	}

	/**
	 * Forwards `data` to the ether on behalf of this endpoint.
	 * The optional callback is accepted but never invoked.
	 *
	 * @returns Always `true`.
	 * @throws Error when `data` is not a `Buffer`.
	 */
	write(data: Buffer, cb?: Function): boolean {
		if (Buffer.isBuffer(data)) {
			this._ether.write(this._name, data);
			return true;
		}
		throw new Error(`Invalid data`);
	}

	/** Intentionally does nothing. */
	destroy(): void {
	}
}
77
78
/**
 * In-memory, bidirectional "wire" connecting two fake sockets, `a` and `b`.
 * Data written on one side is queued and later emitted as a `'data'` event on
 * the other side, always asynchronously (via `setTimeout`).
 */
class Ether {

	private readonly _a: EtherStream;
	private readonly _b: EtherStream;

	/** Chunks written by `a` that still have to be delivered to `b`. */
	private _ab: Buffer[];
	/** Chunks written by `b` that still have to be delivered to `a`. */
	private _ba: Buffer[];

	/** Endpoint `a`, typed as a `Socket` so it can be handed to `NodeSocket`. */
	public get a(): Socket {
		// eslint-disable-next-line local/code-no-any-casts
		return this._a as any;
	}

	/** Endpoint `b`, typed as a `Socket` so it can be handed to `NodeSocket`. */
	public get b(): Socket {
		// eslint-disable-next-line local/code-no-any-casts
		return this._b as any;
	}

	/**
	 * @param _wireLatency Delay in milliseconds between a write and the moment
	 * the data is queued for delivery (defaults to 0).
	 */
	constructor(
		private readonly _wireLatency = 0
	) {
		this._a = new EtherStream(this, 'a');
		this._b = new EtherStream(this, 'b');
		this._ab = [];
		this._ba = [];
	}

	/**
	 * Accepts data written by endpoint `from`. After the wire latency has
	 * elapsed the chunk is queued for the opposite endpoint and a delivery
	 * pass is scheduled.
	 */
	public write(from: 'a' | 'b', data: Buffer): void {
		setTimeout(() => {
			const queue = from === 'a' ? this._ab : this._ba;
			queue.push(data);

			setTimeout(() => this._deliver(), 0);
		}, this._wireLatency);
	}

	/**
	 * Performs one delivery pass: everything queued for a single direction is
	 * concatenated and emitted as one `'data'` event. The a→b direction is
	 * checked first. After a delivery another pass is scheduled so that the
	 * other direction (or newly queued data) gets its turn.
	 */
	private _deliver(): void {
		let queue: Buffer[];
		let receiver: EtherStream;

		if (this._ab.length > 0) {
			queue = this._ab;
			receiver = this._b;
		} else if (this._ba.length > 0) {
			queue = this._ba;
			receiver = this._a;
		} else {
			// nothing pending in either direction
			return;
		}

		const payload = Buffer.concat(queue);
		queue.length = 0;
		receiver.emit('data', payload);
		setTimeout(() => this._deliver(), 0);
	}
}
137
138
suite('IPC, Socket Protocol', () => {
139
140
const ds = ensureNoDisposablesAreLeakedInTestSuite();
141
142
let ether: Ether;
143
144
setup(() => {
145
ether = new Ether();
146
});
147
148
test('read/write', async () => {
	// Two plain protocols wired back to back through the in-memory ether.
	const sender = new Protocol(new NodeSocket(ether.a));
	const receiver = new Protocol(new NodeSocket(ether.b));
	const inbox = new MessageStream(receiver);

	// A string payload must arrive unchanged.
	sender.send(VSBuffer.fromString('foobarfarboo'));
	const textMessage = await inbox.waitForOne();
	assert.strictEqual(textMessage.toString(), 'foobarfarboo');

	// A one-byte binary payload must arrive unchanged as well.
	const singleByte = VSBuffer.alloc(1);
	singleByte.writeUInt8(123, 0);
	sender.send(singleByte);
	const binaryMessage = await inbox.waitForOne();
	assert.strictEqual(binaryMessage.readUInt8(0), 123);

	inbox.dispose();
	sender.dispose();
	receiver.dispose();
});
168
169
170
test('read/write, object data', async () => {
	const sender = new Protocol(new NodeSocket(ether.a));
	const receiver = new Protocol(new NodeSocket(ether.b));
	const inbox = new MessageStream(receiver);

	// An object with mixed value types, sent as its JSON serialization.
	const payload = {
		pi: Math.PI,
		foo: 'bar',
		more: true,
		data: 'Hello World'.split('')
	};
	const serialized = JSON.stringify(payload);

	sender.send(VSBuffer.fromString(serialized));
	const received = await inbox.waitForOne();

	// Parsing what arrived must give back a structurally equal object.
	assert.deepStrictEqual(JSON.parse(received.toString()), payload);

	inbox.dispose();
	sender.dispose();
	receiver.dispose();
});
191
192
193
194
test('issue #211462: destroy socket after end timeout', async () => {
	// Minimal stand-in for a net.Socket: an emitter whose destroy() emits 'close'.
	const fakeSocket = new EventEmitter();
	Object.assign(fakeSocket, { destroy: () => fakeSocket.emit('close') });
	const protocol = ds.add(new Protocol(new NodeSocket(fakeSocket as Socket)));

	const onDisposed = sinon.stub();
	const clock = sinon.useFakeTimers();

	// The real timers are restored when the suite's disposable store is cleaned up.
	ds.add(toDisposable(() => clock.restore()));
	ds.add(protocol.onDidDispose(onDisposed));

	// After 'end' the protocol must not be disposed right away ...
	fakeSocket.emit('end');
	assert.ok(!onDisposed.called);

	// ... nor 29,999 ms later ...
	clock.tick(29_999);
	assert.ok(!onDisposed.called);

	// ... but it must be once one more millisecond has passed.
	clock.tick(1);
	assert.ok(onDisposed.called);
});
212
});
213
214
suite('PersistentProtocol reconnection', () => {
215
216
ensureNoDisposablesAreLeakedInTestSuite();
217
218
test('acks get piggybacked with messages', async () => {
	const ether = new Ether();
	const protoA = new PersistentProtocol({ socket: new NodeSocket(ether.a) });
	const inboxA = new MessageStream(protoA);
	const protoB = new PersistentProtocol({ socket: new NodeSocket(ether.b) });
	const inboxB = new MessageStream(protoB);

	// Checks the number of sent-but-unacknowledged messages on both sides (A first, then B).
	const expectUnacknowledged = (onA: number, onB: number): void => {
		assert.strictEqual(protoA.unacknowledgedCount, onA);
		assert.strictEqual(protoB.unacknowledgedCount, onB);
	};

	// Every message sent by A stays unacknowledged for now.
	const firstBatch = ['a1', 'a2', 'a3'];
	firstBatch.forEach((text, index) => {
		protoA.send(VSBuffer.fromString(text));
		expectUnacknowledged(index + 1, 0);
	});

	// B reading the messages does not, by itself, acknowledge them.
	for (const text of firstBatch) {
		const received = await inboxB.waitForOne();
		assert.strictEqual(received.toString(), text);
		expectUnacknowledged(3, 0);
	}

	// A message travelling B -> A ...
	protoB.send(VSBuffer.fromString('b1'));
	expectUnacknowledged(3, 1);

	// ... acknowledges everything A had sent once A receives it.
	const fromB = await inboxA.waitForOne();
	assert.strictEqual(fromB.toString(), 'b1');
	expectUnacknowledged(0, 1);

	// And the next message A -> B acknowledges b1 in the same way.
	protoA.send(VSBuffer.fromString('a4'));
	expectUnacknowledged(1, 1);

	const fromA = await inboxB.waitForOne();
	assert.strictEqual(fromA.toString(), 'a4');
	expectUnacknowledged(1, 0);

	inboxA.dispose();
	inboxB.dispose();
	protoA.dispose();
	protoB.dispose();
});
275
276
test('ack gets sent after a while', async () => {
	await runWithFakedTimers({ useFakeTimers: true, maxTaskCount: 100 }, async () => {
		// Load estimator that never reports high load.
		const neverBusy: ILoadEstimator = {
			hasHighLoad: () => false
		};
		const ether = new Ether();
		const socketA = new NodeSocket(ether.a);
		const protoA = new PersistentProtocol({ socket: socketA, loadEstimator: neverBusy });
		const inboxA = new MessageStream(protoA);
		const socketB = new NodeSocket(ether.b);
		const protoB = new PersistentProtocol({ socket: socketB, loadEstimator: neverBusy });
		const inboxB = new MessageStream(protoB);

		// Checks the number of sent-but-unacknowledged messages on both sides (A first, then B).
		const expectUnacknowledged = (onA: number, onB: number): void => {
			assert.strictEqual(protoA.unacknowledgedCount, onA);
			assert.strictEqual(protoB.unacknowledgedCount, onB);
		};

		// One message A -> B; receiving it does not acknowledge it yet.
		protoA.send(VSBuffer.fromString('a1'));
		expectUnacknowledged(1, 0);
		const received = await inboxB.waitForOne();
		assert.strictEqual(received.toString(), 'a1');
		expectUnacknowledged(1, 0);

		// B sends nothing of its own, so after waiting twice the acknowledge
		// time the ack must have reached A anyway.
		await timeout(2 * ProtocolConstants.AcknowledgeTime);
		expectUnacknowledged(0, 0);

		inboxA.dispose();
		inboxB.dispose();
		protoA.dispose();
		protoB.dispose();
	});
});
309
310
test('messages that are never written to a socket should not cause an ack timeout', async () => {
	const timerOptions = {
		useFakeTimers: true,
		useSetImmediate: true,
		maxTaskCount: 1000
	};

	await runWithFakedTimers(timerOptions, async () => {
		// With fake timers Date.now() starts at 0. This test is about a field that
		// must not be initialized from Date.now(), so first let an hour of fake time
		// pass to get more realistic clock values.
		await timeout(60 * 60 * 1000);

		const neverBusy: ILoadEstimator = {
			hasHighLoad: () => false
		};
		const ether = new Ether();
		const socketA = new NodeSocket(ether.a);
		const protoA = new PersistentProtocol({ socket: socketA, loadEstimator: neverBusy, sendKeepAlive: false });
		const inboxA = new MessageStream(protoA);
		const socketB = new NodeSocket(ether.b);
		const protoB = new PersistentProtocol({ socket: socketB, loadEstimator: neverBusy, sendKeepAlive: false });
		const inboxB = new MessageStream(protoB);

		// Checks the number of sent-but-unacknowledged messages on both sides (A first, then B).
		const expectUnacknowledged = (onA: number, onB: number): void => {
			assert.strictEqual(protoA.unacknowledgedCount, onA);
			assert.strictEqual(protoB.unacknowledgedCount, onB);
		};

		// Send a1 before the reconnection so that _recvAckCheck() gets scheduled.
		protoA.send(VSBuffer.fromString('a1'));
		expectUnacknowledged(1, 0);

		// B reads a1.
		const a1 = await inboxB.waitForOne();
		assert.strictEqual(a1.toString(), 'a1');
		expectUnacknowledged(1, 0);

		// B sends b1, which carries the ack for a1.
		protoB.send(VSBuffer.fromString('b1'));
		expectUnacknowledged(1, 1);

		// A reads b1 and thereby receives the ack for a1.
		const b1 = await inboxA.waitForOne();
		assert.strictEqual(b1.toString(), 'b1');
		expectUnacknowledged(0, 1);

		// Begin the reconnection of A on a fresh socket.
		socketA.dispose();
		const replacementSocketA = new NodeSocket(ether.a);
		protoA.beginAcceptReconnection(replacementSocketA, null);

		let sawSocketTimeout = false;
		const socketTimeoutSubscription = protoA.onSocketTimeout(() => {
			sawSocketTimeout = true;
		});

		// a2 is sent while the reconnection is still in progress.
		protoA.send(VSBuffer.fromString('a2'));
		expectUnacknowledged(1, 1);

		// Give the scheduled _recvAckCheck() time to execute.
		await timeout(2 * ProtocolConstants.TimeoutTime);

		expectUnacknowledged(1, 1);
		assert.strictEqual(sawSocketTimeout, false);

		protoA.endAcceptReconnection();
		assert.strictEqual(sawSocketTimeout, false);

		// Once the reconnection is complete everything gets acknowledged,
		// still without any socket timeout being reported.
		await timeout(2 * ProtocolConstants.TimeoutTime);
		expectUnacknowledged(0, 0);
		assert.strictEqual(sawSocketTimeout, false);

		socketTimeoutSubscription.dispose();
		inboxA.dispose();
		inboxB.dispose();
		protoA.dispose();
		protoB.dispose();
	});
});
394
395
test('acks are always sent after a reconnection', async () => {
	const timerOptions = {
		useFakeTimers: true,
		useSetImmediate: true,
		maxTaskCount: 1000
	};

	await runWithFakedTimers(timerOptions, async () => {

		const neverBusy: ILoadEstimator = {
			hasHighLoad: () => false
		};
		// Every write needs this many (fake) milliseconds to cross the wire.
		const wireLatency = 1000;
		const ether = new Ether(wireLatency);
		const socketA = new NodeSocket(ether.a);
		const protoA = new PersistentProtocol({ socket: socketA, loadEstimator: neverBusy });
		const inboxA = new MessageStream(protoA);
		const socketB = new NodeSocket(ether.b);
		const protoB = new PersistentProtocol({ socket: socketB, loadEstimator: neverBusy });
		const inboxB = new MessageStream(protoB);

		// Checks the number of sent-but-unacknowledged messages on both sides (A first, then B).
		const expectUnacknowledged = (onA: number, onB: number): void => {
			assert.strictEqual(protoA.unacknowledgedCount, onA);
			assert.strictEqual(protoB.unacknowledgedCount, onB);
		};

		// Send a1 so that there is something unacknowledged.
		protoA.send(VSBuffer.fromString('a1'));
		expectUnacknowledged(1, 0);

		// B reads a1.
		const a1 = await inboxB.waitForOne();
		assert.strictEqual(a1.toString(), 'a1');
		expectUnacknowledged(1, 0);

		// Wait long enough for B to send its ACK, but resume while the ACK
		// is still on the wire, i.e. before A receives it.
		await timeout(ProtocolConstants.AcknowledgeTime + wireLatency / 2);
		expectUnacknowledged(1, 0);

		// Simulate a complete reconnection of both sides over a brand new ether.
		socketA.dispose();
		socketB.dispose();
		const freshEther = new Ether(wireLatency);
		const freshSocketA = new NodeSocket(freshEther.a);
		const freshSocketB = new NodeSocket(freshEther.b);
		protoB.beginAcceptReconnection(freshSocketB, null);
		protoB.endAcceptReconnection();
		protoA.beginAcceptReconnection(freshSocketA, null);
		protoA.endAcceptReconnection();

		// After waiting for quite some time the ack must have made it to A.
		await timeout(2 * ProtocolConstants.AcknowledgeTime + wireLatency);
		expectUnacknowledged(0, 0);

		inboxA.dispose();
		inboxB.dispose();
		protoA.dispose();
		protoB.dispose();
	});
});
456
457
test('onSocketTimeout is emitted at most once every 20s', async () => {
	const timerOptions = {
		useFakeTimers: true,
		useSetImmediate: true,
		maxTaskCount: 1000
	};

	await runWithFakedTimers(timerOptions, async () => {

		const neverBusy: ILoadEstimator = {
			hasHighLoad: () => false
		};
		const ether = new Ether();
		const socketA = new NodeSocket(ether.a);
		const protoA = new PersistentProtocol({ socket: socketA, loadEstimator: neverBusy });
		const inboxA = new MessageStream(protoA);
		const socketB = new NodeSocket(ether.b);
		const protoB = new PersistentProtocol({ socket: socketB, loadEstimator: neverBusy });
		const inboxB = new MessageStream(protoB);

		// B never writes to its socket, so A never receives any acks.
		protoB.pauseSocketWriting();

		// Send a1 so that A has something unacknowledged.
		protoA.send(VSBuffer.fromString('a1'));

		// Wait until A reports its first socket timeout.
		await Event.toPromise(protoA.onSocketTimeout);

		// From here on, record whether the timeout event fires a second time.
		let firedAgain = false;
		const secondTimeoutSubscription = protoA.onSocketTimeout(() => {
			firedAgain = true;
		});

		// Send more messages that will stay unacknowledged.
		protoA.send(VSBuffer.fromString('a2'));
		protoA.send(VSBuffer.fromString('a3'));

		// Half of the timeout period later the event must not have fired again.
		await timeout(ProtocolConstants.TimeoutTime / 2);

		assert.strictEqual(firedAgain, false);

		secondTimeoutSubscription.dispose();
		inboxA.dispose();
		inboxB.dispose();
		protoA.dispose();
		protoB.dispose();
	});
});
508
509
test('writing can be paused', async () => {
	await runWithFakedTimers({ useFakeTimers: true, maxTaskCount: 100 }, async () => {
		const neverBusy: ILoadEstimator = {
			hasHighLoad: () => false
		};
		const ether = new Ether();
		const socketA = new NodeSocket(ether.a);
		const protoA = new PersistentProtocol({ socket: socketA, loadEstimator: neverBusy });
		const inboxA = new MessageStream(protoA);
		const socketB = new NodeSocket(ether.b);
		const protoB = new PersistentProtocol({ socket: socketB, loadEstimator: neverBusy });
		const inboxB = new MessageStream(protoB);

		// Checks the number of sent-but-unacknowledged messages on both sides (A first, then B).
		const expectUnacknowledged = (onA: number, onB: number): void => {
			assert.strictEqual(protoA.unacknowledgedCount, onA);
			assert.strictEqual(protoB.unacknowledgedCount, onB);
		};

		// One message A -> B.
		protoA.send(VSBuffer.fromString('a1'));
		const a1 = await inboxB.waitForOne();
		assert.strictEqual(a1.toString(), 'a1');

		// B asks A to pause writing.
		protoB.sendPause();

		// A message B -> A still gets through.
		protoB.send(VSBuffer.fromString('b1'));
		const b1 = await inboxA.waitForOne();
		assert.strictEqual(b1.toString(), 'b1');

		// A message A -> B is expected to be held back at A.
		protoA.send(VSBuffer.fromString('a2'));

		// Even after a long wait nothing was written by A, not even acks.
		await timeout(2 * ProtocolConstants.AcknowledgeTime);
		expectUnacknowledged(1, 1);

		// B asks A to resume writing.
		protoB.sendResume();

		// Now the held-back message arrives at B.
		const a2 = await inboxB.waitForOne();
		assert.strictEqual(a2.toString(), 'a2');

		// And after another long wait the acks have been exchanged too.
		await timeout(2 * ProtocolConstants.AcknowledgeTime);
		expectUnacknowledged(0, 0);

		inboxA.dispose();
		inboxB.dispose();
		protoA.dispose();
		protoB.dispose();
	});
});
561
});
562
563
flakySuite('IPC, create handle', () => {
564
565
// Runs the shared handle check with a freshly generated random handle.
test('createRandomIPCHandle', async () => {
	const randomHandle = createRandomIPCHandle();
	await testIPCHandle(randomHandle);
});

// Runs the shared handle check with a static handle derived from the temp
// directory, a type ('test') and a version ('1.64.0').
test('createStaticIPCHandle', async () => {
	const staticHandle = createStaticIPCHandle(tmpdir(), 'test', '1.64.0');
	await testIPCHandle(staticHandle);
});
572
573
/**
 * Verifies that a server can listen on the given IPC handle.
 *
 * A server is created and asked to listen on `handle`. As soon as it is
 * listening it is closed again and the returned promise resolves. If the
 * server reports an error before it is listening, the promise rejects with
 * that error.
 *
 * @param handle The pipe name / socket path to listen on.
 * @returns A promise that settles once the server has been shut down again.
 */
function testIPCHandle(handle: string): Promise<void> {
	return new Promise<void>((resolve, reject) => {
		const server = createServer();

		// Named so that exactly this listener can be removed again below.
		const onListenError = (err: Error): void => {
			// Best-effort cleanup; closing a server that never started listening is harmless.
			server.close();
			reject(err);
		};
		server.once('error', onListenError);

		// Listen on the handle under test (not on some unrelated handle).
		server.listen(handle, () => {
			server.removeListener('error', onListenError);

			server.close(closeError => {
				if (closeError) {
					reject(closeError);
				} else {
					resolve();
				}
			});
		});
	});
}
592
593
});
594
595
suite('WebSocketNodeSocket', () => {
596
597
const ds = ensureNoDisposablesAreLeakedInTestSuite();
598
599
/**
 * Copies a plain array of numbers into a new `Uint8Array` of the same length.
 * Each value is stored as a byte, so the usual `Uint8Array` conversion applies.
 */
function toUint8Array(data: number[]): Uint8Array {
	return Uint8Array.from(data);
}
606
607
/**
 * Copies the bytes of a `Uint8Array` into a new plain array of numbers.
 */
function fromUint8Array(data: Uint8Array): number[] {
	return Array.from(data);
}
614
615
/**
 * Builds a string by turning every number of `data` into the character with
 * that char code (via `String.fromCharCode`) and concatenating the results.
 */
function fromCharCodeArray(data: number[]): string {
	return data.map(charCode => String.fromCharCode(charCode)).join('');
}
622
623
/**
 * Stand-in for the socket wrapped by `WebSocketNodeSocket` in these tests.
 * Incoming data is simulated with `fireData()`; everything written to the
 * socket is recorded in `writtenData` instead of being sent anywhere.
 */
class FakeNodeSocket extends Disposable {

	private readonly _onData = new Emitter<VSBuffer>();
	/** Fires for every chunk injected through `fireData()`. */
	public readonly onData = this._onData.event;

	private readonly _onClose = new Emitter<SocketCloseEvent>();
	/** Close event of the socket; nothing in this class ever fires it. */
	public readonly onClose = this._onClose.event;

	/** Every buffer passed to `write()`, in call order. */
	public writtenData: VSBuffer[] = [];

	/** Diagnostics hook; intentionally does nothing. */
	public traceSocketEvent(type: SocketDiagnosticsEventType, data?: VSBuffer | Uint8Array | ArrayBuffer | ArrayBufferView | any): void {
	}

	/** Records `data` instead of sending it. */
	public write(data: VSBuffer): void {
		this.writtenData.push(data);
	}

	/** Simulates bytes arriving on the socket by firing `onData` with them. */
	public fireData(data: number[]): void {
		const bytes = toUint8Array(data);
		this._onData.fire(VSBuffer.wrap(bytes));
	}
}
648
649
/**
 * Feeds raw WebSocket frames into a `WebSocketNodeSocket` that wraps a fake
 * socket and returns everything the web socket emitted, decoded as text.
 *
 * The function waits until `onData` has fired once per entry of `frames`.
 * NOTE(review): with an empty `frames` array the barrier is never opened, so
 * the returned promise would not settle - callers always pass at least one frame.
 *
 * @param frames The raw frame bytes, fired one array at a time and in order.
 * @param permessageDeflate Passed through to `WebSocketNodeSocket`.
 * @returns The concatenation of all emitted data, one character per byte.
 */
async function testReading(frames: number[][], permessageDeflate: boolean): Promise<string> {
	const store = new DisposableStore();
	const rawSocket = new FakeNodeSocket();
	// eslint-disable-next-line local/code-no-any-casts
	const webSocket = store.add(new WebSocketNodeSocket(rawSocket as any, permessageDeflate, null, false));

	const allDataSeen = new Barrier();
	let outstanding = frames.length;
	let text: string = '';

	store.add(webSocket.onData(chunk => {
		text += fromCharCodeArray(fromUint8Array(chunk.buffer));
		outstanding -= 1;
		if (outstanding === 0) {
			allDataSeen.open();
		}
	}));

	for (const frame of frames) {
		rawSocket.fireData(frame);
	}

	await allDataSeen.wait();

	store.dispose();

	return text;
}
677
678
test('A single-frame unmasked text message', async () => {
	// header 0x81 0x05, followed by the five bytes of "Hello"
	const helloFrame = [0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f];
	const decoded = await testReading([helloFrame], false);
	assert.deepStrictEqual(decoded, 'Hello');
});

test('A single-frame masked text message', async () => {
	// header 0x81 0x85, masking key 37 fa 21 3d, followed by the masked bytes of "Hello"
	const maskedHelloFrame = [0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58];
	const decoded = await testReading([maskedHelloFrame], false);
	assert.deepStrictEqual(decoded, 'Hello');
});

test('A fragmented unmasked text message', async () => {
	// "Hello" split over two frames
	const firstFragment = [0x01, 0x03, 0x48, 0x65, 0x6c]; // contains "Hel"
	const lastFragment = [0x80, 0x02, 0x6c, 0x6f]; // contains "lo"
	const decoded = await testReading([firstFragment, lastFragment], false);
	assert.deepStrictEqual(decoded, 'Hello');
});
703
704
suite('compression', () => {
	// All tests in this suite read with permessage-deflate enabled.

	test('A single-frame compressed text message', async () => {
		// compressed "Hello" in a single frame
		const compressedHello = [0xc1, 0x07, 0xf2, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00];
		const decoded = await testReading([compressedHello], true);
		assert.deepStrictEqual(decoded, 'Hello');
	});

	test('A fragmented compressed text message', async () => {
		// compressed "Hello", split over two frames
		const firstFragment = [0x41, 0x03, 0xf2, 0x48, 0xcd];
		const lastFragment = [0x80, 0x04, 0xc9, 0xc9, 0x07, 0x00];
		const decoded = await testReading([firstFragment, lastFragment], true);
		assert.deepStrictEqual(decoded, 'Hello');
	});

	test('A single-frame non-compressed text message', async () => {
		// plain "Hello", even though compression is enabled for the connection
		const plainHello = [0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f];
		const decoded = await testReading([plainHello], true);
		assert.deepStrictEqual(decoded, 'Hello');
	});

	test('A single-frame compressed text message followed by a single-frame non-compressed text message', async () => {
		const compressedHello = [0xc1, 0x07, 0xf2, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00]; // contains "Hello"
		const plainWorld = [0x81, 0x05, 0x77, 0x6f, 0x72, 0x6c, 0x64]; // contains "world"
		const decoded = await testReading([compressedHello, plainWorld], true);
		assert.deepStrictEqual(decoded, 'Helloworld');
	});
});
741
742
test('Large buffers are split and sent in chunks', async () => {

	// What the receiving (server) side observes.
	let receivedDataEvents = 0;
	let receivedByteCount = 0;
	const receiverClosed = new Barrier();

	const server = await listenOnRandomPort((incoming) => {
		// stop the server when the first connection is received
		server.close();

		const receivingSocket = new WebSocketNodeSocket(new NodeSocket(incoming), true, null, false);
		ds.add(receivingSocket.onData((chunk) => {
			receivedDataEvents++;
			receivedByteCount += chunk.byteLength;
		}));

		ds.add(receivingSocket.onClose(() => {
			receivingSocket.dispose();
			receiverClosed.open();
		}));
	});

	// Connect to the server over the loopback interface.
	const { port } = server.address() as AddressInfo;
	const clientSocket = connect({
		host: '127.0.0.1',
		port
	});

	// 1 MiB of random bytes.
	const payload = generateRandomBuffer(1 * 1024 * 1024);

	// Write the payload, wait until it is flushed, then close the sending side.
	const sendingSocket = new WebSocketNodeSocket(new NodeSocket(clientSocket), true, null, false);
	sendingSocket.write(payload);
	await sendingSocket.drain();
	sendingSocket.dispose();
	await receiverClosed.wait();

	// Nothing may be lost, and the receiver must have seen exactly 4 data events.
	assert.strictEqual(receivedByteCount, payload.byteLength);
	assert.strictEqual(receivedDataEvents, 4);
});
780
781
test('issue #194284: ping/pong opcodes are supported', async () => {

	const store = new DisposableStore();
	const rawSocket = new FakeNodeSocket();
	// eslint-disable-next-line local/code-no-any-casts
	const webSocket = store.add(new WebSocketNodeSocket(rawSocket as any, false, null, false));

	// Collect everything the web socket surfaces as data.
	let text: string = '';
	store.add(webSocket.onData((chunk) => {
		text += fromCharCodeArray(fromUint8Array(chunk.buffer));
	}));

	const helloFrame = [0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]; // single-frame non-compressed text message "Hello"
	const pingFrame = [0x89, 0x04, 0x64, 0x61, 0x74, 0x61]; // ping message that contains "data"
	const pongFrame = [0x8A, 0x04, 0x64, 0x61, 0x74, 0x61]; // pong message that contains "data"

	// A ping sandwiched between two text messages.
	rawSocket.fireData(helloFrame);
	rawSocket.fireData(pingFrame);
	rawSocket.fireData(helloFrame);

	// The ping payload must not show up as data ...
	assert.strictEqual(text, 'HelloHello');

	// ... and the only thing written back must be the matching pong.
	const written = rawSocket.writtenData.map(buffer => fromUint8Array(buffer.buffer));
	assert.deepStrictEqual(written, [pongFrame]);

	store.dispose();

	return text;
});
815
816
/**
 * Allocates a buffer of `size` bytes and fills every byte with a random value
 * in the range 0..255 taken from `Math.random()`.
 */
function generateRandomBuffer(size: number): VSBuffer {
	const result = VSBuffer.alloc(size);
	let offset = 0;
	while (offset < size) {
		const randomByte = Math.floor(256 * Math.random());
		result.writeUInt8(randomByte, offset);
		offset++;
	}
	return result;
}
823
824
/**
 * Starts a TCP server on port 0, i.e. a port picked by the operating system.
 *
 * @param handler Invoked with the socket of every incoming connection.
 * @returns A promise that resolves with the server once it is listening and
 * rejects with the error if the server reports one.
 */
function listenOnRandomPort(handler: (socket: Socket) => void): Promise<Server> {
	return new Promise<Server>((resolve, reject) => {
		const server = createServer(handler);
		server.listen(0);
		server.on('listening', () => resolve(server));
		server.on('error', reject);
	});
}
835
});
836
837