Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/gitpod-protocol/go/websocket.go
2498 views
1
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
/*---------------------------------------------------------------------------------------------
5
* Copyright (c) 2020 Jaime Pillora <[email protected]>. All rights reserved.
6
* Licensed under the MIT License. See https://github.com/jpillora/chisel/blob/7aa0da95db178b8bc4f20ab49128368348fd4410/LICENSE for license information.
7
*--------------------------------------------------------------------------------------------*/
8
// copied and modified from https://github.com/jpillora/chisel/blob/33fa2010abd42ec76ed9011995f5240642b1a3c5/share/cnet/conn_ws.go
9
package protocol
10
11
import (
12
"context"
13
"sync"
14
"time"
15
16
"github.com/gorilla/websocket"
17
)
18
19
type WebsocketConnection struct {
20
*websocket.Conn
21
buff []byte
22
23
Ctx context.Context
24
cancel func()
25
26
once sync.Once
27
closeErr error
28
waitDone chan struct{}
29
}
30
31
// Keep-alive timing shared by all websocket connections.
var (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the peer.
	pongWait = 15 * time.Second

	// pingPeriod is the interval at which pings are sent to the peer.
	// It is 90% of pongWait and must always stay below it.
	pingPeriod = 9 * pongWait / 10
)
41
42
// NewWebsocketConnection converts a websocket.Conn into a net.Conn
43
func NewWebsocketConnection(ctx context.Context, websocketConn *websocket.Conn, onStale func(staleErr error)) (*WebsocketConnection, error) {
44
ctx, cancel := context.WithCancel(ctx)
45
c := &WebsocketConnection{
46
Conn: websocketConn,
47
waitDone: make(chan struct{}),
48
Ctx: ctx,
49
cancel: cancel,
50
}
51
err := c.SetReadDeadline(time.Now().Add(pongWait))
52
if err != nil {
53
return nil, err
54
}
55
c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil })
56
57
go func() {
58
defer c.Close()
59
ticker := time.NewTicker(pingPeriod)
60
defer ticker.Stop()
61
for {
62
select {
63
case <-ctx.Done():
64
return
65
case <-ticker.C:
66
staleErr := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
67
if staleErr != nil {
68
onStale(staleErr)
69
return
70
}
71
}
72
}
73
}()
74
return c, nil
75
}
76
77
// Close closes the connection
78
func (c *WebsocketConnection) Close() error {
79
c.once.Do(func() {
80
c.cancel()
81
c.closeErr = c.Conn.Close()
82
close(c.waitDone)
83
})
84
return c.closeErr
85
}
86
87
// Wait waits till the connection is closed.
88
func (c *WebsocketConnection) Wait() error {
89
<-c.waitDone
90
return c.closeErr
91
}
92
93
// Read is not threadsafe though thats okay since there
94
// should never be more than one reader
95
func (c *WebsocketConnection) Read(dst []byte) (int, error) {
96
ldst := len(dst)
97
//use buffer or read new message
98
var src []byte
99
if len(c.buff) > 0 {
100
src = c.buff
101
c.buff = nil
102
} else if _, msg, err := c.Conn.ReadMessage(); err == nil {
103
src = msg
104
} else {
105
return 0, err
106
}
107
//copy src->dest
108
var n int
109
if len(src) > ldst {
110
//copy as much as possible of src into dst
111
n = copy(dst, src[:ldst])
112
//copy remainder into buffer
113
r := src[ldst:]
114
lr := len(r)
115
c.buff = make([]byte, lr)
116
copy(c.buff, r)
117
} else {
118
//copy all of src into dst
119
n = copy(dst, src)
120
}
121
//return bytes copied
122
return n, nil
123
}
124
125
// Write sends b to the peer as a single binary websocket message. It returns
// len(b) on success, or 0 together with the error if the message could not be
// written.
func (c *WebsocketConnection) Write(b []byte) (int, error) {
	if err := c.Conn.WriteMessage(websocket.BinaryMessage, b); err != nil {
		return 0, err
	}
	return len(b), nil
}
133
134
// SetDeadline applies t as both the read and the write deadline of the
// underlying websocket. The read deadline is set first; if that fails, its
// error is returned and the write deadline is left untouched.
func (c *WebsocketConnection) SetDeadline(t time.Time) error {
	err := c.Conn.SetReadDeadline(t)
	if err == nil {
		err = c.Conn.SetWriteDeadline(t)
	}
	return err
}
140
141