Path: blob/main/components/gitpod-protocol/src/messaging/browser/connection.ts
2501 views
/*1* Copyright (C) 2017 TypeFox and others.2*3* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.4* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.05*/67import { Logger, ConsoleLogger, toSocket, IWebSocket } from "vscode-ws-jsonrpc";8import { createMessageConnection } from "vscode-jsonrpc";9import { AbstractMessageWriter } from "vscode-jsonrpc/lib/messageWriter";10import { AbstractMessageReader } from "vscode-jsonrpc/lib/messageReader";11import { JsonRpcProxyFactory, JsonRpcProxy } from "../proxy-factory";12import { ConnectionEventHandler, ConnectionHandler } from "../handler";13import ReconnectingWebSocket, { Event } from "reconnecting-websocket";14import { log } from "../../util/logging";1516export interface WebSocketOptions {17onerror?: (event: Event) => void;18onListening?: (socket: ReconnectingWebSocket) => void;19}2021export class WebSocketConnectionProvider {22/**23* Create a proxy object to remote interface of T type24* over a web socket connection for the given path.25*26* An optional target can be provided to handle27* notifications and requests from a remote side.28*/29createProxy<T extends object>(30path: string | Promise<string>,31target?: object,32options?: WebSocketOptions,33): JsonRpcProxy<T> {34const factory = new JsonRpcProxyFactory<T>(target);35const startListening = (path: string) => {36const socket = this.listen(37{38path,39onConnection: (c) => factory.listen(c),40},41{42onTransportDidClose: () => factory.fireConnectionClosed(),43onTransportDidOpen: () => factory.fireConnectionOpened(),44},45options,46);47if (options?.onListening) {48options.onListening(socket as any as ReconnectingWebSocket);49}50};5152if (typeof path === "string") {53startListening(path);54} else {55path.then((path) => startListening(path));56}57return factory.createProxy();58}5960/**61* Install a connection handler for the given path.62*/63listen(handler: ConnectionHandler, eventHandler: ConnectionEventHandler, options?: WebSocketOptions): WebSocket {64const url = handler.path;65const webSocket = this.createWebSocket(url);6667const logger = this.createLogger();68if (options && options.onerror) {69const onerror = options.onerror;70webSocket.addEventListener("error", (event) => {71onerror(event);72});73} else {74webSocket.addEventListener("error", (error: Event) => {75logger.error(JSON.stringify(error));76});77}78doListen(webSocket as any as ReconnectingWebSocket, handler, eventHandler, logger);79return webSocket;80}8182protected createLogger(): Logger {83return new ConsoleLogger();84}8586/**87* Creates a web socket for the given url88*/89createWebSocket(url: string, WebSocketConstructor = WebSocket): WebSocket {90return new ReconnectingWebSocket(url, undefined, {91maxReconnectionDelay: 10000,92minReconnectionDelay: 1000,93reconnectionDelayGrowFactor: 1.3,94maxRetries: Infinity,95debug: false,96WebSocket: WebSocketConstructor,97}) as any;98}99}100101// The following was extracted from vscode-ws-jsonrpc to make these changes:102// - switch from WebSocket to ReconnectingWebSocket103// - webSocket.onopen: making sure it's only ever called once so we're re-using MessageConnection104// - WebSocketMessageWriter: buffer and re-try messages instead of throwing an error immidiately105// - WebSocketMessageReader: don't close MessageConnection on 'socket.onclose'106function doListen(107resocket: ReconnectingWebSocket,108handler: ConnectionHandler,109eventHandler: ConnectionEventHandler,110logger: Logger,111) {112resocket.addEventListener("close", () => eventHandler.onTransportDidClose());113114let alreadyOpened = false;115resocket.onopen = () => {116// trigerr "open" every time we re-open the underlying websocket117eventHandler.onTransportDidOpen();118119// make sure we're only ever creating one MessageConnection, irregardless of how many times we have to re-open the underlying (reconnecting) websocket120if (alreadyOpened) {121return;122}123alreadyOpened = true;124125const connection = createWebSocketConnection(resocket, logger);126handler.onConnection(connection);127};128}129130function createWebSocketConnection(resocket: ReconnectingWebSocket, logger: Logger) {131const socket = toSocket(resocket as any);132const messageReader = new NonClosingWebSocketMessageReader(socket);133const messageWriter = new BufferingWebSocketMessageWriter(resocket, logger);134const connection = createMessageConnection(messageReader, messageWriter, logger);135connection.onClose(() => connection.dispose());136return connection;137}138139/**140* This takes vscode-ws-jsonrpc/lib/socket/writer/WebSocketMessageWriter and adds a buffer141*/142class BufferingWebSocketMessageWriter extends AbstractMessageWriter {143protected readonly socket: ReconnectingWebSocket;144protected readonly logger: Logger;145protected errorCount: number = 0;146147protected buffer: any[] = [];148149constructor(socket: ReconnectingWebSocket, logger: Logger) {150super();151this.socket = socket;152this.logger = logger;153154socket.addEventListener("open", (event: Event) => this.flushBuffer());155}156157write(msg: any) {158if (this.socket.readyState !== ReconnectingWebSocket.OPEN) {159this.bufferMsg(msg);160return;161}162163try {164const content = JSON.stringify(msg);165this.socket.send(content);166} catch (e) {167this.errorCount++;168this.fireError(e, msg, this.errorCount);169170this.bufferMsg(msg);171}172}173174protected flushBuffer() {175if (this.buffer.length === 0) {176return;177}178179const buffer = [...this.buffer];180this.buffer = [];181for (const msg of buffer) {182this.write(msg);183}184//this.logger.info(`flushed buffer (${this.buffer.length})`)185}186187protected bufferMsg(msg: any) {188this.buffer.push(msg);189//this.logger.info(`buffered message (${this.buffer.length})`);190}191}192193/**194* This takes vscode-ws-jsonrpc/lib/socket/reader/WebSocketMessageReader and removes the "onClose -> fireClose" connection195*/196class NonClosingWebSocketMessageReader extends AbstractMessageReader {197protected readonly socket: IWebSocket;198protected readonly events: any[] = [];199protected state: "initial" | "listening" | "closed" = "initial";200protected callback: (message: any) => void = () => {};201202constructor(socket: IWebSocket) {203super();204this.socket = socket;205this.socket.onMessage((message) => this.readMessage(message));206this.socket.onError((error) => this.fireError(error));207this.socket.onClose((code, reason) => {208if (code !== 1000) {209const error = {210name: "" + code,211message: `Error during socket reconnect: code = ${code}, reason = ${reason}`,212};213this.fireError(error);214}215// this.fireClose(); // <-- reason for this class to be copied over216});217}218listen(callback: (message: any) => void) {219if (this.state === "initial") {220this.state = "listening";221this.callback = callback;222while (this.events.length !== 0) {223const event = this.events.pop();224if (event.message) {225this.readMessage(event.message);226} else if (event.error) {227this.fireError(event.error);228} else {229this.fireClose();230}231}232}233}234readMessage(message: any) {235if (this.state === "initial") {236this.events.splice(0, 0, { message });237} else if (this.state === "listening") {238try {239const data = JSON.parse(message);240this.callback(data);241} catch (error) {242log.debug("Failed to decode JSON-RPC message.", error);243}244}245}246fireError(error: any) {247if (this.state === "initial") {248this.events.splice(0, 0, { error });249} else if (this.state === "listening") {250super.fireError(error);251}252}253fireClose() {254if (this.state === "initial") {255this.events.splice(0, 0, {});256} else if (this.state === "listening") {257super.fireClose();258}259this.state = "closed";260}261}262263264