Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/supervisor/pkg/ports/tunnel.go
2500 views
1
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package ports
6
7
import (
8
"context"
9
"fmt"
10
"io"
11
"net"
12
"sort"
13
"strconv"
14
"sync"
15
16
"golang.org/x/xerrors"
17
18
"github.com/gitpod-io/gitpod/supervisor/api"
19
)
20
21
type PortTunnelDescription struct {
22
LocalPort uint32
23
TargetPort uint32
24
Visibility api.TunnelVisiblity
25
}
26
27
type PortTunnelState struct {
28
Desc PortTunnelDescription
29
Clients map[string]uint32
30
}
31
32
type PortTunnel struct {
33
State PortTunnelState
34
Conns map[string]map[net.Conn]struct{}
35
}
36
37
type TunnelOptions struct {
38
SkipIfExists bool
39
}
40
41
// TunneledPortsInterface observes the tunneled ports.
42
type TunneledPortsInterface interface {
43
// Observe starts observing the tunneled ports until the context is canceled.
44
// The list of tunneled ports is always the complete picture, i.e. if a single port changes,
45
// the whole list is returned.
46
// When the observer stops operating (because the context as canceled or an irrecoverable
47
// error occurred), the observer will close both channels.
48
Observe(ctx context.Context) (<-chan []PortTunnelState, <-chan error)
49
50
// Tunnel notifies clients to install listeners on remote machines.
51
// After that such clients should call EstablishTunnel to forward incoming connections.
52
Tunnel(ctx context.Context, options *TunnelOptions, descs ...*PortTunnelDescription) ([]uint32, error)
53
54
// CloseTunnel closes tunnels.
55
CloseTunnel(ctx context.Context, localPorts ...uint32) ([]uint32, error)
56
57
// EstablishTunnel actually establishes the tunnel for an incoming connection on a remote machine.
58
EstablishTunnel(ctx context.Context, clientID string, localPort uint32, targetPort uint32) (net.Conn, error)
59
}
60
61
// TunneledPortsService observes the tunneled ports.
62
type TunneledPortsService struct {
63
mu *sync.RWMutex
64
cond *sync.Cond
65
tunnels map[uint32]*PortTunnel
66
}
67
68
// NewTunneledPortsService creates a new instance.
69
func NewTunneledPortsService(debugEnable bool) *TunneledPortsService {
70
var mu sync.RWMutex
71
return &TunneledPortsService{
72
mu: &mu,
73
cond: sync.NewCond(&mu),
74
tunnels: make(map[uint32]*PortTunnel),
75
}
76
}
77
78
type tunnelConn struct {
79
net.Conn
80
once sync.Once
81
closeErr error
82
onDidClose func()
83
}
84
85
func (c *tunnelConn) Close() error {
86
c.once.Do(func() {
87
c.closeErr = c.Conn.Close()
88
c.onDidClose()
89
})
90
return c.closeErr
91
}
92
93
// Observe starts observing the tunneled ports until the context is canceled.
94
func (p *TunneledPortsService) Observe(ctx context.Context) (<-chan []PortTunnelState, <-chan error) {
95
var (
96
errchan = make(chan error, 1)
97
reschan = make(chan []PortTunnelState)
98
)
99
100
go func() {
101
defer close(errchan)
102
defer close(reschan)
103
104
p.cond.L.Lock()
105
defer p.cond.L.Unlock()
106
for {
107
var i int
108
res := make([]PortTunnelState, len(p.tunnels))
109
for _, port := range p.tunnels {
110
res[i] = port.State
111
i++
112
}
113
reschan <- res
114
115
p.cond.Wait()
116
if ctx.Err() != nil {
117
return
118
}
119
}
120
}()
121
122
return reschan, errchan
123
}
124
125
func (desc *PortTunnelDescription) validate() (err error) {
126
if desc.LocalPort <= 0 || desc.LocalPort > 0xFFFF {
127
return xerrors.Errorf("bad local port: %d", desc.LocalPort)
128
}
129
if desc.TargetPort > 0xFFFF {
130
return xerrors.Errorf("bad target port: %d", desc.TargetPort)
131
}
132
return nil
133
}
134
135
// Tunnel opens new tunnels.
136
func (p *TunneledPortsService) Tunnel(ctx context.Context, options *TunnelOptions, descs ...*PortTunnelDescription) (tunneled []uint32, err error) {
137
var shouldNotify bool
138
p.cond.L.Lock()
139
defer p.cond.L.Unlock()
140
for _, desc := range descs {
141
descErr := desc.validate()
142
if descErr != nil {
143
if err == nil {
144
err = descErr
145
} else {
146
err = xerrors.Errorf("%s\n%s", err, descErr)
147
}
148
continue
149
}
150
tunnel, tunnelExists := p.tunnels[desc.LocalPort]
151
if !tunnelExists {
152
tunnel = &PortTunnel{
153
State: PortTunnelState{
154
Clients: make(map[string]uint32),
155
},
156
Conns: make(map[string]map[net.Conn]struct{}),
157
}
158
p.tunnels[desc.LocalPort] = tunnel
159
} else if options.SkipIfExists {
160
continue
161
}
162
tunnel.State.Desc = *desc
163
shouldNotify = true
164
tunneled = append(tunneled, desc.LocalPort)
165
}
166
if shouldNotify {
167
p.cond.Broadcast()
168
}
169
return tunneled, err
170
}
171
172
// CloseTunnel closes tunnels.
173
func (p *TunneledPortsService) CloseTunnel(ctx context.Context, localPorts ...uint32) (closedPorts []uint32, err error) {
174
var closed []*PortTunnel
175
p.cond.L.Lock()
176
for _, localPort := range localPorts {
177
tunnel, existsTunnel := p.tunnels[localPort]
178
if !existsTunnel {
179
continue
180
}
181
delete(p.tunnels, localPort)
182
closed = append(closed, tunnel)
183
closedPorts = append(closedPorts, localPort)
184
}
185
if len(closed) > 0 {
186
p.cond.Broadcast()
187
}
188
p.cond.L.Unlock()
189
for _, tunnel := range closed {
190
for _, conns := range tunnel.Conns {
191
for conn := range conns {
192
closeErr := conn.Close()
193
if closeErr == nil {
194
continue
195
}
196
if err == nil {
197
err = closeErr
198
} else {
199
err = xerrors.Errorf("%s\n%s", err, closeErr)
200
}
201
}
202
}
203
}
204
return closedPorts, err
205
}
206
207
// EstablishTunnel actually establishes the tunnel.
208
func (p *TunneledPortsService) EstablishTunnel(ctx context.Context, clientID string, localPort uint32, targetPort uint32) (net.Conn, error) {
209
p.cond.L.Lock()
210
defer p.cond.L.Unlock()
211
212
tunnel, tunnelExists := p.tunnels[localPort]
213
if tunnelExists {
214
expectedTargetPort, clientExists := tunnel.State.Clients[clientID]
215
if clientExists && expectedTargetPort != targetPort {
216
return nil, xerrors.Errorf("client '%s': %d:%d is already tunneling", clientID, localPort, targetPort)
217
}
218
} else {
219
return nil, xerrors.Errorf("client '%s': '%d' tunnel does not exist", clientID, localPort)
220
}
221
222
addr := net.JoinHostPort("localhost", strconv.FormatInt(int64(localPort), 10))
223
conn, err := net.Dial("tcp", addr)
224
if err != nil {
225
return nil, err
226
}
227
var result net.Conn
228
result = &tunnelConn{
229
Conn: conn,
230
onDidClose: func() {
231
p.cond.L.Lock()
232
defer p.cond.L.Unlock()
233
_, existsTunnel := p.tunnels[localPort]
234
if !existsTunnel {
235
return
236
}
237
delete(tunnel.Conns[clientID], result)
238
if len(tunnel.Conns[clientID]) == 0 {
239
delete(tunnel.State.Clients, clientID)
240
}
241
p.cond.Broadcast()
242
},
243
}
244
if tunnel.Conns[clientID] == nil {
245
tunnel.Conns[clientID] = make(map[net.Conn]struct{})
246
}
247
tunnel.Conns[clientID][result] = struct{}{}
248
tunnel.State.Clients[clientID] = targetPort
249
p.cond.Broadcast()
250
return result, nil
251
}
252
253
// Snapshot writes a snapshot to w.
254
func (p *TunneledPortsService) Snapshot(w io.Writer) {
255
p.mu.RLock()
256
defer p.mu.RUnlock()
257
258
localPorts := make([]uint32, 0, len(p.tunnels))
259
for k := range p.tunnels {
260
localPorts = append(localPorts, k)
261
}
262
sort.Slice(localPorts, func(i, j int) bool { return localPorts[i] < localPorts[j] })
263
264
for _, localPort := range localPorts {
265
tunnel := p.tunnels[localPort]
266
fmt.Fprintf(w, "Local Port: %d\n", tunnel.State.Desc.LocalPort)
267
fmt.Fprintf(w, "Target Port: %d\n", tunnel.State.Desc.TargetPort)
268
visibilty := api.TunnelVisiblity_name[int32(tunnel.State.Desc.Visibility)]
269
fmt.Fprintf(w, "Visibility: %s\n", visibilty)
270
for clientID, remotePort := range tunnel.State.Clients {
271
fmt.Fprintf(w, "Client: %s\n", clientID)
272
fmt.Fprintf(w, " Remote Port: %d\n", remotePort)
273
fmt.Fprintf(w, " Tunnel Count: %d\n", len(tunnel.Conns[clientID]))
274
}
275
fmt.Fprintf(w, "\n")
276
}
277
}
278
279