Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/supervisor/pkg/ports/ports.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package ports
6
7
import (
8
"context"
9
"errors"
10
"fmt"
11
"io"
12
"net"
13
"reflect"
14
"sort"
15
"strings"
16
"sync"
17
"time"
18
19
"golang.org/x/net/nettest"
20
"golang.org/x/xerrors"
21
22
"github.com/gitpod-io/gitpod/common-go/log"
23
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
24
"github.com/gitpod-io/gitpod/supervisor/api"
25
"inet.af/tcpproxy"
26
)
27
28
// workspaceIPAdress holds the IP address of the workspace's default routable
// network interface, resolved once at package load time.
// NOTE(review): name is misspelled ("Adress"); kept as-is because it is
// referenced throughout this package.
var workspaceIPAdress string

func init() {
	// Only the IP is needed here; the interface name is discarded.
	_, workspaceIPAdress = defaultRoutableIP()
}
33
34
// NewManager creates a new port manager
35
func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, config ConfigInterace, tunneled TunneledPortsInterface, internalPorts ...uint32) *Manager {
36
state := make(map[uint32]*managedPort)
37
internal := make(map[uint32]struct{})
38
for _, p := range internalPorts {
39
internal[p] = struct{}{}
40
}
41
42
return &Manager{
43
E: exposed,
44
S: served,
45
C: config,
46
T: tunneled,
47
48
forceUpdates: make(chan struct{}, 1),
49
50
internal: internal,
51
proxies: make(map[uint32]*localhostProxy),
52
autoExposed: make(map[uint32]*autoExposure),
53
autoTunneled: make(map[uint32]struct{}),
54
55
state: state,
56
subscriptions: make(map[*Subscription]struct{}),
57
proxyStarter: startLocalhostProxy,
58
59
autoTunnelEnabled: true,
60
}
61
}
62
63
// localhostProxy forwards traffic arriving on the workspace's routable IP to
// a port that is served on localhost only, making it reachable from outside.
type localhostProxy struct {
	io.Closer        // closes the underlying proxy
	proxyPort uint32 // the local port being proxied
}
67
68
// autoExposure tracks the state of an asynchronous auto-expose attempt.
type autoExposure struct {
	// state is trying/succeeded/failed; written by the goroutine started in
	// Manager.autoExpose.
	state api.PortAutoExposure
	// ctx is the context the attempt was started with; kept so a retry can
	// reuse it (see RetryAutoExpose).
	ctx      context.Context
	public   bool   // requested visibility
	protocol string // requested protocol
}
74
75
// Manager brings together served and exposed ports. It keeps track of which port is exposed, which one is served,
// auto-exposes ports and proxies ports served on localhost only.
type Manager struct {
	E ExposedPortsInterface  // exposure to the internet
	S ServedPortsObserver    // ports served inside the workspace
	C ConfigInterace         // port configuration
	T TunneledPortsInterface // tunnels to the client

	// forceUpdates wakes the Run loop to recompute state; capacity 1 so
	// signals coalesce.
	forceUpdates chan struct{}

	// internal lists ports reserved for supervisor-internal services.
	internal map[uint32]struct{}
	// proxies maps local ports to their running localhost proxies.
	proxies map[uint32]*localhostProxy
	// proxyStarter is indirected for testability; defaults to startLocalhostProxy.
	proxyStarter func(port uint32) (proxy io.Closer, err error)
	// autoExposed records ports for which an auto-expose attempt was started.
	autoExposed map[uint32]*autoExposure

	// autoTunneled records ports tunneled automatically (vs. explicitly).
	autoTunneled      map[uint32]struct{}
	autoTunnelEnabled bool

	// Last observed values from the respective observers.
	configs  *Configs
	exposed  []ExposedPort
	served   []ServedPort
	tunneled []PortTunnelState

	// state is the merged per-port view derived from the observations above.
	state map[uint32]*managedPort
	// mu guards all mutable fields of the manager.
	mu sync.RWMutex

	subscriptions map[*Subscription]struct{}
	closed        bool // set once Run has terminated; Subscribe then fails
}
104
105
// managedPort is the merged view of a single port: whether it is served,
// exposed and/or tunneled, plus its configured metadata.
type managedPort struct {
	Served       bool
	Exposed      bool
	Visibility   api.PortVisibility
	Protocol     api.PortProtocol
	Description  string
	Name         string
	URL          string
	OnExposed    api.OnPortExposedAction // deprecated
	OnOpen       api.PortsStatus_OnOpenAction
	AutoExposure api.PortAutoExposure

	// LocalhostPort is the port number inside the workspace.
	LocalhostPort uint32

	Tunneled           bool
	TunneledTargetPort uint32
	TunneledVisibility api.TunnelVisiblity
	// TunneledClients maps client IDs to the port on the client side.
	TunneledClients map[string]uint32
}
124
125
// Subscription is a Subscription to status updates
type Subscription struct {
	updates chan []*api.PortsStatus
	// Close tears the subscription down; the lock argument indicates whether
	// Close must acquire the manager's mutex itself (false when the caller
	// already holds it).
	Close func(lock bool) error
}
130
131
// Updates returns the updates channel.
// The channel is closed when the subscription is closed.
func (s *Subscription) Updates() <-chan []*api.PortsStatus {
	return s.updates
}
135
136
// Run starts the port manager which keeps running until one of its observers stops.
func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	defer log.Debug("portManager shutdown")

	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		// We copy the subscriptions to a list prior to closing them, to prevent a data race
		// between the map iteration and entry removal when closing the subscription.
		pm.mu.Lock()
		pm.closed = true
		subs := make([]*Subscription, 0, len(pm.subscriptions))
		for s := range pm.subscriptions {
			subs = append(subs, s)
		}
		pm.mu.Unlock()

		for _, s := range subs {
			_ = s.Close(true)
		}
	}()
	defer cancel()

	go pm.E.Run(ctx)
	exposedUpdates, exposedErrors := pm.E.Observe(ctx)
	servedUpdates, servedErrors := pm.S.Observe(ctx)
	configUpdates, configErrors := pm.C.Observe(ctx)
	tunneledUpdates, tunneledErrors := pm.T.Observe(ctx)
	for {
		var (
			exposed     []ExposedPort
			served      []ServedPort
			configured  *Configs
			tunneled    []PortTunnelState
			forceUpdate bool
		)
		// A nil value received from an update channel means the observer
		// stopped; that terminates the whole manager. An error is only fatal
		// when it is nil for the same reason — non-nil errors are logged and
		// the loop continues.
		select {
		case <-pm.forceUpdates:
			forceUpdate = true
		case exposed = <-exposedUpdates:
			if exposed == nil {
				if ctx.Err() == nil {
					log.Error("exposed ports observer stopped unexpectedly")
				}
				return
			}
		case served = <-servedUpdates:
			if served == nil {
				if ctx.Err() == nil {
					log.Error("served ports observer stopped unexpectedly")
				}
				return
			}
		case configured = <-configUpdates:
			if configured == nil {
				if ctx.Err() == nil {
					log.Error("configured ports observer stopped unexpectedly")
				}
				return
			}
		case tunneled = <-tunneledUpdates:
			if tunneled == nil {
				if ctx.Err() == nil {
					log.Error("tunneled ports observer stopped unexpectedly")
				}
				return
			}

		case err := <-exposedErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("exposed ports observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing exposed ports")
		case err := <-servedErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("served ports observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing served ports")
		case err := <-configErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("port configs observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing served port configs")
		case err := <-tunneledErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("tunneled ports observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing tunneled ports")
		}

		if exposed == nil && served == nil && configured == nil && tunneled == nil && !forceUpdate {
			// we received just an error, but no update
			continue
		}
		pm.updateState(ctx, exposed, served, configured, tunneled)
	}
}
245
246
// Status provides the current port status.
// It takes a read lock and returns a freshly built snapshot.
func (pm *Manager) Status() []*api.PortsStatus {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	return pm.getStatus()
}
253
254
func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, served []ServedPort, configured *Configs, tunneled []PortTunnelState) {
255
pm.mu.Lock()
256
defer pm.mu.Unlock()
257
258
if exposed != nil && !reflect.DeepEqual(pm.exposed, exposed) {
259
pm.exposed = exposed
260
}
261
262
if tunneled != nil && !reflect.DeepEqual(pm.tunneled, tunneled) {
263
pm.tunneled = tunneled
264
}
265
266
if served != nil {
267
servedMap := make(map[uint32]ServedPort)
268
for _, port := range served {
269
if _, existProxy := pm.proxies[port.Port]; existProxy && port.Address.String() == workspaceIPAdress {
270
// Ignore entries that are bound to the workspace ip address
271
// as they are created by the internal reverse proxy
272
continue
273
}
274
275
config, _, exists := pm.configs.Get(port.Port)
276
// don't serve ports that are configured to be ignored-completely
277
if exists && config.OnOpen == "ignore-completely" {
278
continue
279
}
280
281
current, exists := servedMap[port.Port]
282
if !exists || (!port.BoundToLocalhost && current.BoundToLocalhost) {
283
servedMap[port.Port] = port
284
}
285
}
286
287
var servedKeys []uint32
288
for k := range servedMap {
289
servedKeys = append(servedKeys, k)
290
}
291
sort.Slice(servedKeys, func(i, j int) bool {
292
return servedKeys[i] < servedKeys[j]
293
})
294
295
var newServed []ServedPort
296
for _, key := range servedKeys {
297
newServed = append(newServed, servedMap[key])
298
}
299
300
if !reflect.DeepEqual(pm.served, newServed) {
301
log.WithField("served", newServed).Debug("updating served ports")
302
pm.served = newServed
303
pm.updateProxies()
304
pm.autoTunnel(ctx)
305
}
306
}
307
308
if configured != nil {
309
pm.configs = configured
310
}
311
312
newState := pm.nextState(ctx)
313
stateChanged := !reflect.DeepEqual(newState, pm.state)
314
pm.state = newState
315
316
if !stateChanged && configured == nil {
317
return
318
}
319
320
status := pm.getStatus()
321
log.WithField("ports", fmt.Sprintf("%+v", status)).Debug("ports changed")
322
for sub := range pm.subscriptions {
323
select {
324
case sub.updates <- status:
325
case <-time.After(5 * time.Second):
326
log.Error("ports subscription droped out")
327
_ = sub.Close(false)
328
}
329
}
330
}
331
332
func (pm *Manager) nextState(ctx context.Context) map[uint32]*managedPort {
333
state := make(map[uint32]*managedPort)
334
335
genManagedPort := func(port uint32) *managedPort {
336
if mp, exists := state[port]; exists {
337
return mp
338
}
339
config, _, exists := pm.configs.Get(port)
340
var portConfig *gitpod.PortConfig
341
if exists && config != nil {
342
portConfig = &config.PortConfig
343
}
344
mp := &managedPort{
345
LocalhostPort: port,
346
OnExposed: getOnExposedAction(portConfig, port),
347
OnOpen: getOnOpenAction(portConfig, port),
348
}
349
if exists {
350
mp.Name = config.Name
351
mp.Description = config.Description
352
}
353
state[port] = mp
354
return mp
355
}
356
357
// 1. first capture exposed and tunneled since they don't depend on configured or served ports
358
for _, exposed := range pm.exposed {
359
port := exposed.LocalPort
360
if pm.boundInternally(port) {
361
continue
362
}
363
Visibility := api.PortVisibility_private
364
if exposed.Public {
365
Visibility = api.PortVisibility_public
366
}
367
portProtocol := api.PortProtocol_http
368
if exposed.Protocol == gitpod.PortProtocolHTTPS {
369
portProtocol = api.PortProtocol_https
370
}
371
mp := genManagedPort(port)
372
mp.Exposed = true
373
mp.Protocol = portProtocol
374
mp.Visibility = Visibility
375
mp.URL = exposed.URL
376
}
377
378
for _, tunneled := range pm.tunneled {
379
port := tunneled.Desc.LocalPort
380
if pm.boundInternally(port) {
381
continue
382
}
383
mp := genManagedPort(port)
384
mp.Tunneled = true
385
mp.TunneledTargetPort = tunneled.Desc.TargetPort
386
mp.TunneledVisibility = tunneled.Desc.Visibility
387
mp.TunneledClients = tunneled.Clients
388
}
389
390
// 2. second capture configured since we don't want to auto expose already exposed ports
391
if pm.configs != nil {
392
pm.configs.ForEach(func(port uint32, config *SortConfig) {
393
if pm.boundInternally(port) {
394
return
395
}
396
mp := genManagedPort(port)
397
autoExpose, autoExposed := pm.autoExposed[port]
398
if autoExposed {
399
mp.AutoExposure = autoExpose.state
400
}
401
if mp.Exposed || autoExposed {
402
return
403
}
404
405
mp.Visibility = api.PortVisibility_private
406
if config.Visibility == "public" {
407
mp.Visibility = api.PortVisibility_public
408
}
409
public := mp.Visibility == api.PortVisibility_public
410
mp.AutoExposure = pm.autoExpose(ctx, mp.LocalhostPort, public, config.Protocol).state
411
})
412
}
413
414
// 3. at last capture served ports since
415
// we don't want to auto expose already exposed ports on the same port
416
// and need configured to decide about default visiblity properly
417
for _, served := range pm.served {
418
port := served.Port
419
if pm.boundInternally(port) {
420
continue
421
}
422
mp := genManagedPort(port)
423
mp.Served = true
424
425
autoExposure, autoExposed := pm.autoExposed[port]
426
if autoExposed {
427
mp.AutoExposure = autoExposure.state
428
continue
429
}
430
431
var public bool
432
protocol := "http"
433
config, kind, exists := pm.configs.Get(mp.LocalhostPort)
434
435
getProtocol := func(p api.PortProtocol) string {
436
switch p {
437
case api.PortProtocol_https:
438
return "https"
439
default:
440
return "http"
441
}
442
}
443
444
configured := exists && kind == PortConfigKind
445
if mp.Exposed || configured {
446
public = mp.Visibility == api.PortVisibility_public
447
protocol = getProtocol(mp.Protocol)
448
} else if exists {
449
public = config.Visibility == "public"
450
protocol = config.Protocol
451
}
452
453
if mp.Exposed && ((mp.Visibility == api.PortVisibility_public && public) || (mp.Visibility == api.PortVisibility_private && !public)) && protocol != "https" {
454
continue
455
}
456
457
mp.AutoExposure = pm.autoExpose(ctx, mp.LocalhostPort, public, protocol).state
458
}
459
460
var ports []uint32
461
for port := range state {
462
ports = append(ports, port)
463
}
464
465
sort.Slice(ports, func(i, j int) bool {
466
return ports[i] < ports[j]
467
})
468
469
newState := make(map[uint32]*managedPort)
470
for _, mp := range ports {
471
newState[mp] = state[mp]
472
}
473
474
return newState
475
}
476
477
// clients should guard a call with check whether such port is already exposed or auto exposed
478
func (pm *Manager) autoExpose(ctx context.Context, localPort uint32, public bool, protocol string) *autoExposure {
479
exposing := pm.E.Expose(ctx, localPort, public, protocol)
480
autoExpose := &autoExposure{
481
state: api.PortAutoExposure_trying,
482
ctx: ctx,
483
public: public,
484
protocol: protocol,
485
}
486
go func() {
487
err := <-exposing
488
if err != nil {
489
if err != context.Canceled {
490
autoExpose.state = api.PortAutoExposure_failed
491
log.WithError(err).WithField("localPort", localPort).Warn("cannot auto-expose port")
492
}
493
return
494
}
495
autoExpose.state = api.PortAutoExposure_succeeded
496
log.WithField("localPort", localPort).Info("auto-exposed port")
497
}()
498
pm.autoExposed[localPort] = autoExpose
499
log.WithField("localPort", localPort).Info("auto-exposing port")
500
return autoExpose
501
}
502
503
// RetryAutoExpose retries auto exposing the give port.
// It is a no-op unless a previous attempt for this port failed and its
// context is still alive.
// NOTE(review): the ctx parameter is unused — the retry reuses the original
// attempt's context; confirm whether that is intended.
func (pm *Manager) RetryAutoExpose(ctx context.Context, localPort uint32) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	autoExpose, autoExposed := pm.autoExposed[localPort]
	if !autoExposed || autoExpose.state != api.PortAutoExposure_failed || autoExpose.ctx.Err() != nil {
		return
	}
	// Retry with the original parameters, then wake up the Run loop.
	pm.autoExpose(autoExpose.ctx, localPort, autoExpose.public, autoExpose.protocol)
	pm.forceUpdate()
}
514
515
// forceUpdate signals the Run loop to recompute state. The send is
// non-blocking: if a signal is already pending in the buffered channel, the
// new one coalesces with it. (The previous len()-then-send pattern had a
// check-then-act window in which a concurrent sender could fill the channel
// and make this call block.)
func (pm *Manager) forceUpdate() {
	select {
	case pm.forceUpdates <- struct{}{}:
	default:
	}
}
520
521
// autoTunnel reconciles automatic tunnels with the currently served ports.
// When auto tunneling is disabled it closes every tunnel this manager opened
// automatically; otherwise it opens a host-visible tunnel for each served,
// non-internal port that is not yet auto-tunneled.
// Callers are expected to hold mu.
func (pm *Manager) autoTunnel(ctx context.Context) {
	if !pm.autoTunnelEnabled {
		var localPorts []uint32
		for localPort := range pm.autoTunneled {
			localPorts = append(localPorts, localPort)
		}
		// CloseTunnel ensures that everything is closed
		pm.autoTunneled = make(map[uint32]struct{})
		_, err := pm.T.CloseTunnel(ctx, localPorts...)
		if err != nil {
			log.WithError(err).Error("cannot close auto tunneled ports")
		}
		return
	}
	var descs []*PortTunnelDescription
	for _, served := range pm.served {
		if pm.boundInternally(served.Port) {
			continue
		}

		_, autoTunneled := pm.autoTunneled[served.Port]
		if !autoTunneled {
			descs = append(descs, &PortTunnelDescription{
				LocalPort:  served.Port,
				TargetPort: served.Port,
				Visibility: api.TunnelVisiblity_host,
			})
		}
	}
	autoTunneled, err := pm.T.Tunnel(ctx, &TunnelOptions{
		SkipIfExists: true,
	}, descs...)
	if err != nil {
		log.WithError(err).Error("cannot auto tunnel ports")
	}
	// Remember which ports we tunneled automatically so they can be closed
	// when auto tunneling is disabled.
	for _, localPort := range autoTunneled {
		pm.autoTunneled[localPort] = struct{}{}
	}
}
560
561
func (pm *Manager) updateProxies() {
562
servedPortMap := map[uint32]bool{}
563
for _, s := range pm.served {
564
servedPortMap[s.Port] = s.BoundToLocalhost
565
}
566
567
for port, proxy := range pm.proxies {
568
if boundToLocalhost, exists := servedPortMap[port]; !exists || !boundToLocalhost {
569
delete(pm.proxies, port)
570
err := proxy.Close()
571
if err != nil {
572
log.WithError(err).WithField("localPort", port).Warn("cannot stop localhost proxy")
573
} else {
574
log.WithField("localPort", port).Info("localhost proxy has been stopped")
575
}
576
}
577
}
578
579
for _, served := range pm.served {
580
localPort := served.Port
581
_, exists := pm.proxies[localPort]
582
if exists || !served.BoundToLocalhost {
583
continue
584
}
585
586
proxy, err := pm.proxyStarter(localPort)
587
if err != nil {
588
log.WithError(err).WithField("localPort", localPort).Warn("cannot start localhost proxy")
589
continue
590
}
591
log.WithField("localPort", localPort).Info("localhost proxy has been started")
592
593
pm.proxies[localPort] = &localhostProxy{
594
Closer: proxy,
595
proxyPort: localPort,
596
}
597
}
598
}
599
600
// deprecated
601
func getOnExposedAction(config *gitpod.PortConfig, port uint32) api.OnPortExposedAction {
602
if config == nil {
603
// anything above 32767 seems odd (e.g. used by language servers)
604
unusualRange := !(0 < port && port < 32767)
605
wellKnown := port <= 10000
606
if unusualRange || !wellKnown {
607
return api.OnPortExposedAction_ignore
608
}
609
return api.OnPortExposedAction_notify_private
610
}
611
if config.OnOpen == "ignore" {
612
return api.OnPortExposedAction_ignore
613
}
614
if config.OnOpen == "open-browser" {
615
return api.OnPortExposedAction_open_browser
616
}
617
if config.OnOpen == "open-preview" {
618
return api.OnPortExposedAction_open_preview
619
}
620
return api.OnPortExposedAction_notify
621
}
622
623
// getOnOpenAction derives the on-open action for a port from its
// configuration, falling back to port-number heuristics when the port is
// unconfigured.
func getOnOpenAction(config *gitpod.PortConfig, port uint32) api.PortsStatus_OnOpenAction {
	if config == nil {
		// Unconfigured: notify only for well-known ports in (0, 10000];
		// anything else (e.g. ephemeral language-server ports) is ignored.
		if port == 0 || port > 10000 {
			return api.PortsStatus_ignore
		}
		return api.PortsStatus_notify_private
	}
	switch config.OnOpen {
	case "ignore-completely":
		return api.PortsStatus_ignore_completely
	case "ignore":
		return api.PortsStatus_ignore
	case "open-browser":
		return api.PortsStatus_open_browser
	case "open-preview":
		return api.PortsStatus_open_preview
	default:
		return api.PortsStatus_notify
	}
}
647
648
// boundInternally reports whether port is reserved for supervisor-internal
// services and must therefore not be exposed, tunneled or proxied.
func (pm *Manager) boundInternally(port uint32) bool {
	_, exists := pm.internal[port]
	return exists
}
652
653
// Expose exposes a port
654
func (pm *Manager) Expose(ctx context.Context, port uint32) error {
655
unlock := true
656
pm.mu.RLock()
657
defer func() {
658
if unlock {
659
pm.mu.RUnlock()
660
}
661
}()
662
663
mp, ok := pm.state[port]
664
if ok {
665
if mp.Exposed {
666
return nil
667
}
668
if pm.boundInternally(port) {
669
return xerrors.New("internal service cannot be exposed")
670
}
671
}
672
673
config, kind, exists := pm.configs.Get(port)
674
if exists && kind == PortConfigKind {
675
// will be auto-exposed
676
return nil
677
}
678
679
// we don't need the lock anymore. Let's unlock and make sure the defer doesn't try
680
// the same thing again.
681
pm.mu.RUnlock()
682
unlock = false
683
684
public := false
685
protocol := gitpod.PortProtocolHTTP
686
687
if exists {
688
public = config.Visibility != "private"
689
protocol = config.Protocol
690
}
691
692
err := <-pm.E.Expose(ctx, port, public, protocol)
693
if err != nil && err != context.Canceled {
694
log.WithError(err).WithField("port", port).Error("cannot expose port")
695
}
696
return err
697
}
698
699
// Tunnel opens a new tunnel.
700
func (pm *Manager) Tunnel(ctx context.Context, desc *PortTunnelDescription) error {
701
pm.mu.Lock()
702
defer pm.mu.Unlock()
703
if pm.boundInternally(desc.LocalPort) {
704
return xerrors.New("cannot tunnel internal port")
705
}
706
707
tunneled, err := pm.T.Tunnel(ctx, &TunnelOptions{
708
SkipIfExists: false,
709
}, desc)
710
for _, localPort := range tunneled {
711
delete(pm.autoTunneled, localPort)
712
}
713
return err
714
}
715
716
// CloseTunnel closes the tunnel.
717
func (pm *Manager) CloseTunnel(ctx context.Context, port uint32) error {
718
unlock := true
719
pm.mu.RLock()
720
defer func() {
721
if unlock {
722
pm.mu.RUnlock()
723
}
724
}()
725
726
if pm.boundInternally(port) {
727
return xerrors.New("cannot close internal port tunnel")
728
}
729
730
// we don't need the lock anymore. Let's unlock and make sure the defer doesn't try
731
// the same thing again.
732
pm.mu.RUnlock()
733
unlock = false
734
735
_, err := pm.T.CloseTunnel(ctx, port)
736
return err
737
}
738
739
// EstablishTunnel actually establishes the tunnel.
// It delegates directly to the tunneled-ports interface.
func (pm *Manager) EstablishTunnel(ctx context.Context, clientID string, localPort uint32, targetPort uint32) (net.Conn, error) {
	return pm.T.EstablishTunnel(ctx, clientID, localPort, targetPort)
}
743
744
// AutoTunnel controls enablement of auto tunneling.
// The new setting is applied immediately: tunnels are opened for currently
// served ports, or all auto-tunneled ports are closed, depending on enabled.
func (pm *Manager) AutoTunnel(ctx context.Context, enabled bool) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	pm.autoTunnelEnabled = enabled
	pm.autoTunnel(ctx)
}
751
752
var (
	// ErrClosed when the port management is stopped.
	// Returned by Subscribe after Run has terminated.
	ErrClosed = errors.New("closed")
	// ErrTooManySubscriptions when max allowed subscriptions exceed.
	ErrTooManySubscriptions = errors.New("too many subscriptions")
)
758
759
// Subscribe subscribes for status updates
760
func (pm *Manager) Subscribe() (*Subscription, error) {
761
pm.mu.Lock()
762
defer pm.mu.Unlock()
763
764
if pm.closed {
765
return nil, ErrClosed
766
}
767
768
if len(pm.subscriptions) > maxSubscriptions {
769
return nil, fmt.Errorf("too many subscriptions: %d", len(pm.subscriptions))
770
// return nil, ErrTooManySubscriptions
771
}
772
773
sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)}
774
var once sync.Once
775
sub.Close = func(lock bool) error {
776
if lock {
777
pm.mu.Lock()
778
defer pm.mu.Unlock()
779
}
780
once.Do(func() {
781
close(sub.updates)
782
})
783
delete(pm.subscriptions, sub)
784
return nil
785
}
786
pm.subscriptions[sub] = struct{}{}
787
788
// makes sure that no updates can happen between clients receiving an initial status and subscribing
789
sub.updates <- pm.getStatus()
790
return sub, nil
791
}
792
793
// getStatus produces an API compatible port status list.
// Callers are expected to hold mu.
func (pm *Manager) getStatus() []*api.PortsStatus {
	res := make([]*api.PortsStatus, 0, len(pm.state))
	for port := range pm.state {
		status := pm.getPortStatus(port)
		// make sure they are not listed in ports list
		if status.OnOpen == api.PortsStatus_ignore_completely {
			continue
		}
		res = append(res, status)
	}
	// Order by configured sort score where available; unconfigured ports get
	// NON_CONFIGED_BASIC_SCORE + port so they sort after configured ones.
	// Ties are broken by port number.
	sort.SliceStable(res, func(i, j int) bool {
		// Max number of port 65536
		score1 := NON_CONFIGED_BASIC_SCORE + res[i].LocalPort
		score2 := NON_CONFIGED_BASIC_SCORE + res[j].LocalPort
		if c, _, ok := pm.configs.Get(res[i].LocalPort); ok {
			score1 = c.Sort
		}
		if c, _, ok := pm.configs.Get(res[j].LocalPort); ok {
			score2 = c.Sort
		}
		if score1 != score2 {
			return score1 < score2
		}
		// Ranged ports
		return res[i].LocalPort < res[j].LocalPort
	})
	return res
}
823
824
// getPortStatus converts the managed port with the given number into its API
// representation. Callers are expected to hold mu and to pass a port that
// exists in pm.state (a missing entry would dereference nil).
func (pm *Manager) getPortStatus(port uint32) *api.PortsStatus {
	mp := pm.state[port]
	ps := &api.PortsStatus{
		LocalPort:   mp.LocalhostPort,
		Served:      mp.Served,
		Description: mp.Description,
		Name:        mp.Name,
		OnOpen:      mp.OnOpen,
	}
	// Exposure details are only reported once the exposure URL is known.
	if mp.Exposed && mp.URL != "" {
		ps.Exposed = &api.ExposedPortInfo{
			Visibility: mp.Visibility,
			Protocol:   mp.Protocol,
			Url:        mp.URL,
			OnExposed:  mp.OnExposed,
		}
	}
	ps.AutoExposure = mp.AutoExposure
	if mp.Tunneled {
		ps.Tunneled = &api.TunneledPortInfo{
			TargetPort: mp.TunneledTargetPort,
			Visibility: mp.TunneledVisibility,
			Clients:    mp.TunneledClients,
		}
	}
	return ps
}
851
852
func startLocalhostProxy(port uint32) (io.Closer, error) {
853
listen := fmt.Sprintf("%s:%d", workspaceIPAdress, port)
854
target := fmt.Sprintf("localhost:%d", port)
855
856
var p tcpproxy.Proxy
857
p.AddRoute(listen, tcpproxy.To(target))
858
859
go func() {
860
err := p.Run()
861
if err == net.ErrClosed || strings.Contains(err.Error(), "use of closed network connection") {
862
return
863
}
864
log.WithError(err).WithField("local-port", port).Error("localhost proxy failed")
865
}()
866
return &p, nil
867
}
868
869
func defaultRoutableIP() (string, string) {
870
iface, err := nettest.RoutedInterface("ip", net.FlagUp|net.FlagBroadcast)
871
if err != nil {
872
return "", ""
873
}
874
875
iface, err = net.InterfaceByName(iface.Name)
876
if err != nil {
877
return "", ""
878
}
879
880
addresses, err := iface.Addrs()
881
if err != nil {
882
return "", ""
883
}
884
885
return iface.Name, addresses[0].(*net.IPNet).IP.String()
886
}
887
888