// Path: blob/main/components/local-app/pkg/bastion/bastion.go
// 2500 views
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package bastion56import (7"context"8"crypto/rand"9"crypto/rsa"10"crypto/x509"11"encoding/pem"12"errors"13"fmt"14"io"15"io/ioutil"16"net"17"net/http"18"os"19"path/filepath"20"strconv"21"strings"22"sync"23"time"2425"github.com/google/uuid"26"github.com/kevinburke/ssh_config"27"github.com/sirupsen/logrus"28"golang.org/x/crypto/ssh"29"golang.org/x/xerrors"30"google.golang.org/grpc"31"google.golang.org/grpc/credentials/insecure"32"google.golang.org/protobuf/proto"3334gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"35app "github.com/gitpod-io/gitpod/local-app/api"36supervisor "github.com/gitpod-io/gitpod/supervisor/api"37)3839var (40// ErrClosed when the port management is stopped41ErrClosed = errors.New("closed")42// ErrTooManySubscriptions when max allowed subscriptions exceed43ErrTooManySubscriptions = errors.New("too many subscriptions")44)4546// StatusSubscription is a StatusSubscription to status updates47type StatusSubscription struct {48instanceID string49updates chan []*app.TunnelStatus50Close func() error51}5253func (s *StatusSubscription) Updates() <-chan []*app.TunnelStatus {54return s.updates55}5657type TunnelClient struct {58ID string // we cannot use conn session ID, since proto fails to serialize it59Conn ssh.Conn60}6162type TunnelListener struct {63RemotePort uint3264LocalAddr string65LocalPort uint3266Visibility supervisor.TunnelVisiblity67Ctx context.Context68Cancel func()69}7071type Workspace struct {72InstanceID string73WorkspaceID string74Phase string75OwnerToken string76URL string7778supervisorListener *TunnelListener79supervisorClient *grpc.ClientConn8081tunnelMu sync.RWMutex82tunnelListeners map[uint32]*TunnelListener83tunnelEnabled bool84cancelTunnel context.CancelFunc8586localSSHListener *TunnelListener87SSHPrivateFN string88SSHPublicKey string8990ctx 
context.Context91cancel context.CancelFunc9293tunnelClient chan chan *TunnelClient94tunnelClientConnected bool95}9697func (ws *Workspace) Status() []*app.TunnelStatus {98ws.tunnelMu.RLock()99defer ws.tunnelMu.RUnlock()100res := make([]*app.TunnelStatus, 0, len(ws.tunnelListeners))101for _, listener := range ws.tunnelListeners {102res = append(res, &app.TunnelStatus{103RemotePort: listener.RemotePort,104LocalPort: listener.LocalPort,105Visibility: listener.Visibility,106})107}108return res109}110111type Callbacks interface {112InstanceUpdate(*Workspace)113}114115type CompositeCallbacks []Callbacks116117func (cb CompositeCallbacks) InstanceUpdate(w *Workspace) {118for _, c := range cb {119c.InstanceUpdate(w)120}121}122123type SSHConfigWritingCallback struct {124Path string125126workspaces map[string]*Workspace127}128129func (s *SSHConfigWritingCallback) InstanceUpdate(w *Workspace) {130if s.workspaces == nil {131s.workspaces = make(map[string]*Workspace)132}133if w.localSSHListener == nil || w.Phase == "stopping" {134delete(s.workspaces, w.WorkspaceID)135} else if _, exists := s.workspaces[w.WorkspaceID]; !exists {136s.workspaces[w.WorkspaceID] = w137}138139var cfg ssh_config.Config140for _, ws := range s.workspaces {141p, err := ssh_config.NewPattern(ws.WorkspaceID)142if err != nil {143logrus.WithError(err).Warn("cannot produce ssh_config entry")144continue145}146147host, port, _ := net.SplitHostPort(ws.localSSHListener.LocalAddr)148cfg.Hosts = append(cfg.Hosts, &ssh_config.Host{149Patterns: []*ssh_config.Pattern{p},150Nodes: []ssh_config.Node{151&ssh_config.KV{Key: "HostName", Value: host},152&ssh_config.KV{Key: "User", Value: "gitpod"},153&ssh_config.KV{Key: "Port", Value: port},154&ssh_config.KV{Key: "IdentityFile", Value: ws.SSHPrivateFN},155&ssh_config.KV{Key: "IdentitiesOnly", Value: "yes"},156},157})158}159160err := ioutil.WriteFile(s.Path, []byte(cfg.String()), 0644)161if err != nil {162logrus.WithError(err).WithField("path", s.Path).Error("cannot write ssh 
config file")163return164}165}166167func New(client gitpod.APIInterface, localAppTimeout time.Duration, cb Callbacks) *Bastion {168ctx, cancel := context.WithCancel(context.Background())169return &Bastion{170Client: client,171Callbacks: cb,172workspaces: make(map[string]*Workspace),173localAppTimeout: localAppTimeout,174workspaceMapChangeChan: make(chan int),175ctx: ctx,176stop: cancel,177updates: make(chan *WorkspaceUpdateRequest, 10),178subscriptions: make(map[*StatusSubscription]struct{}, 10),179}180}181182type WorkspaceUpdateRequest struct {183instance *gitpod.WorkspaceInstance184done chan *Workspace185}186187type Bastion struct {188id string189updates chan *WorkspaceUpdateRequest190191Client gitpod.APIInterface192Callbacks Callbacks193194workspacesMu sync.RWMutex195workspaces map[string]*Workspace196197localAppTimeout time.Duration198workspaceMapChangeChan chan int199200ctx context.Context201stop context.CancelFunc202203subscriptionsMu sync.RWMutex204subscriptions map[*StatusSubscription]struct{}205206EnableAutoTunnel bool207}208209func (b *Bastion) Run() error {210updates, err := b.Client.WorkspaceUpdates(b.ctx, "")211if err != nil {212return err213}214215defer func() {216// We copy the subscriptions to a list prior to closing them, to prevent a data race217// between the map iteration and entry removal when closing the subscription.218b.subscriptionsMu.Lock()219subs := make([]*StatusSubscription, 0, len(b.subscriptions))220for s := range b.subscriptions {221subs = append(subs, s)222}223b.subscriptionsMu.Unlock()224225for _, s := range subs {226s.Close()227}228}()229230go b.handleTimeout()231if b.localAppTimeout != 0 {232b.workspaceMapChangeChan <- 0233}234235go func() {236for u := range b.updates {237b.handleUpdate(u)238}239}()240b.FullUpdate()241242for u := range updates {243b.updates <- &WorkspaceUpdateRequest{244instance: u,245}246}247return nil248}249250func (b *Bastion) FullUpdate() {251wss, err := b.Client.GetWorkspaces(b.ctx, 
&gitpod.GetWorkspacesOptions{Limit: float64(100)})252if err != nil {253logrus.WithError(err).Warn("cannot get workspaces")254} else {255for _, ws := range wss {256if ws.LatestInstance == nil {257continue258}259b.updates <- &WorkspaceUpdateRequest{260instance: ws.LatestInstance,261}262}263}264}265266func (b *Bastion) Update(workspaceID string) *Workspace {267ws, err := b.Client.GetWorkspace(b.ctx, workspaceID)268if err != nil {269logrus.WithError(err).WithField("WorkspaceID", workspaceID).Error("cannot get workspace")270return nil271}272if ws.LatestInstance == nil {273return nil274}275done := make(chan *Workspace)276b.updates <- &WorkspaceUpdateRequest{277instance: ws.LatestInstance,278done: done,279}280return <-done281}282283func (b *Bastion) handleTimeout() {284if b.localAppTimeout == 0 {285return286}287var timer *time.Timer288for count := range b.workspaceMapChangeChan {289if count == 0 && timer == nil {290logrus.Debugf("local app will terminate in %v", b.localAppTimeout)291timer = time.AfterFunc(b.localAppTimeout, func() {292os.Exit(0)293})294} else if count != 0 && timer != nil {295logrus.Debugln("reset local app terminate timeout")296timer.Stop()297timer = nil298}299}300}301302func (b *Bastion) handleUpdate(ur *WorkspaceUpdateRequest) {303var ws *Workspace304u := ur.instance305defer func() {306if ur.done != nil {307ur.done <- ws308close(ur.done)309}310}()311312b.workspacesMu.Lock()313defer b.workspacesMu.Unlock()314315ws, ok := b.workspaces[u.ID]316if !ok {317if u.Status.Phase == "stopping" || u.Status.Phase == "stopped" {318return319}320ctx, cancel := context.WithCancel(b.ctx)321ws = &Workspace{322InstanceID: u.ID,323WorkspaceID: u.WorkspaceID,324325ctx: ctx,326cancel: cancel,327328tunnelClient: make(chan chan *TunnelClient, 1),329tunnelListeners: make(map[uint32]*TunnelListener),330tunnelEnabled: true,331}332}333ws.Phase = u.Status.Phase334ws.URL = u.IdeURL335ws.OwnerToken = u.Status.OwnerToken336if ws.OwnerToken == "" && ws.Phase == "running" {337// updates 
don't carry the owner token338go b.FullUpdate()339}340341switch ws.Phase {342case "running":343if ws.OwnerToken != "" && !ws.tunnelClientConnected {344err := b.connectTunnelClient(ws.ctx, ws)345if err != nil {346logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Error("tunnel client failed to connect")347}348}349if ws.supervisorListener == nil && ws.tunnelClientConnected {350var err error351ws.supervisorListener, err = b.establishTunnel(ws.ctx, ws, "supervisor", 22999, 0, supervisor.TunnelVisiblity_host)352if err != nil {353logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Error("cannot establish supervisor tunnel")354}355}356357if ws.supervisorClient == nil && ws.supervisorListener != nil {358var err error359ws.supervisorClient, err = grpc.Dial(ws.supervisorListener.LocalAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))360if err != nil {361logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Error("error connecting to supervisor")362} else {363go func() {364<-ws.ctx.Done()365ws.supervisorClient.Close()366}()367}368}369370if ws.supervisorClient != nil && b.EnableAutoTunnel {371go b.tunnelPorts(ws)372}373374if ws.localSSHListener == nil && ws.supervisorClient != nil {375func() {376var err error377ws.SSHPrivateFN, ws.SSHPublicKey, err = generateSSHKeys(ws.InstanceID)378if err != nil {379logrus.WithError(err).WithField("workspaceInstanceID", ws.InstanceID).Error("cannot produce SSH keypair")380return381}382383ws.localSSHListener, err = b.establishSSHTunnel(ws)384if err != nil {385logrus.WithError(err).Error("cannot establish SSH tunnel")386}387}()388}389390case "stopping", "stopped":391ws.cancel()392delete(b.workspaces, u.ID)393b.Callbacks.InstanceUpdate(ws)394if b.localAppTimeout != 0 {395b.workspaceMapChangeChan <- len(b.workspaces)396}397return398}399400b.workspaces[u.ID] = ws401b.Callbacks.InstanceUpdate(ws)402if b.localAppTimeout != 0 {403b.workspaceMapChangeChan <- len(b.workspaces)404}405}406407func 
generateSSHKeys(instanceID string) (privateKeyFN string, publicKey string, err error) {408privateKeyFN = filepath.Join(os.TempDir(), fmt.Sprintf("gitpod_%s_id_rsa", instanceID))409useRrandomFile := func() {410var tmpf *os.File411tmpf, err = ioutil.TempFile("", "gitpod_*_id_rsa")412if err != nil {413return414}415tmpf.Close()416privateKeyFN = tmpf.Name()417}418if stat, serr := os.Stat(privateKeyFN); serr == nil && stat.IsDir() {419useRrandomFile()420} else if serr == nil {421var publicKeyRaw []byte422publicKeyRaw, err = ioutil.ReadFile(privateKeyFN + ".pub")423publicKey = string(publicKeyRaw)424if err == nil {425// we've loaded a pre-existing key - all is well426return427}428429logrus.WithError(err).WithField("instance", instanceID).WithField("privateKeyFN", privateKeyFN).Warn("cannot load public SSH key for this workspace")430useRrandomFile()431}432433privateKey, err := rsa.GenerateKey(rand.Reader, 2048)434if err != nil {435return436}437err = privateKey.Validate()438if err != nil {439return440}441442privDER := x509.MarshalPKCS1PrivateKey(privateKey)443privBlock := pem.Block{444Type: "RSA PRIVATE KEY",445Headers: nil,446Bytes: privDER,447}448privatePEM := pem.EncodeToMemory(&privBlock)449err = ioutil.WriteFile(privateKeyFN, privatePEM, 0600)450if err != nil {451return452}453454publicRsaKey, err := ssh.NewPublicKey(&privateKey.PublicKey)455if err != nil {456return457}458publicKey = string(ssh.MarshalAuthorizedKey(publicRsaKey))459_ = ioutil.WriteFile(privateKeyFN+".pub", []byte(publicKey), 0644)460461return462}463464func (b *Bastion) connectTunnelClient(ctx context.Context, ws *Workspace) error {465if ws.URL == "" {466return xerrors.Errorf("IDE URL is empty")467}468if ws.OwnerToken == "" {469return xerrors.Errorf("owner token is empty")470}471if ws.tunnelClientConnected {472return xerrors.Errorf("tunnel: ssh client is already connected")473}474ws.tunnelClientConnected = true475476tunnelURL := ws.URL477tunnelURL = strings.ReplaceAll(tunnelURL, "https://", 
"wss://")478tunnelURL = strings.ReplaceAll(tunnelURL, "http://", "ws://")479tunnelURL += "/_supervisor/tunnel"480h := make(http.Header)481h.Set("x-gitpod-owner-token", ws.OwnerToken)482webSocket := gitpod.NewReconnectingWebsocket(tunnelURL, h, logrus.WithField("workspace", ws.WorkspaceID))483go webSocket.Dial(ctx)484go func() {485var (486client *TunnelClient487err error488)489defer func() {490ws.tunnelClientConnected = false491webSocket.Close()492if err != nil {493logrus.WithField("workspace", ws.WorkspaceID).WithError(err).Error("tunnel: failed to connect ssh client")494}495if client != nil {496logrus.WithField("workspace", ws.WorkspaceID).WithField("id", client.ID).Warn("tunnel: ssh client is permanently closed")497}498}()499client, closed, err := newTunnelClient(ctx, ws, webSocket)500for {501if err != nil {502return503}504select {505case <-ctx.Done():506return507case clientCh := <-ws.tunnelClient:508clientCh <- client509case <-closed:510client, closed, err = newTunnelClient(ctx, ws, webSocket)511}512}513}()514return nil515}516517func newTunnelClient(ctx context.Context, ws *Workspace, reconnecting *gitpod.ReconnectingWebsocket) (client *TunnelClient, closed chan struct{}, err error) {518logrus.WithField("workspace", ws.WorkspaceID).Info("tunnel: trying to connect ssh client...")519err = reconnecting.EnsureConnection(func(conn *gitpod.WebsocketConnection) (bool, error) {520id, err := uuid.NewRandom()521if err != nil {522return false, err523}524525sshConn, chans, reqs, err := ssh.NewClientConn(conn, "", &ssh.ClientConfig{526HostKeyCallback: ssh.InsecureIgnoreHostKey(),527})528if err != nil {529logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Warn("tunnel: failed to connect ssh client, trying again...")530return true, err531}532logrus.WithField("workspace", ws.WorkspaceID).WithField("id", id).Info("tunnel: ssh client connected")533go func() {534conn.Wait()535sshConn.Close()536}()537go ssh.DiscardRequests(reqs)538go func() {539for newCh := range chans 
{540// TODO(ak) reverse tunneling541newCh.Reject(ssh.UnknownChannelType, "tunnel: reverse is not supported yet")542}543}()544closed = make(chan struct{}, 1)545go func() {546err := sshConn.Wait()547logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("id", id).Warn("tunnel: ssh client closed")548close(closed)549}()550client = &TunnelClient{551ID: id.String(),552Conn: sshConn,553}554return false, nil555})556return client, closed, err557}558559func (b *Bastion) establishTunnel(ctx context.Context, ws *Workspace, logprefix string, remotePort int, targetPort int, visibility supervisor.TunnelVisiblity) (*TunnelListener, error) {560if !ws.tunnelClientConnected {561return nil, xerrors.Errorf("tunnel client is not connected")562}563if visibility == supervisor.TunnelVisiblity_none {564return nil, xerrors.Errorf("tunnel visibility is none")565}566567targetHost := "127.0.0.1"568if visibility == supervisor.TunnelVisiblity_network {569targetHost = "0.0.0.0"570}571572netListener, err := net.Listen("tcp", targetHost+":"+strconv.Itoa(targetPort))573var localPort int574if err == nil {575localPort = netListener.(*net.TCPListener).Addr().(*net.TCPAddr).Port576} else {577netListener, err = net.Listen("tcp", targetHost+":0")578if err != nil {579return nil, err580}581localPort = netListener.(*net.TCPListener).Addr().(*net.TCPAddr).Port582}583logrus.WithField("workspace", ws.WorkspaceID).Info(logprefix + ": listening on " + netListener.Addr().String() + "...")584listenerCtx, cancel := context.WithCancel(ctx)585go func() {586<-listenerCtx.Done()587netListener.Close()588logrus.WithField("workspace", ws.WorkspaceID).Info(logprefix + ": closed")589}()590go func() {591for {592conn, err := netListener.Accept()593if listenerCtx.Err() != nil {594return595}596if err != nil {597logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Warn(logprefix + ": failed to accept connection")598continue599}600logrus.WithField("workspace", ws.WorkspaceID).Debug(logprefix + ": accepted 
new connection")601go func() {602defer logrus.WithField("workspace", ws.WorkspaceID).Debug(logprefix + ": connection closed")603defer conn.Close()604605clientCh := make(chan *TunnelClient, 1)606select {607case <-listenerCtx.Done():608return609case ws.tunnelClient <- clientCh:610}611client := <-clientCh612613payload, err := proto.Marshal(&supervisor.TunnelPortRequest{614ClientId: client.ID,615Port: uint32(remotePort),616TargetPort: uint32(localPort),617})618if err != nil {619logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("id", client.ID).Error(logprefix + ": failed to marshal tunnel payload")620return621}622sshChan, reqs, err := client.Conn.OpenChannel("tunnel", payload)623if err != nil {624logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("id", client.ID).Warn(logprefix + ": failed to establish tunnel")625return626}627defer sshChan.Close()628go ssh.DiscardRequests(reqs)629630ctx, cancel := context.WithCancel(listenerCtx)631go func() {632_, _ = io.Copy(sshChan, conn)633cancel()634}()635go func() {636_, _ = io.Copy(conn, sshChan)637cancel()638}()639<-ctx.Done()640}()641}642}()643return &TunnelListener{644RemotePort: uint32(remotePort),645LocalAddr: netListener.Addr().String(),646LocalPort: uint32(localPort),647Visibility: visibility,648Ctx: listenerCtx,649Cancel: cancel,650}, nil651}652653func (b *Bastion) establishSSHTunnel(ws *Workspace) (listener *TunnelListener, err error) {654if ws.SSHPublicKey == "" {655return nil, xerrors.Errorf("no public key generated")656}657658err = installSSHAuthorizedKey(ws, ws.SSHPublicKey)659if err != nil {660return nil, xerrors.Errorf("cannot install authorized key: %w", err)661}662listener, err = b.establishTunnel(ws.ctx, ws, "ssh", 23001, 0, supervisor.TunnelVisiblity_host)663return listener, err664}665666func installSSHAuthorizedKey(ws *Workspace, key string) error {667ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)668defer cancel()669term := 
supervisor.NewTerminalServiceClient(ws.supervisorClient)670tres, err := term.Open(ctx, &supervisor.OpenTerminalRequest{Workdir: "/", Shell: "/bin/sh"})671if err != nil {672return err673}674//nolint:errcheck675defer term.Shutdown(ctx, &supervisor.ShutdownTerminalRequest{Alias: tres.Terminal.Alias})676677done := make(chan bool, 1)678recv, err := term.Listen(ctx, &supervisor.ListenTerminalRequest{Alias: tres.Terminal.Alias})679if err != nil {680return err681}682683go func() {684defer close(done)685for {686resp, err := recv.Recv()687if err != nil {688return689}690if resp.Output == nil {691continue692}693out, ok := resp.Output.(*supervisor.ListenTerminalResponse_Data)694if !ok {695continue696}697c := strings.TrimSpace(string(out.Data))698if strings.HasPrefix(c, "write done") {699done <- true700return701}702}703}()704_, err = term.Write(ctx, &supervisor.WriteTerminalRequest{705Alias: tres.Terminal.Alias,706Stdin: []byte(fmt.Sprintf("mkdir -p ~/.ssh; echo %s >> ~/.ssh/authorized_keys; echo write done\r\n", strings.TrimSpace(key))),707})708if err != nil {709return err710}711712// give the command some time to execute713select {714case <-ctx.Done():715return ctx.Err()716case success := <-done:717if !success {718return xerrors.Errorf("unable to upload SSH key")719}720}721722return nil723}724725func (b *Bastion) tunnelPorts(ws *Workspace) {726ws.tunnelMu.Lock()727if !ws.tunnelEnabled || ws.cancelTunnel != nil {728ws.tunnelMu.Unlock()729return730}731ctx, cancel := context.WithCancel(ws.ctx)732ws.cancelTunnel = cancel733ws.tunnelMu.Unlock()734735defer func() {736ws.tunnelMu.Lock()737defer ws.tunnelMu.Unlock()738739ws.cancelTunnel = nil740logrus.WithField("workspace", ws.WorkspaceID).Info("ports tunneling finished")741}()742743for {744logrus.WithField("workspace", ws.WorkspaceID).Info("tunneling ports...")745746err := b.doTunnelPorts(ctx, ws)747if ws.ctx.Err() != nil {748return749}750if err != nil {751logrus.WithError(err).WithField("workspace", ws.WorkspaceID).Warn("ports 
tunneling failed, retrying...")752}753select {754case <-ctx.Done():755return756case <-time.After(1 * time.Second):757}758}759}760761func (b *Bastion) doTunnelPorts(ctx context.Context, ws *Workspace) error {762ctx, cancel := context.WithCancel(ctx)763defer cancel()764765statusService := supervisor.NewStatusServiceClient(ws.supervisorClient)766status, err := statusService.PortsStatus(ctx, &supervisor.PortsStatusRequest{767Observe: true,768})769if err != nil {770return err771}772defer b.notify(ws)773defer func() {774ws.tunnelMu.Lock()775defer ws.tunnelMu.Unlock()776for port, t := range ws.tunnelListeners {777delete(ws.tunnelListeners, port)778t.Cancel()779}780}()781for {782resp, err := status.Recv()783if err != nil {784return err785}786ws.tunnelMu.Lock()787currentTunneled := make(map[uint32]struct{})788for _, port := range resp.Ports {789visibility := supervisor.TunnelVisiblity_none790if port.Tunneled != nil {791visibility = port.Tunneled.Visibility792}793listener, alreadyTunneled := ws.tunnelListeners[port.LocalPort]794if alreadyTunneled && listener.Visibility != visibility {795listener.Cancel()796delete(ws.tunnelListeners, port.LocalPort)797}798if visibility == supervisor.TunnelVisiblity_none {799continue800}801currentTunneled[port.LocalPort] = struct{}{}802_, alreadyTunneled = ws.tunnelListeners[port.LocalPort]803if alreadyTunneled {804continue805}806_, alreadyTunneled = port.Tunneled.Clients[b.id]807if alreadyTunneled {808continue809}810811logprefix := "tunnel[" + supervisor.TunnelVisiblity_name[int32(port.Tunneled.Visibility)] + ":" + strconv.Itoa(int(port.LocalPort)) + "]"812listener, err := b.establishTunnel(ws.ctx, ws, logprefix, int(port.LocalPort), int(port.Tunneled.TargetPort), port.Tunneled.Visibility)813if err != nil {814logrus.WithError(err).WithField("workspace", ws.WorkspaceID).WithField("port", port.LocalPort).Error("cannot establish port tunnel")815} else {816ws.tunnelListeners[port.LocalPort] = listener817}818}819for port, listener := range 
ws.tunnelListeners {820_, exists := currentTunneled[port]821if !exists {822delete(ws.tunnelListeners, port)823listener.Cancel()824}825}826ws.tunnelMu.Unlock()827b.notify(ws)828}829}830831func (b *Bastion) notify(ws *Workspace) {832b.subscriptionsMu.RLock()833defer b.subscriptionsMu.RUnlock()834var subs []*StatusSubscription835for sub := range b.subscriptions {836if sub.instanceID == ws.InstanceID {837subs = append(subs, sub)838}839}840if len(subs) <= 0 {841return842}843status := ws.Status()844for _, sub := range subs {845select {846case sub.updates <- status:847case <-time.After(5 * time.Second):848logrus.Error("ports subscription dropped out")849sub.Close()850}851}852}853854func (b *Bastion) Status(instanceID string) []*app.TunnelStatus {855ws, ok := b.getWorkspace(instanceID)856if !ok {857return nil858}859return ws.Status()860}861862func (b *Bastion) getWorkspace(instanceID string) (*Workspace, bool) {863b.workspacesMu.RLock()864defer b.workspacesMu.RUnlock()865ws, ok := b.workspaces[instanceID]866return ws, ok867}868869const maxStatusSubscriptions = 10870871func (b *Bastion) Subscribe(instanceID string) (*StatusSubscription, error) {872b.subscriptionsMu.Lock()873defer b.subscriptionsMu.Unlock()874875if b.ctx.Err() != nil {876return nil, ErrClosed877}878879if len(b.subscriptions) > maxStatusSubscriptions {880return nil, ErrTooManySubscriptions881}882883sub := &StatusSubscription{updates: make(chan []*app.TunnelStatus, 5), instanceID: instanceID}884var once sync.Once885sub.Close = func() error {886b.subscriptionsMu.Lock()887defer b.subscriptionsMu.Unlock()888889once.Do(func() {890close(sub.updates)891})892delete(b.subscriptions, sub)893894return nil895}896b.subscriptions[sub] = struct{}{}897898// makes sure that no updates can happen between clients receiving an initial status and subscribing899sub.updates <- b.Status(instanceID)900return sub, nil901}902903func (b *Bastion) AutoTunnel(instanceID string, enabled bool) {904ws, ok := b.getWorkspace(instanceID)905if 
!ok {906return907}908ws.tunnelMu.Lock()909defer ws.tunnelMu.Unlock()910if ws.tunnelEnabled == enabled {911return912}913ws.tunnelEnabled = enabled914if enabled {915if ws.cancelTunnel == nil && b.EnableAutoTunnel {916b.Update(ws.WorkspaceID)917}918} else if ws.cancelTunnel != nil {919ws.cancelTunnel()920}921}922923924