Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/gitpod-protocol/src/messaging/browser/connection.ts
2501 views
1
/*
2
* Copyright (C) 2017 TypeFox and others.
3
*
4
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
5
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
*/
7
8
import { Logger, ConsoleLogger, toSocket, IWebSocket } from "vscode-ws-jsonrpc";
9
import { createMessageConnection } from "vscode-jsonrpc";
10
import { AbstractMessageWriter } from "vscode-jsonrpc/lib/messageWriter";
11
import { AbstractMessageReader } from "vscode-jsonrpc/lib/messageReader";
12
import { JsonRpcProxyFactory, JsonRpcProxy } from "../proxy-factory";
13
import { ConnectionEventHandler, ConnectionHandler } from "../handler";
14
import ReconnectingWebSocket, { Event } from "reconnecting-websocket";
15
import { log } from "../../util/logging";
16
17
/**
 * Optional hooks accepted by {@link WebSocketConnectionProvider}.
 */
export interface WebSocketOptions {
    /** Invoked by `createProxy` with the socket right after listening on it has been set up. */
    onListening?: (socket: ReconnectingWebSocket) => void;
    /** Receives the socket's "error" events; when omitted, `listen` logs them through its logger instead. */
    onerror?: (event: Event) => void;
}
21
22
/**
 * Creates JSON-RPC proxies and connection handlers on top of a reconnecting web socket.
 */
export class WebSocketConnectionProvider {
    /**
     * Create a proxy object to remote interface of T type
     * over a web socket connection for the given path.
     *
     * An optional target can be provided to handle
     * notifications and requests from a remote side.
     *
     * The proxy is returned immediately. When `path` is a promise, listening
     * starts once it resolves; if it rejects (or starting to listen throws),
     * the failure is reported through the logger from `createLogger()` rather
     * than surfacing as an unhandled promise rejection.
     *
     * @param path web socket URL, or a promise resolving to it
     * @param target optional local object passed to the proxy factory
     * @param options optional error / listening hooks
     * @returns the proxy created by the underlying `JsonRpcProxyFactory`
     */
    createProxy<T extends object>(
        path: string | Promise<string>,
        target?: object,
        options?: WebSocketOptions,
    ): JsonRpcProxy<T> {
        const factory = new JsonRpcProxyFactory<T>(target);
        const startListening = (path: string) => {
            const socket = this.listen(
                {
                    path,
                    onConnection: (c) => factory.listen(c),
                },
                {
                    onTransportDidClose: () => factory.fireConnectionClosed(),
                    onTransportDidOpen: () => factory.fireConnectionOpened(),
                },
                options,
            );
            if (options?.onListening) {
                options.onListening(socket as any as ReconnectingWebSocket);
            }
        };

        if (typeof path === "string") {
            startListening(path);
        } else {
            path.then(startListening).catch((err: unknown) => {
                // Without this handler a rejected path promise would go unnoticed
                // and the returned proxy would silently never get a connection.
                const reason = err instanceof Error ? err.message : String(err);
                this.createLogger().error(`Failed to start listening on web socket: ${reason}`);
            });
        }
        return factory.createProxy();
    }

    /**
     * Install a connection handler for the given path.
     *
     * Socket "error" events are forwarded to `options.onerror` when provided,
     * otherwise they are serialized and written to the logger.
     *
     * @param handler supplies the URL (`handler.path`) and receives the connection
     * @param eventHandler notified when the transport opens / closes
     * @param options optional error hook
     * @returns the socket created by `createWebSocket`
     */
    listen(handler: ConnectionHandler, eventHandler: ConnectionEventHandler, options?: WebSocketOptions): WebSocket {
        const url = handler.path;
        const webSocket = this.createWebSocket(url);

        const logger = this.createLogger();
        const onerror = options?.onerror;
        if (onerror) {
            webSocket.addEventListener("error", (event) => {
                onerror(event);
            });
        } else {
            webSocket.addEventListener("error", (error: Event) => {
                let description: string;
                try {
                    description = JSON.stringify(error);
                } catch {
                    // JSON.stringify throws on e.g. cyclic structures; an exception
                    // must not escape the socket's error listener.
                    description = String(error);
                }
                logger.error(description);
            });
        }
        doListen(webSocket as any as ReconnectingWebSocket, handler, eventHandler, logger);
        return webSocket;
    }

    /**
     * Creates the logger used for socket errors and the message connection.
     * Subclasses may override this to plug in a different logger.
     */
    protected createLogger(): Logger {
        return new ConsoleLogger();
    }

    /**
     * Creates a web socket for the given url
     *
     * The returned object is actually a `ReconnectingWebSocket` (cast to `WebSocket`)
     * configured with unlimited retries and a growing reconnection delay.
     *
     * @param url the URL to connect to
     * @param WebSocketConstructor underlying socket implementation handed to `ReconnectingWebSocket`
     */
    createWebSocket(url: string, WebSocketConstructor = WebSocket): WebSocket {
        return new ReconnectingWebSocket(url, undefined, {
            maxReconnectionDelay: 10000,
            minReconnectionDelay: 1000,
            reconnectionDelayGrowFactor: 1.3,
            maxRetries: Infinity,
            debug: false,
            WebSocket: WebSocketConstructor,
        }) as any;
    }
}
101
102
// The following was extracted from vscode-ws-jsonrpc to make these changes:
103
// - switch from WebSocket to ReconnectingWebSocket
104
// - webSocket.onopen: making sure it's only ever called once so we're re-using MessageConnection
105
// - WebSocketMessageWriter: buffer and re-try messages instead of throwing an error immediately
106
// - WebSocketMessageReader: don't close MessageConnection on 'socket.onclose'
107
/**
 * Wires a reconnecting web socket to the given handlers.
 *
 * - every "close" event is reported via `eventHandler.onTransportDidClose()`
 * - every time the socket opens, `eventHandler.onTransportDidOpen()` is called
 * - the message connection is created and handed to `handler.onConnection` on the
 *   first open only, so it is re-used across reconnects
 *
 * @param resocket the reconnecting socket to observe
 * @param handler receives the single message connection
 * @param eventHandler receives transport open / close notifications
 * @param logger passed on to the message connection
 */
function doListen(
    resocket: ReconnectingWebSocket,
    handler: ConnectionHandler,
    eventHandler: ConnectionEventHandler,
    logger: Logger,
) {
    let connectionCreated = false;

    // Make sure we only ever create one MessageConnection, regardless of how many
    // times the underlying (reconnecting) websocket has to be re-opened.
    const ensureConnection = (): void => {
        if (!connectionCreated) {
            connectionCreated = true;
            handler.onConnection(createWebSocketConnection(resocket, logger));
        }
    };

    resocket.addEventListener("close", () => eventHandler.onTransportDidClose());

    resocket.onopen = () => {
        // Trigger "open" every time the underlying websocket is re-opened.
        eventHandler.onTransportDidOpen();
        ensureConnection();
    };
}
130
131
/**
 * Builds a JSON-RPC message connection on top of the given reconnecting socket,
 * using the non-closing reader and the buffering writer defined in this file.
 * The connection disposes itself once it reports being closed.
 *
 * @param resocket the socket that messages are read from and written to
 * @param logger logger handed to the writer and the message connection
 * @returns the newly created message connection
 */
function createWebSocketConnection(resocket: ReconnectingWebSocket, logger: Logger) {
    const reader = new NonClosingWebSocketMessageReader(toSocket(resocket as any));
    const writer = new BufferingWebSocketMessageWriter(resocket, logger);

    const messageConnection = createMessageConnection(reader, writer, logger);
    messageConnection.onClose(() => {
        messageConnection.dispose();
    });
    return messageConnection;
}
139
140
/**
 * This takes vscode-ws-jsonrpc/lib/socket/writer/WebSocketMessageWriter and adds a buffer
 *
 * Messages written while the socket is not open, or whose `send` fails, are kept
 * in a buffer and written again whenever the socket fires "open".
 * Messages that cannot be serialized to JSON are reported via `fireError` and
 * dropped: retrying them could never succeed and they would otherwise stay in
 * the buffer forever.
 */
class BufferingWebSocketMessageWriter extends AbstractMessageWriter {
    protected readonly socket: ReconnectingWebSocket;
    protected readonly logger: Logger;
    /** Number of write failures reported so far; passed along with every `fireError`. */
    protected errorCount: number = 0;

    /** Messages waiting for the socket to (re-)open, oldest first. */
    protected buffer: any[] = [];

    /**
     * @param socket the reconnecting socket to send on
     * @param logger kept for diagnostics
     */
    constructor(socket: ReconnectingWebSocket, logger: Logger) {
        super();
        this.socket = socket;
        this.logger = logger;

        socket.addEventListener("open", () => this.flushBuffer());
    }

    /**
     * Sends `msg` as JSON if the socket is open, otherwise buffers it.
     *
     * @param msg the message to send
     */
    write(msg: any) {
        if (this.socket.readyState !== ReconnectingWebSocket.OPEN) {
            this.bufferMsg(msg);
            return;
        }

        let content: string;
        try {
            content = JSON.stringify(msg);
        } catch (e) {
            // A message that cannot be serialized will fail identically on every
            // retry, so report it once and do not buffer it.
            this.errorCount++;
            this.fireError(e, msg, this.errorCount);
            return;
        }

        try {
            this.socket.send(content);
        } catch (e) {
            this.errorCount++;
            this.fireError(e, msg, this.errorCount);

            // Sending failed; keep the message for the next "open".
            this.bufferMsg(msg);
        }
    }

    /**
     * Writes all buffered messages in their original order. The buffer is swapped
     * out first so that messages which get re-buffered by `write` are not
     * processed again within the same flush.
     */
    protected flushBuffer() {
        if (this.buffer.length === 0) {
            return;
        }

        const buffer = [...this.buffer];
        this.buffer = [];
        for (const msg of buffer) {
            this.write(msg);
        }
        //this.logger.info(`flushed buffer (${this.buffer.length})`)
    }

    /**
     * Appends `msg` to the buffer.
     */
    protected bufferMsg(msg: any) {
        this.buffer.push(msg);
        //this.logger.info(`buffered message (${this.buffer.length})`);
    }
}
193
194
/**
 * An event that arrived before `listen()` was called and is waiting to be replayed.
 * The explicit `kind` tag keeps falsy payloads (e.g. an empty message) from being
 * mistaken for a different kind of event.
 */
type QueuedReaderEvent = { kind: "message"; message: any } | { kind: "error"; error: any } | { kind: "close" };

/**
 * This takes vscode-ws-jsonrpc/lib/socket/reader/WebSocketMessageReader and removes the "onClose -> fireClose" connection
 *
 * A socket close therefore never closes the reader; a close with a code other
 * than 1000 is reported as an error instead. Events arriving before `listen()`
 * is called are queued and replayed, in arrival order, once it is.
 */
class NonClosingWebSocketMessageReader extends AbstractMessageReader {
    protected readonly socket: IWebSocket;
    /** Events received while still in state "initial", oldest first. */
    protected readonly events: QueuedReaderEvent[] = [];
    protected state: "initial" | "listening" | "closed" = "initial";
    protected callback: (message: any) => void = () => {};

    /**
     * @param socket the socket whose messages, errors and closes are observed
     */
    constructor(socket: IWebSocket) {
        super();
        this.socket = socket;
        this.socket.onMessage((message) => this.readMessage(message));
        this.socket.onError((error) => this.fireError(error));
        this.socket.onClose((code, reason) => {
            if (code !== 1000) {
                const error = {
                    name: "" + code,
                    message: `Error during socket reconnect: code = ${code}, reason = ${reason}`,
                };
                this.fireError(error);
            }
            // this.fireClose(); // <-- reason for this class to be copied over
        });
    }

    /**
     * Starts delivering messages to `callback` and replays everything that was
     * queued beforehand. Only the first call has an effect.
     *
     * @param callback receives every successfully parsed message
     */
    listen(callback: (message: any) => void) {
        if (this.state !== "initial") {
            return;
        }
        this.state = "listening";
        this.callback = callback;

        for (let event = this.events.shift(); event !== undefined; event = this.events.shift()) {
            switch (event.kind) {
                case "message":
                    this.readMessage(event.message);
                    break;
                case "error":
                    this.fireError(event.error);
                    break;
                case "close":
                    this.fireClose();
                    break;
            }
        }
    }

    /**
     * Parses `message` as JSON and passes the result to the callback; queues it
     * if nobody is listening yet, ignores it once closed.
     *
     * @param message the raw message received from the socket
     */
    readMessage(message: any) {
        if (this.state === "initial") {
            this.events.push({ kind: "message", message });
        } else if (this.state === "listening") {
            try {
                const data = JSON.parse(message);
                this.callback(data);
            } catch (error) {
                // NOTE(review): this also catches exceptions thrown by the callback itself.
                log.debug("Failed to decode JSON-RPC message.", error);
            }
        }
    }

    /**
     * Reports `error` to error listeners; queues it if nobody is listening yet,
     * ignores it once closed.
     *
     * @param error the error to report
     */
    fireError(error: any) {
        if (this.state === "initial") {
            this.events.push({ kind: "error", error });
        } else if (this.state === "listening") {
            super.fireError(error);
        }
    }

    /**
     * Closes the reader. Before `listen()` the close is only queued and the
     * reader stays in state "initial", so that `listen()` can still replay the
     * queued events (including this close) in order.
     */
    fireClose() {
        if (this.state === "initial") {
            this.events.push({ kind: "close" });
            return;
        }
        if (this.state === "listening") {
            super.fireClose();
        }
        this.state = "closed";
    }
}
263
264