Path: blob/main/components/gitpod-protocol/go/reconnecting-ws.go
2498 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package protocol56import (7"context"8"encoding/json"9"errors"10"fmt"11"net/http"12"sync"13"time"1415backoff "github.com/cenkalti/backoff/v4"16"github.com/gorilla/websocket"17"github.com/sirupsen/logrus"18)1920// ErrClosed is returned when the reconnecting web socket is closed.21var ErrClosed = errors.New("reconnecting-ws: closed")2223// ErrBadHandshake is returned when the server response to opening handshake is24// invalid.25type ErrBadHandshake struct {26URL string27Resp *http.Response28}2930func (e *ErrBadHandshake) Error() string {31var statusCode int32if e.Resp != nil {33statusCode = e.Resp.StatusCode34}35return fmt.Sprintf("reconnecting-ws: bad handshake: code %v - URL: %v", statusCode, e.URL)36}3738// The ReconnectingWebsocket represents a Reconnecting WebSocket connection.39type ReconnectingWebsocket struct {40url string41reqHeader http.Header42handshakeTimeout time.Duration4344once sync.Once45closeErr error46closedCh chan struct{}47connCh chan chan *WebsocketConnection48errCh chan error4950log *logrus.Entry5152ReconnectionHandler func()5354badHandshakeCount uint855badHandshakeMax uint856}5758// NewReconnectingWebsocket creates a new instance of ReconnectingWebsocket59func NewReconnectingWebsocket(url string, reqHeader http.Header, log *logrus.Entry) *ReconnectingWebsocket {60return &ReconnectingWebsocket{61url: url,62reqHeader: reqHeader,63handshakeTimeout: 2 * time.Second,64connCh: make(chan chan *WebsocketConnection),65closedCh: make(chan struct{}),66errCh: make(chan error),67log: log,68badHandshakeCount: 0,69badHandshakeMax: 15,70}71}7273// Close closes the underlying webscoket connection.74func (rc *ReconnectingWebsocket) Close() error {75return rc.closeWithError(ErrClosed)76}7778func (rc *ReconnectingWebsocket) closeWithError(closeErr error) error {79rc.once.Do(func() {80rc.closeErr = closeErr81close(rc.closedCh)82})83return nil84}8586// EnsureConnection ensures ws connections87// Returns only if connection is permanently failed88// If the passed handler returns false as closed then err is returned to the client,89// otherwise err is treated as a connection error, and new conneciton is provided.90func (rc *ReconnectingWebsocket) EnsureConnection(handler func(conn *WebsocketConnection) (closed bool, err error)) error {91for {92connCh := make(chan *WebsocketConnection, 1)93select {94case <-rc.closedCh:95return rc.closeErr96case rc.connCh <- connCh:97}98conn := <-connCh99closed, err := handler(conn)100if !closed {101return err102}103select {104case <-rc.closedCh:105return rc.closeErr106case rc.errCh <- err:107}108}109}110111func isJSONError(err error) bool {112_, isJsonErr := err.(*json.MarshalerError)113if isJsonErr {114return true115}116_, isJsonErr = err.(*json.SyntaxError)117if isJsonErr {118return true119}120_, isJsonErr = err.(*json.UnsupportedTypeError)121if isJsonErr {122return true123}124_, isJsonErr = err.(*json.UnsupportedValueError)125return isJsonErr126}127128// WriteObject writes the JSON encoding of v as a message.129// See the documentation for encoding/json Marshal for details about the conversion of Go values to JSON.130func (rc *ReconnectingWebsocket) WriteObject(v interface{}) error {131return rc.EnsureConnection(func(conn *WebsocketConnection) (bool, error) {132err := conn.WriteJSON(v)133closed := err != nil && !isJSONError(err)134return closed, err135})136}137138// ReadObject reads the next JSON-encoded message from the connection and stores it in the value pointed to by v.139// See the documentation for the encoding/json Unmarshal function for details about the conversion of JSON to a Go value.140func (rc *ReconnectingWebsocket) ReadObject(v interface{}) error {141return rc.EnsureConnection(func(conn *WebsocketConnection) (bool, error) {142err := conn.ReadJSON(v)143closed := err != nil && !isJSONError(err)144return closed, err145})146}147148// Dial creates a new client connection.149func (rc *ReconnectingWebsocket) Dial(ctx context.Context) error {150var conn *WebsocketConnection151defer func() {152if conn == nil {153return154}155rc.log.WithField("url", rc.url).Debug("connection is permanently closed")156conn.Close()157}()158159conn = rc.connect(ctx)160161for {162select {163case <-rc.closedCh:164return rc.closeErr165case connCh := <-rc.connCh:166connCh <- conn167case <-rc.errCh:168if conn != nil {169conn.Close()170}171172time.Sleep(1 * time.Second)173conn = rc.connect(ctx)174if conn != nil && rc.ReconnectionHandler != nil {175go rc.ReconnectionHandler()176}177}178}179}180181func (rc *ReconnectingWebsocket) connect(ctx context.Context) *WebsocketConnection {182exp := &backoff.ExponentialBackOff{183InitialInterval: 2 * time.Second,184RandomizationFactor: 0.5,185Multiplier: 1.5,186MaxInterval: 30 * time.Second,187MaxElapsedTime: 0,188Stop: backoff.Stop,189Clock: backoff.SystemClock,190}191exp.Reset()192for {193// Gorilla websocket does not check if context is valid when dialing so we do it prior194select {195case <-ctx.Done():196rc.log.WithField("url", rc.url).Debug("context done...closing")197rc.Close()198return nil199default:200}201202dialer := websocket.Dialer{HandshakeTimeout: rc.handshakeTimeout}203conn, resp, err := dialer.DialContext(ctx, rc.url, rc.reqHeader)204if err == nil {205rc.log.WithField("url", rc.url).Debug("connection was successfully established")206ws, err := NewWebsocketConnection(context.Background(), conn, func(staleErr error) {207rc.errCh <- staleErr208})209if err == nil {210rc.badHandshakeCount = 0211return ws212}213}214215var statusCode int216if resp != nil {217statusCode = resp.StatusCode218}219220// 200 is bad gateway for ws, we should keep trying221if err == websocket.ErrBadHandshake && statusCode != 200 {222rc.badHandshakeCount++223// if mal-formed handshake request (unauthorized, forbidden) or client actions (redirect) are required then fail immediately224// otherwise try several times and fail, maybe temporarily unavailable, like server restart225if rc.badHandshakeCount > rc.badHandshakeMax || (http.StatusMultipleChoices <= statusCode && statusCode < http.StatusInternalServerError) {226_ = rc.closeWithError(&ErrBadHandshake{rc.url, resp})227return nil228}229}230231delay := exp.NextBackOff()232rc.log.WithError(err).233WithField("url", rc.url).234WithField("badHandshakeCount", fmt.Sprintf("%d/%d", rc.badHandshakeCount, rc.badHandshakeMax)).235WithField("statusCode", statusCode).236WithField("delay", delay.String()).237Error("failed to connect, trying again...")238select {239case <-rc.closedCh:240return nil241case <-time.After(delay):242}243}244}245246247