Path: blob/main/components/supervisor/pkg/ports/tunnel.go
2500 views
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package ports56import (7"context"8"fmt"9"io"10"net"11"sort"12"strconv"13"sync"1415"golang.org/x/xerrors"1617"github.com/gitpod-io/gitpod/supervisor/api"18)1920type PortTunnelDescription struct {21LocalPort uint3222TargetPort uint3223Visibility api.TunnelVisiblity24}2526type PortTunnelState struct {27Desc PortTunnelDescription28Clients map[string]uint3229}3031type PortTunnel struct {32State PortTunnelState33Conns map[string]map[net.Conn]struct{}34}3536type TunnelOptions struct {37SkipIfExists bool38}3940// TunneledPortsInterface observes the tunneled ports.41type TunneledPortsInterface interface {42// Observe starts observing the tunneled ports until the context is canceled.43// The list of tunneled ports is always the complete picture, i.e. if a single port changes,44// the whole list is returned.45// When the observer stops operating (because the context as canceled or an irrecoverable46// error occurred), the observer will close both channels.47Observe(ctx context.Context) (<-chan []PortTunnelState, <-chan error)4849// Tunnel notifies clients to install listeners on remote machines.50// After that such clients should call EstablishTunnel to forward incoming connections.51Tunnel(ctx context.Context, options *TunnelOptions, descs ...*PortTunnelDescription) ([]uint32, error)5253// CloseTunnel closes tunnels.54CloseTunnel(ctx context.Context, localPorts ...uint32) ([]uint32, error)5556// EstablishTunnel actually establishes the tunnel for an incoming connection on a remote machine.57EstablishTunnel(ctx context.Context, clientID string, localPort uint32, targetPort uint32) (net.Conn, error)58}5960// TunneledPortsService observes the tunneled ports.61type TunneledPortsService struct {62mu *sync.RWMutex63cond *sync.Cond64tunnels map[uint32]*PortTunnel65}6667// NewTunneledPortsService creates a new instance.68func NewTunneledPortsService(debugEnable bool) *TunneledPortsService {69var mu sync.RWMutex70return &TunneledPortsService{71mu: &mu,72cond: sync.NewCond(&mu),73tunnels: make(map[uint32]*PortTunnel),74}75}7677type tunnelConn struct {78net.Conn79once sync.Once80closeErr error81onDidClose func()82}8384func (c *tunnelConn) Close() error {85c.once.Do(func() {86c.closeErr = c.Conn.Close()87c.onDidClose()88})89return c.closeErr90}9192// Observe starts observing the tunneled ports until the context is canceled.93func (p *TunneledPortsService) Observe(ctx context.Context) (<-chan []PortTunnelState, <-chan error) {94var (95errchan = make(chan error, 1)96reschan = make(chan []PortTunnelState)97)9899go func() {100defer close(errchan)101defer close(reschan)102103p.cond.L.Lock()104defer p.cond.L.Unlock()105for {106var i int107res := make([]PortTunnelState, len(p.tunnels))108for _, port := range p.tunnels {109res[i] = port.State110i++111}112reschan <- res113114p.cond.Wait()115if ctx.Err() != nil {116return117}118}119}()120121return reschan, errchan122}123124func (desc *PortTunnelDescription) validate() (err error) {125if desc.LocalPort <= 0 || desc.LocalPort > 0xFFFF {126return xerrors.Errorf("bad local port: %d", desc.LocalPort)127}128if desc.TargetPort > 0xFFFF {129return xerrors.Errorf("bad target port: %d", desc.TargetPort)130}131return nil132}133134// Tunnel opens new tunnels.135func (p *TunneledPortsService) Tunnel(ctx context.Context, options *TunnelOptions, descs ...*PortTunnelDescription) (tunneled []uint32, err error) {136var shouldNotify bool137p.cond.L.Lock()138defer p.cond.L.Unlock()139for _, desc := range descs {140descErr := desc.validate()141if descErr != nil {142if err == nil {143err = descErr144} else {145err = xerrors.Errorf("%s\n%s", err, descErr)146}147continue148}149tunnel, tunnelExists := p.tunnels[desc.LocalPort]150if !tunnelExists {151tunnel = &PortTunnel{152State: PortTunnelState{153Clients: make(map[string]uint32),154},155Conns: make(map[string]map[net.Conn]struct{}),156}157p.tunnels[desc.LocalPort] = tunnel158} else if options.SkipIfExists {159continue160}161tunnel.State.Desc = *desc162shouldNotify = true163tunneled = append(tunneled, desc.LocalPort)164}165if shouldNotify {166p.cond.Broadcast()167}168return tunneled, err169}170171// CloseTunnel closes tunnels.172func (p *TunneledPortsService) CloseTunnel(ctx context.Context, localPorts ...uint32) (closedPorts []uint32, err error) {173var closed []*PortTunnel174p.cond.L.Lock()175for _, localPort := range localPorts {176tunnel, existsTunnel := p.tunnels[localPort]177if !existsTunnel {178continue179}180delete(p.tunnels, localPort)181closed = append(closed, tunnel)182closedPorts = append(closedPorts, localPort)183}184if len(closed) > 0 {185p.cond.Broadcast()186}187p.cond.L.Unlock()188for _, tunnel := range closed {189for _, conns := range tunnel.Conns {190for conn := range conns {191closeErr := conn.Close()192if closeErr == nil {193continue194}195if err == nil {196err = closeErr197} else {198err = xerrors.Errorf("%s\n%s", err, closeErr)199}200}201}202}203return closedPorts, err204}205206// EstablishTunnel actually establishes the tunnel.207func (p *TunneledPortsService) EstablishTunnel(ctx context.Context, clientID string, localPort uint32, targetPort uint32) (net.Conn, error) {208p.cond.L.Lock()209defer p.cond.L.Unlock()210211tunnel, tunnelExists := p.tunnels[localPort]212if tunnelExists {213expectedTargetPort, clientExists := tunnel.State.Clients[clientID]214if clientExists && expectedTargetPort != targetPort {215return nil, xerrors.Errorf("client '%s': %d:%d is already tunneling", clientID, localPort, targetPort)216}217} else {218return nil, xerrors.Errorf("client '%s': '%d' tunnel does not exist", clientID, localPort)219}220221addr := net.JoinHostPort("localhost", strconv.FormatInt(int64(localPort), 10))222conn, err := net.Dial("tcp", addr)223if err != nil {224return nil, err225}226var result net.Conn227result = &tunnelConn{228Conn: conn,229onDidClose: func() {230p.cond.L.Lock()231defer p.cond.L.Unlock()232_, existsTunnel := p.tunnels[localPort]233if !existsTunnel {234return235}236delete(tunnel.Conns[clientID], result)237if len(tunnel.Conns[clientID]) == 0 {238delete(tunnel.State.Clients, clientID)239}240p.cond.Broadcast()241},242}243if tunnel.Conns[clientID] == nil {244tunnel.Conns[clientID] = make(map[net.Conn]struct{})245}246tunnel.Conns[clientID][result] = struct{}{}247tunnel.State.Clients[clientID] = targetPort248p.cond.Broadcast()249return result, nil250}251252// Snapshot writes a snapshot to w.253func (p *TunneledPortsService) Snapshot(w io.Writer) {254p.mu.RLock()255defer p.mu.RUnlock()256257localPorts := make([]uint32, 0, len(p.tunnels))258for k := range p.tunnels {259localPorts = append(localPorts, k)260}261sort.Slice(localPorts, func(i, j int) bool { return localPorts[i] < localPorts[j] })262263for _, localPort := range localPorts {264tunnel := p.tunnels[localPort]265fmt.Fprintf(w, "Local Port: %d\n", tunnel.State.Desc.LocalPort)266fmt.Fprintf(w, "Target Port: %d\n", tunnel.State.Desc.TargetPort)267visibilty := api.TunnelVisiblity_name[int32(tunnel.State.Desc.Visibility)]268fmt.Fprintf(w, "Visibility: %s\n", visibilty)269for clientID, remotePort := range tunnel.State.Clients {270fmt.Fprintf(w, "Client: %s\n", clientID)271fmt.Fprintf(w, " Remote Port: %d\n", remotePort)272fmt.Fprintf(w, " Tunnel Count: %d\n", len(tunnel.Conns[clientID]))273}274fmt.Fprintf(w, "\n")275}276}277278279