GitHub Repository: gitpod-io/gitpod
Path: components/local-app/pkg/bastion/bastion.go
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package bastion

import (
    "context"
    "crypto/rand"
    "crypto/rsa"
    "crypto/x509"
    "encoding/pem"
    "errors"
    "fmt"
    "io"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/google/uuid"
    "github.com/kevinburke/ssh_config"
    "github.com/sirupsen/logrus"
    "golang.org/x/crypto/ssh"
    "golang.org/x/xerrors"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/protobuf/proto"

    gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
    app "github.com/gitpod-io/gitpod/local-app/api"
    supervisor "github.com/gitpod-io/gitpod/supervisor/api"
)

var (
    // ErrClosed is returned when the port management has been stopped.
    ErrClosed = errors.New("closed")
    // ErrTooManySubscriptions is returned when the maximum number of allowed subscriptions is exceeded.
    ErrTooManySubscriptions = errors.New("too many subscriptions")
)

// StatusSubscription is a subscription to tunnel status updates.
type StatusSubscription struct {
    instanceID string
    updates    chan []*app.TunnelStatus
    Close      func() error
}
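// Updates returns the channel on which tunnel status updates are delivered.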
func (s *StatusSubscription) Updates() <-chan []*app.TunnelStatus {
    return s.updates
}
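// TunnelClient is an SSH client connected to a workspace's supervisor tunnel endpoint.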
type TunnelClient struct {
    ID   string // we cannot use conn session ID, since proto fails to serialize it
    Conn ssh.Conn
}
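// TunnelListener is a local TCP listener that forwards connections to a remote workspace port.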
type TunnelListener struct {
    RemotePort uint32
    LocalAddr  string
    LocalPort  uint32
    Visibility supervisor.TunnelVisiblity
    Ctx        context.Context
    Cancel     func()
}
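// Workspace tracks the tunnelling state of a single workspace instance.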
type Workspace struct {
    InstanceID  string
    WorkspaceID string
    Phase       string
    OwnerToken  string
    URL         string

    supervisorListener *TunnelListener
    supervisorClient   *grpc.ClientConn

    tunnelMu        sync.RWMutex
    tunnelListeners map[uint32]*TunnelListener
    tunnelEnabled   bool
    cancelTunnel    context.CancelFunc

    localSSHListener *TunnelListener
    SSHPrivateFN     string
    SSHPublicKey     string

    ctx    context.Context
    cancel context.CancelFunc

    tunnelClient          chan chan *TunnelClient
    tunnelClientConnected bool
}
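// Status returns the current status of all port tunnels of this workspace.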
func (ws *Workspace) Status() []*app.TunnelStatus {
    ws.tunnelMu.RLock()
    defer ws.tunnelMu.RUnlock()
    res := make([]*app.TunnelStatus, 0, len(ws.tunnelListeners))
    for _, listener := range ws.tunnelListeners {
        res = append(res, &app.TunnelStatus{
            RemotePort: listener.RemotePort,
            LocalPort:  listener.LocalPort,
            Visibility: listener.Visibility,
        })
    }
    return res
}
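// Callbacks is notified whenever a workspace instance changes.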
type Callbacks interface {
    InstanceUpdate(*Workspace)
}
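// CompositeCallbacks fans an instance update out to multiple callbacks.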
type CompositeCallbacks []Callbacks

func (cb CompositeCallbacks) InstanceUpdate(w *Workspace) {
    for _, c := range cb {
        c.InstanceUpdate(w)
    }
}
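// SSHConfigWritingCallback writes an ssh_config file with a Host entry for each running workspace.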
type SSHConfigWritingCallback struct {
    Path string

    workspaces map[string]*Workspace
}

func (s *SSHConfigWritingCallback) InstanceUpdate(w *Workspace) {
    if s.workspaces == nil {
        s.workspaces = make(map[string]*Workspace)
    }
    if w.localSSHListener == nil || w.Phase == "stopping" {
        delete(s.workspaces, w.WorkspaceID)
    } else if _, exists := s.workspaces[w.WorkspaceID]; !exists {
        s.workspaces[w.WorkspaceID] = w
    }

    var cfg ssh_config.Config
    for _, ws := range s.workspaces {
        p, err := ssh_config.NewPattern(ws.WorkspaceID)
        if err != nil {
            logrus.WithError(err).Warn("cannot produce ssh_config entry")
            continue
        }

        host, port, _ := net.SplitHostPort(ws.localSSHListener.LocalAddr)
        cfg.Hosts = append(cfg.Hosts, &ssh_config.Host{
            Patterns: []*ssh_config.Pattern{p},
            Nodes: []ssh_config.Node{
                &ssh_config.KV{Key: "HostName", Value: host},
                &ssh_config.KV{Key: "User", Value: "gitpod"},
                &ssh_config.KV{Key: "Port", Value: port},
                &ssh_config.KV{Key: "IdentityFile", Value: ws.SSHPrivateFN},
                &ssh_config.KV{Key: "IdentitiesOnly", Value: "yes"},
            },
        })
    }

    err := ioutil.WriteFile(s.Path, []byte(cfg.String()), 0644)
    if err != nil {
        logrus.WithError(err).WithField("path", s.Path).Error("cannot write ssh config file")
        return
    }
}
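// New creates a new Bastion. If localAppTimeout is non-zero, the local app
// exits after running for that duration with no tracked workspaces.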
func New(client gitpod.APIInterface, localAppTimeout time.Duration, cb Callbacks) *Bastion {
    ctx, cancel := context.WithCancel(context.Background())
    return &Bastion{
        Client:                 client,
        Callbacks:              cb,
        workspaces:             make(map[string]*Workspace),
        localAppTimeout:        localAppTimeout,
        workspaceMapChangeChan: make(chan int),
        ctx:                    ctx,
        stop:                   cancel,
        updates:                make(chan *WorkspaceUpdateRequest, 10),
        subscriptions:          make(map[*StatusSubscription]struct{}, 10),
    }
}
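// WorkspaceUpdateRequest carries a workspace instance update; if done is
// non-nil, the resulting Workspace is sent on it once the update is processed.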
type WorkspaceUpdateRequest struct {
    instance *gitpod.WorkspaceInstance
    done     chan *Workspace
}
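// Bastion maintains tunnels to the user's workspaces and notifies subscribers
// of tunnel status changes.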
type Bastion struct {
    id      string
    updates chan *WorkspaceUpdateRequest

    Client    gitpod.APIInterface
    Callbacks Callbacks

    workspacesMu sync.RWMutex
    workspaces   map[string]*Workspace

    localAppTimeout        time.Duration
    workspaceMapChangeChan chan int

    ctx  context.Context
    stop context.CancelFunc

    subscriptionsMu sync.RWMutex
    subscriptions   map[*StatusSubscription]struct{}

    EnableAutoTunnel bool
}
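// Run processes workspace instance updates and blocks until the update stream
// ends. All status subscriptions are closed when it returns.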
func (b *Bastion) Run() error {
    updates, err := b.Client.WorkspaceUpdates(b.ctx, "")
    if err != nil {
        return err
    }

    defer func() {
        // We copy the subscriptions to a list prior to closing them, to prevent a data race
        // between the map iteration and entry removal when closing the subscription.
        b.subscriptionsMu.Lock()
        subs := make([]*StatusSubscription, 0, len(b.subscriptions))
        for s := range b.subscriptions {
            subs = append(subs, s)
        }
        b.subscriptionsMu.Unlock()

        for _, s := range subs {
            s.Close()
        }
    }()

    go b.handleTimeout()
    if b.localAppTimeout != 0 {
        b.workspaceMapChangeChan <- 0
    }

    go func() {
        for u := range b.updates {
            b.handleUpdate(u)
        }
    }()
    b.FullUpdate()

    for u := range updates {
        b.updates <- &WorkspaceUpdateRequest{
            instance: u,
        }
    }
    return nil
}
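// FullUpdate fetches all workspaces and enqueues an update for each latest instance.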
func (b *Bastion) FullUpdate() {
    wss, err := b.Client.GetWorkspaces(b.ctx, &gitpod.GetWorkspacesOptions{Limit: float64(100)})
    if err != nil {
        logrus.WithError(err).Warn("cannot get workspaces")
    } else {
        for _, ws := range wss {
            if ws.LatestInstance == nil {
                continue
            }
            b.updates <- &WorkspaceUpdateRequest{
                instance: ws.LatestInstance,
            }
        }
    }
}
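// Update fetches the given workspace and synchronously processes an update for
// its latest instance, returning the resulting Workspace (or nil).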
func (b *Bastion) Update(workspaceID string) *Workspace {
    ws, err := b.Client.GetWorkspace(b.ctx, workspaceID)
    if err != nil {
        logrus.WithError(err).WithField("WorkspaceID", workspaceID).Error("cannot get workspace")
        return nil
    }
    if ws.LatestInstance == nil {
        return nil
    }
    done := make(chan *Workspace)
    b.updates <- &WorkspaceUpdateRequest{
        instance: ws.LatestInstance,
        done:     done,
    }
    return <-done
}
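// handleTimeout terminates the local app once localAppTimeout has elapsed with
// no tracked workspaces; any workspace appearing in the meantime resets the timer.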
func (b *Bastion) handleTimeout() {
    if b.localAppTimeout == 0 {
        return
    }
    var timer *time.Timer
    for count := range b.workspaceMapChangeChan {
        if count == 0 && timer == nil {
            logrus.Debugf("local app will terminate in %v", b.localAppTimeout)
            timer = time.AfterFunc(b.localAppTimeout, func() {
                os.Exit(0)
            })
        } else if count != 0 && timer != nil {
            logrus.Debugln("reset local app terminate timeout")
            timer.Stop()
            timer = nil
        }
    }
}
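// handleUpdate applies a single workspace instance update: it (re-)establishes
// the supervisor and SSH tunnels for running workspaces, and tears down state
// for stopping or stopped ones.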
func (b *Bastion) handleUpdate(ur *WorkspaceUpdateRequest) {
    var ws *Workspace
    u := ur.instance
    defer func() {
        if ur.done != nil {
            ur.done <- ws
            close(ur.done)
        }
    }()

    b.workspacesMu.Lock()
    defer b.workspacesMu.Unlock()

    ws, ok := b.workspaces[u.ID]
    if !ok {
        if u.Status.Phase == "stopping" || u.Status.Phase == "stopped" {
            return
        }
        ctx, cancel := context.WithCancel(b.ctx)
        ws = &Workspace{
            InstanceID:  u.ID,
            WorkspaceID: u.WorkspaceID,

            ctx:    ctx,
            cancel: cancel,

            tunnelClient:    make(chan chan *TunnelClient, 1),
            tunnelListeners: make(map[uint32]*TunnelListener),
            tunnelEnabled:   true,
        }
    }
    ws.Phase = u.Status.Phase
    ws.URL = u.IdeURL
    ws.OwnerToken = u.Status.OwnerToken
    if ws.OwnerToken == "" && ws.Phase == "running" {
        // updates don't carry the owner token
        go b.FullUpdate()
    }

    switch ws.Phase {
    case "running":
        if ws.OwnerToken != "" && !ws.tunnelClientConnected {
            err := b.connectTunnelClient(ws.ctx, ws)
            if err != nil {
                logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Error("tunnel client failed to connect")
            }
        }
        if ws.supervisorListener == nil && ws.tunnelClientConnected {
            var err error
            ws.supervisorListener, err = b.establishTunnel(ws.ctx, ws, "supervisor", 22999, 0, supervisor.TunnelVisiblity_host)
            if err != nil {
                logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Error("cannot establish supervisor tunnel")
            }
        }

        if ws.supervisorClient == nil && ws.supervisorListener != nil {
            var err error
            ws.supervisorClient, err = grpc.Dial(ws.supervisorListener.LocalAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
            if err != nil {
                logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Error("error connecting to supervisor")
            } else {
                go func() {
                    <-ws.ctx.Done()
                    ws.supervisorClient.Close()
                }()
            }
        }

        if ws.supervisorClient != nil && b.EnableAutoTunnel {
            go b.tunnelPorts(ws)
        }

        if ws.localSSHListener == nil && ws.supervisorClient != nil {
            func() {
                var err error
                ws.SSHPrivateFN, ws.SSHPublicKey, err = generateSSHKeys(ws.InstanceID)
                if err != nil {
                    logrus.WithError(err).WithField("workspaceInstanceID", ws.InstanceID).Error("cannot produce SSH keypair")
                    return
                }

                ws.localSSHListener, err = b.establishSSHTunnel(ws)
                if err != nil {
                    logrus.WithError(err).Error("cannot establish SSH tunnel")
                }
            }()
        }

    case "stopping", "stopped":
        ws.cancel()
        delete(b.workspaces, u.ID)
        b.Callbacks.InstanceUpdate(ws)
        if b.localAppTimeout != 0 {
            b.workspaceMapChangeChan <- len(b.workspaces)
        }
        return
    }

    b.workspaces[u.ID] = ws
    b.Callbacks.InstanceUpdate(ws)
    if b.localAppTimeout != 0 {
        b.workspaceMapChangeChan <- len(b.workspaces)
    }
}
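// generateSSHKeys loads or creates an RSA keypair for the given workspace
// instance. The private key lives in the OS temp directory; if the expected
// file name is unusable, a randomly named temp file is used instead.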
func generateSSHKeys(instanceID string) (privateKeyFN string, publicKey string, err error) {
    privateKeyFN = filepath.Join(os.TempDir(), fmt.Sprintf("gitpod_%s_id_rsa", instanceID))
    useRandomFile := func() {
        var tmpf *os.File
        tmpf, err = ioutil.TempFile("", "gitpod_*_id_rsa")
        if err != nil {
            return
        }
        tmpf.Close()
        privateKeyFN = tmpf.Name()
    }
    if stat, serr := os.Stat(privateKeyFN); serr == nil && stat.IsDir() {
        useRandomFile()
    } else if serr == nil {
        var publicKeyRaw []byte
        publicKeyRaw, err = ioutil.ReadFile(privateKeyFN + ".pub")
        publicKey = string(publicKeyRaw)
        if err == nil {
            // we've loaded a pre-existing key - all is well
            return
        }

        logrus.WithError(err).WithField("instance", instanceID).WithField("privateKeyFN", privateKeyFN).Warn("cannot load public SSH key for this workspace")
        useRandomFile()
    }

    privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
    if err != nil {
        return
    }
    err = privateKey.Validate()
    if err != nil {
        return
    }

    privDER := x509.MarshalPKCS1PrivateKey(privateKey)
    privBlock := pem.Block{
        Type:    "RSA PRIVATE KEY",
        Headers: nil,
        Bytes:   privDER,
    }
    privatePEM := pem.EncodeToMemory(&privBlock)
    err = ioutil.WriteFile(privateKeyFN, privatePEM, 0600)
    if err != nil {
        return
    }

    publicRsaKey, err := ssh.NewPublicKey(&privateKey.PublicKey)
    if err != nil {
        return
    }
    publicKey = string(ssh.MarshalAuthorizedKey(publicRsaKey))
    _ = ioutil.WriteFile(privateKeyFN+".pub", []byte(publicKey), 0644)

    return
}
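// connectTunnelClient dials the workspace's supervisor tunnel endpoint over a
// reconnecting WebSocket and keeps handing out a connected SSH tunnel client
// until the context is cancelled.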
func (b *Bastion) connectTunnelClient(ctx context.Context, ws *Workspace) error {
    if ws.URL == "" {
        return xerrors.Errorf("IDE URL is empty")
    }
    if ws.OwnerToken == "" {
        return xerrors.Errorf("owner token is empty")
    }
    if ws.tunnelClientConnected {
        return xerrors.Errorf("tunnel: ssh client is already connected")
    }
    ws.tunnelClientConnected = true

    tunnelURL := ws.URL
    tunnelURL = strings.ReplaceAll(tunnelURL, "https://", "wss://")
    tunnelURL = strings.ReplaceAll(tunnelURL, "http://", "ws://")
    tunnelURL += "/_supervisor/tunnel"
    h := make(http.Header)
    h.Set("x-gitpod-owner-token", ws.OwnerToken)
    webSocket := gitpod.NewReconnectingWebsocket(tunnelURL, h, logrus.WithField("workspace", ws.WorkspaceID))
    go webSocket.Dial(ctx)
    go func() {
        var (
            client *TunnelClient
            err    error
        )
        defer func() {
            ws.tunnelClientConnected = false
            webSocket.Close()
            if err != nil {
                logrus.WithField("workspace", ws.WorkspaceID).WithError(err).Error("tunnel: failed to connect ssh client")
            }
            if client != nil {
                logrus.WithField("workspace", ws.WorkspaceID).WithField("id", client.ID).Warn("tunnel: ssh client is permanently closed")
            }
        }()
        client, closed, err := newTunnelClient(ctx, ws, webSocket)
        for {
            if err != nil {
                return
            }
            select {
            case <-ctx.Done():
                return
            case clientCh := <-ws.tunnelClient:
                clientCh <- client
            case <-closed:
                client, closed, err = newTunnelClient(ctx, ws, webSocket)
            }
        }
    }()
    return nil
}
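// newTunnelClient establishes an SSH client connection over the given
// WebSocket. The returned channel is closed when the SSH connection closes.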
func newTunnelClient(ctx context.Context, ws *Workspace, reconnecting *gitpod.ReconnectingWebsocket) (client *TunnelClient, closed chan struct{}, err error) {
    logrus.WithField("workspace", ws.WorkspaceID).Info("tunnel: trying to connect ssh client...")
    err = reconnecting.EnsureConnection(func(conn *gitpod.WebsocketConnection) (bool, error) {
        id, err := uuid.NewRandom()
        if err != nil {
            return false, err
        }

        sshConn, chans, reqs, err := ssh.NewClientConn(conn, "", &ssh.ClientConfig{
            HostKeyCallback: ssh.InsecureIgnoreHostKey(),
        })
        if err != nil {
            logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Warn("tunnel: failed to connect ssh client, trying again...")
            return true, err
        }
        logrus.WithField("workspace", ws.WorkspaceID).WithField("id", id).Info("tunnel: ssh client connected")
        go func() {
            conn.Wait()
            sshConn.Close()
        }()
        go ssh.DiscardRequests(reqs)
        go func() {
            for newCh := range chans {
                // TODO(ak) reverse tunneling
                newCh.Reject(ssh.UnknownChannelType, "tunnel: reverse is not supported yet")
            }
        }()
        closed = make(chan struct{}, 1)
        go func() {
            err := sshConn.Wait()
            logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("id", id).Warn("tunnel: ssh client closed")
            close(closed)
        }()
        client = &TunnelClient{
            ID:   id.String(),
            Conn: sshConn,
        }
        return false, nil
    })
    return client, closed, err
}
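// establishTunnel opens a local TCP listener (on targetPort if free, otherwise
// on a random port) and forwards each accepted connection through the SSH
// tunnel client to remotePort inside the workspace.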
func (b *Bastion) establishTunnel(ctx context.Context, ws *Workspace, logprefix string, remotePort int, targetPort int, visibility supervisor.TunnelVisiblity) (*TunnelListener, error) {
    if !ws.tunnelClientConnected {
        return nil, xerrors.Errorf("tunnel client is not connected")
    }
    if visibility == supervisor.TunnelVisiblity_none {
        return nil, xerrors.Errorf("tunnel visibility is none")
    }

    targetHost := "127.0.0.1"
    if visibility == supervisor.TunnelVisiblity_network {
        targetHost = "0.0.0.0"
    }

    netListener, err := net.Listen("tcp", targetHost+":"+strconv.Itoa(targetPort))
    var localPort int
    if err == nil {
        localPort = netListener.(*net.TCPListener).Addr().(*net.TCPAddr).Port
    } else {
        netListener, err = net.Listen("tcp", targetHost+":0")
        if err != nil {
            return nil, err
        }
        localPort = netListener.(*net.TCPListener).Addr().(*net.TCPAddr).Port
    }
    logrus.WithField("workspace", ws.WorkspaceID).Info(logprefix + ": listening on " + netListener.Addr().String() + "...")
    listenerCtx, cancel := context.WithCancel(ctx)
    go func() {
        <-listenerCtx.Done()
        netListener.Close()
        logrus.WithField("workspace", ws.WorkspaceID).Info(logprefix + ": closed")
    }()
    go func() {
        for {
            conn, err := netListener.Accept()
            if listenerCtx.Err() != nil {
                return
            }
            if err != nil {
                logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Warn(logprefix + ": failed to accept connection")
                continue
            }
            logrus.WithField("workspace", ws.WorkspaceID).Debug(logprefix + ": accepted new connection")
            go func() {
                defer logrus.WithField("workspace", ws.WorkspaceID).Debug(logprefix + ": connection closed")
                defer conn.Close()

                clientCh := make(chan *TunnelClient, 1)
                select {
                case <-listenerCtx.Done():
                    return
                case ws.tunnelClient <- clientCh:
                }
                client := <-clientCh

                payload, err := proto.Marshal(&supervisor.TunnelPortRequest{
                    ClientId:   client.ID,
                    Port:       uint32(remotePort),
                    TargetPort: uint32(localPort),
                })
                if err != nil {
                    logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("id", client.ID).Error(logprefix + ": failed to marshal tunnel payload")
                    return
                }
                sshChan, reqs, err := client.Conn.OpenChannel("tunnel", payload)
                if err != nil {
                    logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("id", client.ID).Warn(logprefix + ": failed to establish tunnel")
                    return
                }
                defer sshChan.Close()
                go ssh.DiscardRequests(reqs)

                ctx, cancel := context.WithCancel(listenerCtx)
                go func() {
                    _, _ = io.Copy(sshChan, conn)
                    cancel()
                }()
                go func() {
                    _, _ = io.Copy(conn, sshChan)
                    cancel()
                }()
                <-ctx.Done()
            }()
        }
    }()
    return &TunnelListener{
        RemotePort: uint32(remotePort),
        LocalAddr:  netListener.Addr().String(),
        LocalPort:  uint32(localPort),
        Visibility: visibility,
        Ctx:        listenerCtx,
        Cancel:     cancel,
    }, nil
}
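// establishSSHTunnel installs the workspace's public key as an authorized key
// and tunnels the workspace SSH port (23001) to a local listener.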
func (b *Bastion) establishSSHTunnel(ws *Workspace) (listener *TunnelListener, err error) {
    if ws.SSHPublicKey == "" {
        return nil, xerrors.Errorf("no public key generated")
    }

    err = installSSHAuthorizedKey(ws, ws.SSHPublicKey)
    if err != nil {
        return nil, xerrors.Errorf("cannot install authorized key: %w", err)
    }
    listener, err = b.establishTunnel(ws.ctx, ws, "ssh", 23001, 0, supervisor.TunnelVisiblity_host)
    return listener, err
}
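// installSSHAuthorizedKey appends the given public key to the workspace's
// ~/.ssh/authorized_keys by running a shell command in a supervisor terminal
// and waiting for its "write done" marker.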
func installSSHAuthorizedKey(ws *Workspace, key string) error {
    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()
    term := supervisor.NewTerminalServiceClient(ws.supervisorClient)
    tres, err := term.Open(ctx, &supervisor.OpenTerminalRequest{Workdir: "/", Shell: "/bin/sh"})
    if err != nil {
        return err
    }
    //nolint:errcheck
    defer term.Shutdown(ctx, &supervisor.ShutdownTerminalRequest{Alias: tres.Terminal.Alias})

    done := make(chan bool, 1)
    recv, err := term.Listen(ctx, &supervisor.ListenTerminalRequest{Alias: tres.Terminal.Alias})
    if err != nil {
        return err
    }

    go func() {
        defer close(done)
        for {
            resp, err := recv.Recv()
            if err != nil {
                return
            }
            if resp.Output == nil {
                continue
            }
            out, ok := resp.Output.(*supervisor.ListenTerminalResponse_Data)
            if !ok {
                continue
            }
            c := strings.TrimSpace(string(out.Data))
            if strings.HasPrefix(c, "write done") {
                done <- true
                return
            }
        }
    }()
    _, err = term.Write(ctx, &supervisor.WriteTerminalRequest{
        Alias: tres.Terminal.Alias,
        Stdin: []byte(fmt.Sprintf("mkdir -p ~/.ssh; echo %s >> ~/.ssh/authorized_keys; echo write done\r\n", strings.TrimSpace(key))),
    })
    if err != nil {
        return err
    }

    // give the command some time to execute
    select {
    case <-ctx.Done():
        return ctx.Err()
    case success := <-done:
        if !success {
            return xerrors.Errorf("unable to upload SSH key")
        }
    }

    return nil
}
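// tunnelPorts runs the port-tunnelling loop for a workspace, retrying once per
// second until the workspace context is cancelled or auto-tunnelling is disabled.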
func (b *Bastion) tunnelPorts(ws *Workspace) {
    ws.tunnelMu.Lock()
    if !ws.tunnelEnabled || ws.cancelTunnel != nil {
        ws.tunnelMu.Unlock()
        return
    }
    ctx, cancel := context.WithCancel(ws.ctx)
    ws.cancelTunnel = cancel
    ws.tunnelMu.Unlock()

    defer func() {
        ws.tunnelMu.Lock()
        defer ws.tunnelMu.Unlock()

        ws.cancelTunnel = nil
        logrus.WithField("workspace", ws.WorkspaceID).Info("ports tunneling finished")
    }()

    for {
        logrus.WithField("workspace", ws.WorkspaceID).Info("tunneling ports...")

        err := b.doTunnelPorts(ctx, ws)
        if ws.ctx.Err() != nil {
            return
        }
        if err != nil {
            logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Warn("ports tunneling failed, retrying...")
        }
        select {
        case <-ctx.Done():
            return
        case <-time.After(1 * time.Second):
        }
    }
}
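// doTunnelPorts streams port status from supervisor and reconciles local
// tunnel listeners with the reported set of tunnelled ports.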
func (b *Bastion) doTunnelPorts(ctx context.Context, ws *Workspace) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    statusService := supervisor.NewStatusServiceClient(ws.supervisorClient)
    status, err := statusService.PortsStatus(ctx, &supervisor.PortsStatusRequest{
        Observe: true,
    })
    if err != nil {
        return err
    }
    defer b.notify(ws)
    defer func() {
        ws.tunnelMu.Lock()
        defer ws.tunnelMu.Unlock()
        for port, t := range ws.tunnelListeners {
            delete(ws.tunnelListeners, port)
            t.Cancel()
        }
    }()
    for {
        resp, err := status.Recv()
        if err != nil {
            return err
        }
        ws.tunnelMu.Lock()
        currentTunneled := make(map[uint32]struct{})
        for _, port := range resp.Ports {
            visibility := supervisor.TunnelVisiblity_none
            if port.Tunneled != nil {
                visibility = port.Tunneled.Visibility
            }
            listener, alreadyTunneled := ws.tunnelListeners[port.LocalPort]
            if alreadyTunneled && listener.Visibility != visibility {
                listener.Cancel()
                delete(ws.tunnelListeners, port.LocalPort)
            }
            if visibility == supervisor.TunnelVisiblity_none {
                continue
            }
            currentTunneled[port.LocalPort] = struct{}{}
            _, alreadyTunneled = ws.tunnelListeners[port.LocalPort]
            if alreadyTunneled {
                continue
            }
            _, alreadyTunneled = port.Tunneled.Clients[b.id]
            if alreadyTunneled {
                continue
            }

            logprefix := "tunnel[" + supervisor.TunnelVisiblity_name[int32(port.Tunneled.Visibility)] + ":" + strconv.Itoa(int(port.LocalPort)) + "]"
            listener, err := b.establishTunnel(ws.ctx, ws, logprefix, int(port.LocalPort), int(port.Tunneled.TargetPort), port.Tunneled.Visibility)
            if err != nil {
                logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("port", port.LocalPort).Error("cannot establish port tunnel")
            } else {
                ws.tunnelListeners[port.LocalPort] = listener
            }
        }
        for port, listener := range ws.tunnelListeners {
            _, exists := currentTunneled[port]
            if !exists {
                delete(ws.tunnelListeners, port)
                listener.Cancel()
            }
        }
        ws.tunnelMu.Unlock()
        b.notify(ws)
    }
}
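// notify sends the workspace's current tunnel status to all matching
// subscribers, closing any subscription that does not receive within 5s.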
func (b *Bastion) notify(ws *Workspace) {
    b.subscriptionsMu.RLock()
    defer b.subscriptionsMu.RUnlock()
    var subs []*StatusSubscription
    for sub := range b.subscriptions {
        if sub.instanceID == ws.InstanceID {
            subs = append(subs, sub)
        }
    }
    if len(subs) == 0 {
        return
    }
    status := ws.Status()
    for _, sub := range subs {
        select {
        case sub.updates <- status:
        case <-time.After(5 * time.Second):
            logrus.Error("ports subscription dropped out")
            // Close acquires subscriptionsMu, which this goroutine already
            // holds for reading; close asynchronously to avoid deadlocking
            // on our own lock.
            go sub.Close()
        }
    }
}
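// Status returns the tunnel status of the workspace with the given instance ID.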
func (b *Bastion) Status(instanceID string) []*app.TunnelStatus {
    ws, ok := b.getWorkspace(instanceID)
    if !ok {
        return nil
    }
    return ws.Status()
}

func (b *Bastion) getWorkspace(instanceID string) (*Workspace, bool) {
    b.workspacesMu.RLock()
    defer b.workspacesMu.RUnlock()
    ws, ok := b.workspaces[instanceID]
    return ws, ok
}

const maxStatusSubscriptions = 10
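// Subscribe registers a status subscription for the given workspace instance.
// It fails with ErrTooManySubscriptions once maxStatusSubscriptions is exceeded.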
func (b *Bastion) Subscribe(instanceID string) (*StatusSubscription, error) {
    b.subscriptionsMu.Lock()
    defer b.subscriptionsMu.Unlock()

    if b.ctx.Err() != nil {
        return nil, ErrClosed
    }

    if len(b.subscriptions) > maxStatusSubscriptions {
        return nil, ErrTooManySubscriptions
    }

    sub := &StatusSubscription{updates: make(chan []*app.TunnelStatus, 5), instanceID: instanceID}
    var once sync.Once
    sub.Close = func() error {
        b.subscriptionsMu.Lock()
        defer b.subscriptionsMu.Unlock()

        once.Do(func() {
            close(sub.updates)
        })
        delete(b.subscriptions, sub)

        return nil
    }
    b.subscriptions[sub] = struct{}{}

    // makes sure that no updates can happen between clients receiving an initial status and subscribing
    sub.updates <- b.Status(instanceID)
    return sub, nil
}
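// AutoTunnel enables or disables automatic port tunnelling for the given
// workspace instance.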
func (b *Bastion) AutoTunnel(instanceID string, enabled bool) {
    ws, ok := b.getWorkspace(instanceID)
    if !ok {
        return
    }
    ws.tunnelMu.Lock()
    defer ws.tunnelMu.Unlock()
    if ws.tunnelEnabled == enabled {
        return
    }
    ws.tunnelEnabled = enabled
    if enabled {
        if ws.cancelTunnel == nil && b.EnableAutoTunnel {
            b.Update(ws.WorkspaceID)
        }
    } else if ws.cancelTunnel != nil {
        ws.cancelTunnel()
    }
}
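// Usage sketch (illustrative, not part of this package's source): wiring a
// Bastion into a local app, using only the exported API above. Assumes a
// gitpod.APIInterface client has been constructed elsewhere; the config path
// and timeout are placeholders.
//
//	cb := &SSHConfigWritingCallback{Path: "/tmp/gitpod_ssh_config"}
//	b := New(client, 30*time.Minute, cb)
//	b.EnableAutoTunnel = true
//	if err := b.Run(); err != nil {
//	    logrus.WithError(err).Fatal("bastion stopped")
//	}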