Path: blob/main/components/supervisor/pkg/ports/ports.go
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package ports

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"reflect"
	"sort"
	"strings"
	"sync"
	"time"

	"golang.org/x/net/nettest"
	"golang.org/x/xerrors"

	"github.com/gitpod-io/gitpod/common-go/log"
	gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
	"github.com/gitpod-io/gitpod/supervisor/api"
	"inet.af/tcpproxy"
)

var workspaceIPAdress string

func init() {
	_, workspaceIPAdress = defaultRoutableIP()
}

// NewManager creates a new port manager
func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, config ConfigInterace, tunneled TunneledPortsInterface, internalPorts ...uint32) *Manager {
	state := make(map[uint32]*managedPort)
	internal := make(map[uint32]struct{})
	for _, p := range internalPorts {
		internal[p] = struct{}{}
	}

	return &Manager{
		E: exposed,
		S: served,
		C: config,
		T: tunneled,

		forceUpdates: make(chan struct{}, 1),

		internal:     internal,
		proxies:      make(map[uint32]*localhostProxy),
		autoExposed:  make(map[uint32]*autoExposure),
		autoTunneled: make(map[uint32]struct{}),

		state:         state,
		subscriptions: make(map[*Subscription]struct{}),
		proxyStarter:  startLocalhostProxy,

		autoTunnelEnabled: true,
	}
}

type localhostProxy struct {
	io.Closer
	proxyPort uint32
}

type autoExposure struct {
	state    api.PortAutoExposure
	ctx      context.Context
	public   bool
	protocol string
}

// Manager brings together served and exposed ports. It keeps track of which port is exposed, which one is served,
// auto-exposes ports and proxies ports served on localhost only.
type Manager struct {
	E ExposedPortsInterface
	S ServedPortsObserver
	C ConfigInterace
	T TunneledPortsInterface

	forceUpdates chan struct{}

	internal     map[uint32]struct{}
	proxies      map[uint32]*localhostProxy
	proxyStarter func(port uint32) (proxy io.Closer, err error)
	autoExposed  map[uint32]*autoExposure

	autoTunneled      map[uint32]struct{}
	autoTunnelEnabled bool

	configs  *Configs
	exposed  []ExposedPort
	served   []ServedPort
	tunneled []PortTunnelState

	state map[uint32]*managedPort
	mu    sync.RWMutex

	subscriptions map[*Subscription]struct{}
	closed        bool
}

type managedPort struct {
	Served       bool
	Exposed      bool
	Visibility   api.PortVisibility
	Protocol     api.PortProtocol
	Description  string
	Name         string
	URL          string
	OnExposed    api.OnPortExposedAction // deprecated
	OnOpen       api.PortsStatus_OnOpenAction
	AutoExposure api.PortAutoExposure

	LocalhostPort uint32

	Tunneled           bool
	TunneledTargetPort uint32
	TunneledVisibility api.TunnelVisiblity
	TunneledClients    map[string]uint32
}

// Subscription is a Subscription to status updates
type Subscription struct {
	updates chan []*api.PortsStatus
	Close   func(lock bool) error
}

// Updates returns the updates channel
func (s *Subscription) Updates() <-chan []*api.PortsStatus {
	return s.updates
}
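
// A minimal consumer sketch (illustrative only, not part of the package;
// handleStatus is a hypothetical callback): subscribe, drain updates until
// the channel is closed, and release the subscription when done.
//
//	sub, err := pm.Subscribe()
//	if err != nil {
//		return err
//	}
//	defer sub.Close(true)
//	for status := range sub.Updates() {
//		handleStatus(status)
//	}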

// Run starts the port manager which keeps running until one of its observers stops.
func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	defer log.Debug("portManager shutdown")

	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		// We copy the subscriptions to a list prior to closing them, to prevent a data race
		// between the map iteration and entry removal when closing the subscription.
		pm.mu.Lock()
		pm.closed = true
		subs := make([]*Subscription, 0, len(pm.subscriptions))
		for s := range pm.subscriptions {
			subs = append(subs, s)
		}
		pm.mu.Unlock()

		for _, s := range subs {
			_ = s.Close(true)
		}
	}()
	defer cancel()

	go pm.E.Run(ctx)
	exposedUpdates, exposedErrors := pm.E.Observe(ctx)
	servedUpdates, servedErrors := pm.S.Observe(ctx)
	configUpdates, configErrors := pm.C.Observe(ctx)
	tunneledUpdates, tunneledErrors := pm.T.Observe(ctx)
	for {
		var (
			exposed     []ExposedPort
			served      []ServedPort
			configured  *Configs
			tunneled    []PortTunnelState
			forceUpdate bool
		)
		select {
		case <-pm.forceUpdates:
			forceUpdate = true
		case exposed = <-exposedUpdates:
			if exposed == nil {
				if ctx.Err() == nil {
					log.Error("exposed ports observer stopped unexpectedly")
				}
				return
			}
		case served = <-servedUpdates:
			if served == nil {
				if ctx.Err() == nil {
					log.Error("served ports observer stopped unexpectedly")
				}
				return
			}
		case configured = <-configUpdates:
			if configured == nil {
				if ctx.Err() == nil {
					log.Error("configured ports observer stopped unexpectedly")
				}
				return
			}
		case tunneled = <-tunneledUpdates:
			if tunneled == nil {
				if ctx.Err() == nil {
					log.Error("tunneled ports observer stopped unexpectedly")
				}
				return
			}

		case err := <-exposedErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("exposed ports observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing exposed ports")
		case err := <-servedErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("served ports observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing served ports")
		case err := <-configErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("port configs observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing served port configs")
		case err := <-tunneledErrors:
			if err == nil {
				if ctx.Err() == nil {
					log.Error("tunneled ports observer stopped unexpectedly")
				}
				return
			}
			log.WithError(err).Warn("error while observing tunneled ports")
		}

		if exposed == nil && served == nil && configured == nil && tunneled == nil && !forceUpdate {
			// we received just an error, but no update
			continue
		}
		pm.updateState(ctx, exposed, served, configured, tunneled)
	}
}

// Status provides the current port status
func (pm *Manager) Status() []*api.PortsStatus {
	pm.mu.RLock()
	defer pm.mu.RUnlock()

	return pm.getStatus()
}
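
// updateState merges a batch of observer updates into the manager's view of
// the world: it refreshes the exposed, tunneled, served, and configured sets
// under the lock, recomputes the desired port state, and, if anything
// changed, broadcasts the new status to all subscribers.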
func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, served []ServedPort, configured *Configs, tunneled []PortTunnelState) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	if exposed != nil && !reflect.DeepEqual(pm.exposed, exposed) {
		pm.exposed = exposed
	}

	if tunneled != nil && !reflect.DeepEqual(pm.tunneled, tunneled) {
		pm.tunneled = tunneled
	}

	if served != nil {
		servedMap := make(map[uint32]ServedPort)
		for _, port := range served {
			if _, existProxy := pm.proxies[port.Port]; existProxy && port.Address.String() == workspaceIPAdress {
				// Ignore entries that are bound to the workspace IP address
				// as they are created by the internal reverse proxy
				continue
			}

			config, _, exists := pm.configs.Get(port.Port)
			// don't serve ports that are configured to be ignored-completely
			if exists && config.OnOpen == "ignore-completely" {
				continue
			}

			current, exists := servedMap[port.Port]
			if !exists || (!port.BoundToLocalhost && current.BoundToLocalhost) {
				servedMap[port.Port] = port
			}
		}

		var servedKeys []uint32
		for k := range servedMap {
			servedKeys = append(servedKeys, k)
		}
		sort.Slice(servedKeys, func(i, j int) bool {
			return servedKeys[i] < servedKeys[j]
		})

		var newServed []ServedPort
		for _, key := range servedKeys {
			newServed = append(newServed, servedMap[key])
		}

		if !reflect.DeepEqual(pm.served, newServed) {
			log.WithField("served", newServed).Debug("updating served ports")
			pm.served = newServed
			pm.updateProxies()
			pm.autoTunnel(ctx)
		}
	}

	if configured != nil {
		pm.configs = configured
	}

	newState := pm.nextState(ctx)
	stateChanged := !reflect.DeepEqual(newState, pm.state)
	pm.state = newState

	if !stateChanged && configured == nil {
		return
	}

	status := pm.getStatus()
	log.WithField("ports", fmt.Sprintf("%+v", status)).Debug("ports changed")
	for sub := range pm.subscriptions {
		select {
		case sub.updates <- status:
		case <-time.After(5 * time.Second):
			log.Error("ports subscription dropped out")
			_ = sub.Close(false)
		}
	}
}

func (pm *Manager) nextState(ctx context.Context) map[uint32]*managedPort {
	state := make(map[uint32]*managedPort)

	genManagedPort := func(port uint32) *managedPort {
		if mp, exists := state[port]; exists {
			return mp
		}
		config, _, exists := pm.configs.Get(port)
		var portConfig *gitpod.PortConfig
		if exists && config != nil {
			portConfig = &config.PortConfig
		}
		mp := &managedPort{
			LocalhostPort: port,
			OnExposed:     getOnExposedAction(portConfig, port),
			OnOpen:        getOnOpenAction(portConfig, port),
		}
		if exists {
			mp.Name = config.Name
			mp.Description = config.Description
		}
		state[port] = mp
		return mp
	}

	// 1. first capture exposed and tunneled since they don't depend on configured or served ports
	for _, exposed := range pm.exposed {
		port := exposed.LocalPort
		if pm.boundInternally(port) {
			continue
		}
		Visibility := api.PortVisibility_private
		if exposed.Public {
			Visibility = api.PortVisibility_public
		}
		portProtocol := api.PortProtocol_http
		if exposed.Protocol == gitpod.PortProtocolHTTPS {
			portProtocol = api.PortProtocol_https
		}
		mp := genManagedPort(port)
		mp.Exposed = true
		mp.Protocol = portProtocol
		mp.Visibility = Visibility
		mp.URL = exposed.URL
	}

	for _, tunneled := range pm.tunneled {
		port := tunneled.Desc.LocalPort
		if pm.boundInternally(port) {
			continue
		}
		mp := genManagedPort(port)
		mp.Tunneled = true
		mp.TunneledTargetPort = tunneled.Desc.TargetPort
		mp.TunneledVisibility = tunneled.Desc.Visibility
		mp.TunneledClients = tunneled.Clients
	}

	// 2. second capture configured since we don't want to auto expose already exposed ports
	if pm.configs != nil {
		pm.configs.ForEach(func(port uint32, config *SortConfig) {
			if pm.boundInternally(port) {
				return
			}
			mp := genManagedPort(port)
			autoExpose, autoExposed := pm.autoExposed[port]
			if autoExposed {
				mp.AutoExposure = autoExpose.state
			}
			if mp.Exposed || autoExposed {
				return
			}

			mp.Visibility = api.PortVisibility_private
			if config.Visibility == "public" {
				mp.Visibility = api.PortVisibility_public
			}
			public := mp.Visibility == api.PortVisibility_public
			mp.AutoExposure = pm.autoExpose(ctx, mp.LocalhostPort, public, config.Protocol).state
		})
	}

	// 3. lastly capture served ports since
	// we don't want to auto-expose ports that are already exposed,
	// and we need the configuration to decide the default visibility properly
	for _, served := range pm.served {
		port := served.Port
		if pm.boundInternally(port) {
			continue
		}
		mp := genManagedPort(port)
		mp.Served = true

		autoExposure, autoExposed := pm.autoExposed[port]
		if autoExposed {
			mp.AutoExposure = autoExposure.state
			continue
		}

		var public bool
		protocol := "http"
		config, kind, exists := pm.configs.Get(mp.LocalhostPort)

		getProtocol := func(p api.PortProtocol) string {
			switch p {
			case api.PortProtocol_https:
				return "https"
			default:
				return "http"
			}
		}

		configured := exists && kind == PortConfigKind
		if mp.Exposed || configured {
			public = mp.Visibility == api.PortVisibility_public
			protocol = getProtocol(mp.Protocol)
		} else if exists {
			public = config.Visibility == "public"
			protocol = config.Protocol
		}

		if mp.Exposed && ((mp.Visibility == api.PortVisibility_public && public) || (mp.Visibility == api.PortVisibility_private && !public)) && protocol != "https" {
			continue
		}

		mp.AutoExposure = pm.autoExpose(ctx, mp.LocalhostPort, public, protocol).state
	}

	var ports []uint32
	for port := range state {
		ports = append(ports, port)
	}

	sort.Slice(ports, func(i, j int) bool {
		return ports[i] < ports[j]
	})

	newState := make(map[uint32]*managedPort)
	for _, port := range ports {
		newState[port] = state[port]
	}

	return newState
}
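
// The autoExposure returned below starts in the "trying" state; a goroutine
// flips it to "succeeded" or "failed" once the exposure request resolves, so
// callers reading .state may still observe an in-flight attempt.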
// Callers should guard this with a check whether the port is already exposed or auto-exposed.
func (pm *Manager) autoExpose(ctx context.Context, localPort uint32, public bool, protocol string) *autoExposure {
	exposing := pm.E.Expose(ctx, localPort, public, protocol)
	autoExpose := &autoExposure{
		state:    api.PortAutoExposure_trying,
		ctx:      ctx,
		public:   public,
		protocol: protocol,
	}
	go func() {
		err := <-exposing
		if err != nil {
			if err != context.Canceled {
				autoExpose.state = api.PortAutoExposure_failed
				log.WithError(err).WithField("localPort", localPort).Warn("cannot auto-expose port")
			}
			return
		}
		autoExpose.state = api.PortAutoExposure_succeeded
		log.WithField("localPort", localPort).Info("auto-exposed port")
	}()
	pm.autoExposed[localPort] = autoExpose
	log.WithField("localPort", localPort).Info("auto-exposing port")
	return autoExpose
}

// RetryAutoExpose retries auto-exposing the given port
func (pm *Manager) RetryAutoExpose(ctx context.Context, localPort uint32) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	autoExpose, autoExposed := pm.autoExposed[localPort]
	if !autoExposed || autoExpose.state != api.PortAutoExposure_failed || autoExpose.ctx.Err() != nil {
		return
	}
	pm.autoExpose(autoExpose.ctx, localPort, autoExpose.public, autoExpose.protocol)
	pm.forceUpdate()
}

func (pm *Manager) forceUpdate() {
	if len(pm.forceUpdates) == 0 {
		pm.forceUpdates <- struct{}{}
	}
}
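
// autoTunnel reconciles tunnels with the currently served ports: when auto
// tunneling is disabled it closes every tunnel it opened itself, otherwise it
// opens a host-visible tunnel for each served, non-internal port that is not
// auto-tunneled yet.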
func (pm *Manager) autoTunnel(ctx context.Context) {
	if !pm.autoTunnelEnabled {
		var localPorts []uint32
		for localPort := range pm.autoTunneled {
			localPorts = append(localPorts, localPort)
		}
		// CloseTunnel ensures that everything is closed
		pm.autoTunneled = make(map[uint32]struct{})
		_, err := pm.T.CloseTunnel(ctx, localPorts...)
		if err != nil {
			log.WithError(err).Error("cannot close auto tunneled ports")
		}
		return
	}
	var descs []*PortTunnelDescription
	for _, served := range pm.served {
		if pm.boundInternally(served.Port) {
			continue
		}

		_, autoTunneled := pm.autoTunneled[served.Port]
		if !autoTunneled {
			descs = append(descs, &PortTunnelDescription{
				LocalPort:  served.Port,
				TargetPort: served.Port,
				Visibility: api.TunnelVisiblity_host,
			})
		}
	}
	autoTunneled, err := pm.T.Tunnel(ctx, &TunnelOptions{
		SkipIfExists: true,
	}, descs...)
	if err != nil {
		log.WithError(err).Error("cannot auto tunnel ports")
	}
	for _, localPort := range autoTunneled {
		pm.autoTunneled[localPort] = struct{}{}
	}
}

func (pm *Manager) updateProxies() {
	servedPortMap := map[uint32]bool{}
	for _, s := range pm.served {
		servedPortMap[s.Port] = s.BoundToLocalhost
	}

	for port, proxy := range pm.proxies {
		if boundToLocalhost, exists := servedPortMap[port]; !exists || !boundToLocalhost {
			delete(pm.proxies, port)
			err := proxy.Close()
			if err != nil {
				log.WithError(err).WithField("localPort", port).Warn("cannot stop localhost proxy")
			} else {
				log.WithField("localPort", port).Info("localhost proxy has been stopped")
			}
		}
	}

	for _, served := range pm.served {
		localPort := served.Port
		_, exists := pm.proxies[localPort]
		if exists || !served.BoundToLocalhost {
			continue
		}

		proxy, err := pm.proxyStarter(localPort)
		if err != nil {
			log.WithError(err).WithField("localPort", localPort).Warn("cannot start localhost proxy")
			continue
		}
		log.WithField("localPort", localPort).Info("localhost proxy has been started")

		pm.proxies[localPort] = &localhostProxy{
			Closer:    proxy,
			proxyPort: localPort,
		}
	}
}

// deprecated
func getOnExposedAction(config *gitpod.PortConfig, port uint32) api.OnPortExposedAction {
	if config == nil {
		// anything above 32767 seems odd (e.g. used by language servers)
		unusualRange := !(0 < port && port < 32767)
		wellKnown := port <= 10000
		if unusualRange || !wellKnown {
			return api.OnPortExposedAction_ignore
		}
		return api.OnPortExposedAction_notify_private
	}
	if config.OnOpen == "ignore" {
		return api.OnPortExposedAction_ignore
	}
	if config.OnOpen == "open-browser" {
		return api.OnPortExposedAction_open_browser
	}
	if config.OnOpen == "open-preview" {
		return api.OnPortExposedAction_open_preview
	}
	return api.OnPortExposedAction_notify
}
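
// getOnOpenAction maps a port's configuration to the action clients should
// take when the port opens. Unconfigured ports outside 1-10000 are ignored
// by default, since high ports are typically used by tooling such as
// language servers.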
func getOnOpenAction(config *gitpod.PortConfig, port uint32) api.PortsStatus_OnOpenAction {
	if config == nil {
		// anything above 32767 seems odd (e.g. used by language servers)
		unusualRange := !(0 < port && port < 32767)
		wellKnown := port <= 10000
		if unusualRange || !wellKnown {
			return api.PortsStatus_ignore
		}
		return api.PortsStatus_notify_private
	}
	if config.OnOpen == "ignore-completely" {
		return api.PortsStatus_ignore_completely
	}
	if config.OnOpen == "ignore" {
		return api.PortsStatus_ignore
	}
	if config.OnOpen == "open-browser" {
		return api.PortsStatus_open_browser
	}
	if config.OnOpen == "open-preview" {
		return api.PortsStatus_open_preview
	}
	return api.PortsStatus_notify
}

func (pm *Manager) boundInternally(port uint32) bool {
	_, exists := pm.internal[port]
	return exists
}

// Expose exposes a port
func (pm *Manager) Expose(ctx context.Context, port uint32) error {
	unlock := true
	pm.mu.RLock()
	defer func() {
		if unlock {
			pm.mu.RUnlock()
		}
	}()

	mp, ok := pm.state[port]
	if ok {
		if mp.Exposed {
			return nil
		}
		if pm.boundInternally(port) {
			return xerrors.New("internal service cannot be exposed")
		}
	}

	config, kind, exists := pm.configs.Get(port)
	if exists && kind == PortConfigKind {
		// will be auto-exposed
		return nil
	}

	// we don't need the lock anymore. Let's unlock and make sure the defer doesn't try
	// the same thing again.
	pm.mu.RUnlock()
	unlock = false

	public := false
	protocol := gitpod.PortProtocolHTTP

	if exists {
		public = config.Visibility != "private"
		protocol = config.Protocol
	}

	err := <-pm.E.Expose(ctx, port, public, protocol)
	if err != nil && err != context.Canceled {
		log.WithError(err).WithField("port", port).Error("cannot expose port")
	}
	return err
}

// Tunnel opens a new tunnel.
func (pm *Manager) Tunnel(ctx context.Context, desc *PortTunnelDescription) error {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	if pm.boundInternally(desc.LocalPort) {
		return xerrors.New("cannot tunnel internal port")
	}

	tunneled, err := pm.T.Tunnel(ctx, &TunnelOptions{
		SkipIfExists: false,
	}, desc)
	for _, localPort := range tunneled {
		delete(pm.autoTunneled, localPort)
	}
	return err
}
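
// A minimal sketch of opening a tunnel manually (illustrative only; the port
// values are hypothetical). Manually tunneled ports are removed from the
// auto-tunneled set so the manager stops managing them.
//
//	err := pm.Tunnel(ctx, &PortTunnelDescription{
//		LocalPort:  3000,
//		TargetPort: 3000,
//		Visibility: api.TunnelVisiblity_host,
//	})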
// CloseTunnel closes the tunnel.
func (pm *Manager) CloseTunnel(ctx context.Context, port uint32) error {
	unlock := true
	pm.mu.RLock()
	defer func() {
		if unlock {
			pm.mu.RUnlock()
		}
	}()

	if pm.boundInternally(port) {
		return xerrors.New("cannot close internal port tunnel")
	}

	// we don't need the lock anymore. Let's unlock and make sure the defer doesn't try
	// the same thing again.
	pm.mu.RUnlock()
	unlock = false

	_, err := pm.T.CloseTunnel(ctx, port)
	return err
}

// EstablishTunnel actually establishes the tunnel
func (pm *Manager) EstablishTunnel(ctx context.Context, clientID string, localPort uint32, targetPort uint32) (net.Conn, error) {
	return pm.T.EstablishTunnel(ctx, clientID, localPort, targetPort)
}

// AutoTunnel controls enablement of auto tunneling
func (pm *Manager) AutoTunnel(ctx context.Context, enabled bool) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	pm.autoTunnelEnabled = enabled
	pm.autoTunnel(ctx)
}

var (
	// ErrClosed when the port management is stopped
	ErrClosed = errors.New("closed")
	// ErrTooManySubscriptions when the maximum number of allowed subscriptions is exceeded
	ErrTooManySubscriptions = errors.New("too many subscriptions")
)

// Subscribe subscribes for status updates
func (pm *Manager) Subscribe() (*Subscription, error) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	if pm.closed {
		return nil, ErrClosed
	}

	if len(pm.subscriptions) > maxSubscriptions {
		return nil, fmt.Errorf("too many subscriptions: %d", len(pm.subscriptions))
		// return nil, ErrTooManySubscriptions
	}

	sub := &Subscription{updates: make(chan []*api.PortsStatus, 5)}
	var once sync.Once
	sub.Close = func(lock bool) error {
		if lock {
			pm.mu.Lock()
			defer pm.mu.Unlock()
		}
		once.Do(func() {
			close(sub.updates)
		})
		delete(pm.subscriptions, sub)
		return nil
	}
	pm.subscriptions[sub] = struct{}{}

	// makes sure that no updates can happen between clients receiving an initial status and subscribing
	sub.updates <- pm.getStatus()
	return sub, nil
}

// getStatus produces an API-compatible port status list.
// Callers are expected to hold mu.
func (pm *Manager) getStatus() []*api.PortsStatus {
	res := make([]*api.PortsStatus, 0, len(pm.state))
	for port := range pm.state {
		status := pm.getPortStatus(port)
		// make sure they are not listed in the ports list
		if status.OnOpen == api.PortsStatus_ignore_completely {
			continue
		}
		res = append(res, status)
	}
	sort.SliceStable(res, func(i, j int) bool {
		// port numbers are at most 65535
		score1 := NON_CONFIGED_BASIC_SCORE + res[i].LocalPort
		score2 := NON_CONFIGED_BASIC_SCORE + res[j].LocalPort
		if c, _, ok := pm.configs.Get(res[i].LocalPort); ok {
			score1 = c.Sort
		}
		if c, _, ok := pm.configs.Get(res[j].LocalPort); ok {
			score2 = c.Sort
		}
		if score1 != score2 {
			return score1 < score2
		}
		// ports sharing a score (e.g. from a ranged config) are ordered by port number
		return res[i].LocalPort < res[j].LocalPort
	})
	return res
}

func (pm *Manager) getPortStatus(port uint32) *api.PortsStatus {
	mp := pm.state[port]
	ps := &api.PortsStatus{
		LocalPort:   mp.LocalhostPort,
		Served:      mp.Served,
		Description: mp.Description,
		Name:        mp.Name,
		OnOpen:      mp.OnOpen,
	}
	if mp.Exposed && mp.URL != "" {
		ps.Exposed = &api.ExposedPortInfo{
			Visibility: mp.Visibility,
			Protocol:   mp.Protocol,
			Url:        mp.URL,
			OnExposed:  mp.OnExposed,
		}
	}
	ps.AutoExposure = mp.AutoExposure
	if mp.Tunneled {
		ps.Tunneled = &api.TunneledPortInfo{
			TargetPort: mp.TunneledTargetPort,
			Visibility: mp.TunneledVisibility,
			Clients:    mp.TunneledClients,
		}
	}
	return ps
}
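
// startLocalhostProxy makes a localhost-only server reachable from outside
// the workspace: it listens on the workspace IP at the same port and forwards
// TCP connections to localhost, returning the proxy as an io.Closer so it can
// be torn down once the port stops being served.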
func startLocalhostProxy(port uint32) (io.Closer, error) {
	listen := fmt.Sprintf("%s:%d", workspaceIPAdress, port)
	target := fmt.Sprintf("localhost:%d", port)

	var p tcpproxy.Proxy
	p.AddRoute(listen, tcpproxy.To(target))

	go func() {
		err := p.Run()
		// a nil check guards the err.Error() call below; Run returns
		// net.ErrClosed (or a "use of closed network connection" error)
		// when the proxy is shut down deliberately
		if err == nil || errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "use of closed network connection") {
			return
		}
		log.WithError(err).WithField("localPort", port).Error("localhost proxy failed")
	}()
	return &p, nil
}

func defaultRoutableIP() (string, string) {
	iface, err := nettest.RoutedInterface("ip", net.FlagUp|net.FlagBroadcast)
	if err != nil {
		return "", ""
	}

	iface, err = net.InterfaceByName(iface.Name)
	if err != nil {
		return "", ""
	}

	addresses, err := iface.Addrs()
	if err != nil || len(addresses) == 0 {
		return "", ""
	}

	return iface.Name, addresses[0].(*net.IPNet).IP.String()
}