Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/mitm/lib/SocketPool.ts
1030 views
1
import Queue from '@secret-agent/commons/Queue';
2
import Log from '@secret-agent/commons/Logger';
3
import { IBoundLog } from '@secret-agent/interfaces/ILog';
4
import MitmSocket from '@secret-agent/mitm-socket';
5
import Resolvable from '@secret-agent/commons/Resolvable';
6
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
7
import { ClientHttp2Session } from 'http2';
8
import EventSubscriber from '@secret-agent/commons/EventSubscriber';
9
import RequestSession from '../handlers/RequestSession';
10
11
// Module-level logger; the SocketPool constructor derives its per-session child logger from it.
const { log } = Log(module);
12
13
/**
 * Pool of upstream MitmSockets for a single origin.
 *
 * Reusable sockets (neither HTTP/2 nor websocket at creation time) are counted against
 * `maxConnections`. Once that many are open, callers of `getSocket` wait until a socket is
 * handed back through `freeSocket` or a counted socket closes. HTTP/2 sessions registered via
 * `registerHttp2Session` are shared: non-websocket requests receive the socket of the first
 * registered session instead of a pooled one.
 */
export default class SocketPool {
  /** ALPN value copied from the most recently created socket; `'h2'` is treated as HTTP/2. */
  public alpn: string;
  /** Public flag; this class never writes it, so any updates come from outside this class. */
  public isClosing = false;
  public readonly eventSubscriber = new EventSubscriber();
  /** Every socket created by this pool that has not yet been observed closing. */
  private all = new Set<MitmSocket>();
  /** Sockets that count against `maxConnections` (created as neither HTTP/2 nor websocket). */
  private pooledSockets = new Set<MitmSocket>();
  /** Sockets handed back through `freeSocket` and available for reuse. */
  private free = new Set<MitmSocket>();
  /** Waiters parked in `getSocket` because the pool was at capacity. */
  private pending: Resolvable<void>[] = [];
  private readonly http2Sessions: IHttp2Session[] = [];
  private queue: Queue;
  private logger: IBoundLog;

  /**
   * @param origin origin this pool serves; only used as logging context here
   * @param maxConnections maximum number of reusable sockets open at the same time
   * @param session owning request session; supplies the session id for logging and receives
   *   `socket-close` events
   */
  constructor(origin: string, readonly maxConnections: number, readonly session: RequestSession) {
    this.logger = log.createChild(module, { sessionId: session.sessionId, origin });
    this.queue = new Queue('SOCKET TO ORIGIN');
  }

  /** Number of open sockets currently counted against `maxConnections`. */
  private get pooled(): number {
    return this.pooledSockets.size;
  }

  /**
   * Hands a socket back to the pool and wakes the longest-waiting `getSocket` caller, if any.
   *
   * @param socket socket that is no longer in use by its borrower
   */
  public freeSocket(socket: MitmSocket): void {
    this.free.add(socket);
    this.pending.shift()?.resolve();
  }

  /**
   * Resolves whether connections to this origin use HTTP/2.
   *
   * Uses the recorded ALPN value when one exists. If the queue is active, a task is queued that
   * re-reads the ALPN value, giving in-flight socket work a chance to set it first. Otherwise a
   * socket is obtained, immediately handed back to the pool, and asked directly.
   *
   * @param isWebsocket forwarded to `getSocket` when a socket has to be obtained
   * @param createSocket factory forwarded to `getSocket`
   * @returns true when the negotiated protocol is HTTP/2
   */
  public async isHttp2(
    isWebsocket: boolean,
    createSocket: () => Promise<MitmSocket>,
  ): Promise<boolean> {
    if (this.alpn) return this.alpn === 'h2';
    if (this.queue.isActive) {
      // eslint-disable-next-line require-await
      const alpn = await this.queue.run(() => Promise.resolve(this.alpn));
      if (alpn) return alpn === 'h2';
    }
    const socket = await this.getSocket(isWebsocket, createSocket);
    this.freeSocket(socket);
    return socket.isHttp2();
  }

  /**
   * Obtains a socket for a request. All work is funnelled through the pool's queue.
   *
   * Order of preference: the socket of a registered HTTP/2 session (non-websocket requests
   * only), then a free pooled socket, then a newly created one. When the pool is at capacity
   * and nothing is free (or other callers are already waiting), the call parks until
   * `freeSocket` or a socket close wakes it.
   *
   * @param isWebsocket true when the socket will carry a websocket; such sockets bypass the
   *   HTTP/2 shortcut and are not counted against `maxConnections`
   * @param createSocket factory invoked when no existing socket can be reused
   * @returns a promise for the socket to use; rejects with whatever `createSocket` or the
   *   queue rejects with, or with `CanceledPromiseError` if the pool closes while waiting
   */
  public getSocket(
    isWebsocket: boolean,
    createSocket: () => Promise<MitmSocket>,
  ): Promise<MitmSocket> {
    return this.queue.run(async () => {
      const http2Session = this.getHttp2Session();
      if (http2Session && !isWebsocket) {
        return http2Session.mitmSocket;
      }

      if (this.pooled >= this.maxConnections && (this.pending.length || this.free.size === 0)) {
        const pending = new Resolvable<void>();
        this.pending.push(pending);
        await pending.promise;
      }

      if (this.free.size) {
        const first = this.free.values().next().value;
        this.free.delete(first);
        return first;
      }

      const mitmSocket = await createSocket();
      this.eventSubscriber.on(mitmSocket, 'close', this.onSocketClosed.bind(this, mitmSocket));
      this.alpn = mitmSocket.alpn;

      this.all.add(mitmSocket);

      // don't put connections that can't be reused into the pool
      if (!mitmSocket.isHttp2() && !isWebsocket) {
        this.pooledSockets.add(mitmSocket);
      }

      return mitmSocket;
    });
  }

  /**
   * Shuts the pool down: rejects parked waiters, tears down HTTP/2 sessions, detaches event
   * listeners, closes every tracked socket and stops the queue.
   */
  public close(): void {
    this.queue.willStop();
    for (const pending of this.pending) {
      pending.reject(new CanceledPromiseError('Shutting down socket pool'));
    }
    this.pending.length = 0;
    for (const session of this.http2Sessions) {
      try {
        session.mitmSocket.close();
        session.client.destroy();
        session.client.unref();
      } catch {
        // don't need to log closing sessions
      }
    }
    this.http2Sessions.length = 0;
    // Listeners are detached before the sockets are closed, so onSocketClosed does not run for
    // them; the tracking sets are cleared explicitly below instead.
    this.eventSubscriber.close();
    for (const socket of this.all) {
      socket.close();
    }
    this.all.clear();
    this.pooledSockets.clear();
    this.free.clear();
    this.queue.stop(new CanceledPromiseError('Shutting down socket pool'));
  }

  /** @returns the first registered HTTP/2 session, or undefined when none is registered */
  public getHttp2Session(): IHttp2Session | undefined {
    return this.http2Sessions[0];
  }

  /**
   * Records an HTTP/2 client session with its socket so later non-websocket requests can share
   * it. Registering the same client again is a no-op. The entry is dropped and closed when the
   * client emits `close` or `goaway`.
   *
   * @param client HTTP/2 client session to register
   * @param mitmSocket socket associated with that session
   */
  public registerHttp2Session(client: ClientHttp2Session, mitmSocket: MitmSocket): void {
    if (this.http2Sessions.some(x => x.client === client)) return;

    const entry = { mitmSocket, client };
    this.http2Sessions.push(entry);
    this.eventSubscriber.on(client, 'close', () => this.closeHttp2Session(entry));
    this.eventSubscriber.on(client, 'goaway', () => this.closeHttp2Session(entry));
  }

  /**
   * Close handler attached to every socket created by `getSocket`: reports the close to the
   * session, removes the socket from all tracking sets and, when capacity is available, wakes
   * one parked waiter.
   */
  private onSocketClosed(socket: MitmSocket): void {
    this.logger.stats('Socket closed');
    this.session.emit('socket-close', { socket });

    this.free.delete(socket);
    this.all.delete(socket);
    // Only sockets that were counted on creation release capacity. HTTP/2 and websocket sockets
    // were never counted, so they must not lower the pooled count when they close.
    this.pooledSockets.delete(socket);

    if (this.session.isClosing || socket.isWebsocket) return;

    if (this.pooled < this.maxConnections) this.pending.shift()?.resolve();
  }

  /** Unregisters an HTTP/2 session (if still registered) and closes its client and socket. */
  private closeHttp2Session(session: IHttp2Session): void {
    const idx = this.http2Sessions.indexOf(session);
    if (idx >= 0) this.http2Sessions.splice(idx, 1);
    const { mitmSocket, client } = session;
    client.close();
    mitmSocket.close();
  }
}
150
151
/**
 * An HTTP/2 client session paired with the MitmSocket it was registered with.
 */
interface IHttp2Session {
  /** Socket supplied together with the client when the session was registered. */
  mitmSocket: MitmSocket;
  /** The HTTP/2 client session. */
  client: ClientHttp2Session;
}
155
156