Path: blob/main/components/ws-proxy/pkg/proxy/infoprovider.go
2500 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package proxy56import (7"context"8"net/url"9"sort"10"strconv"1112"golang.org/x/xerrors"13"k8s.io/apimachinery/pkg/api/errors"14"k8s.io/apimachinery/pkg/runtime"15"k8s.io/apimachinery/pkg/util/uuid"16"k8s.io/client-go/tools/cache"17ctrl "sigs.k8s.io/controller-runtime"18"sigs.k8s.io/controller-runtime/pkg/client"19"sigs.k8s.io/controller-runtime/pkg/predicate"20"sigs.k8s.io/controller-runtime/pkg/reconcile"2122wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"23"github.com/gitpod-io/gitpod/common-go/log"24"github.com/gitpod-io/gitpod/ws-manager/api"25wsapi "github.com/gitpod-io/gitpod/ws-manager/api"26workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"27"github.com/gitpod-io/gitpod/ws-proxy/pkg/common"28)2930const (31workspaceIndex = "workspaceIndex"32ipAddressIndex = "ipAddressIndex"33)3435// getPortStr extracts the port part from a given URL string. Returns "" if parsing fails or port is not specified.36func getPortStr(urlStr string) string {37portURL, err := url.Parse(urlStr)38if err != nil {39log.WithField("url", urlStr).WithError(err).Error("error parsing URL while getting URL port")40return ""41}42if portURL.Port() == "" {43switch scheme := portURL.Scheme; scheme {44case "http":45return "80"46case "https":47return "443"48}49}5051return portURL.Port()52}5354type ConnectionContext struct {55WorkspaceID string56Port string57UUID string58CancelFunc context.CancelCauseFunc59}6061type CRDWorkspaceInfoProvider struct {62client.Client63Scheme *runtime.Scheme6465store cache.ThreadSafeStore66contextStore cache.ThreadSafeStore67}6869// NewCRDWorkspaceInfoProvider creates a fresh WorkspaceInfoProvider.70func NewCRDWorkspaceInfoProvider(client client.Client, scheme *runtime.Scheme) (*CRDWorkspaceInfoProvider, error) {71// create custom indexer for searches72indexers := cache.Indexers{73workspaceIndex: func(obj interface{}) ([]string, error) {74if workspaceInfo, ok := obj.(*common.WorkspaceInfo); ok {75return []string{workspaceInfo.WorkspaceID}, nil76}7778return nil, xerrors.Errorf("object is not a WorkspaceInfo")79},80ipAddressIndex: func(obj interface{}) ([]string, error) {81if workspaceInfo, ok := obj.(*common.WorkspaceInfo); ok {82if workspaceInfo.IPAddress == "" {83return nil, nil84}85return []string{workspaceInfo.IPAddress}, nil86}87return nil, xerrors.Errorf("object is not a WorkspaceInfo")88},89}90contextIndexers := cache.Indexers{91workspaceIndex: func(obj interface{}) ([]string, error) {92if connCtx, ok := obj.(*ConnectionContext); ok {93return []string{connCtx.WorkspaceID}, nil94}95return nil, xerrors.Errorf("object is not a ConnectionContext")96},97}9899return &CRDWorkspaceInfoProvider{100Client: client,101Scheme: scheme,102103store: cache.NewThreadSafeStore(indexers, cache.Indices{}),104contextStore: cache.NewThreadSafeStore(contextIndexers, cache.Indices{}),105}, nil106}107108// WorkspaceInfo returns the WorkspaceInfo for the given workspaceID.109// It performs validation to ensure the workspace is unique and properly associated with its IP address.110func (r *CRDWorkspaceInfoProvider) WorkspaceInfo(workspaceID string) *common.WorkspaceInfo {111workspaces, err := r.store.ByIndex(workspaceIndex, workspaceID)112if err != nil {113return nil114}115116if len(workspaces) == 0 {117return nil118}119120if len(workspaces) > 1 {121log.WithField("workspaceID", workspaceID).WithField("instanceCount", len(workspaces)).Warn("multiple workspace instances found")122}123124sort.Slice(workspaces, func(i, j int) bool {125a := workspaces[i].(*common.WorkspaceInfo)126b := workspaces[j].(*common.WorkspaceInfo)127return a.StartedAt.After(b.StartedAt)128})129130wsInfo := workspaces[0].(*common.WorkspaceInfo)131132if wsInfo.IPAddress == "" {133return wsInfo134}135136if conflict, err := r.validateIPAddressConflict(workspaceID, wsInfo.IPAddress); conflict || err != nil {137return nil138}139140return wsInfo141}142143func (r *CRDWorkspaceInfoProvider) validateIPAddressConflict(workspaceID, ipAddress string) (bool, error) {144wsInfos, err := r.workspacesInfoByIPAddress(ipAddress)145if err != nil {146log.WithError(err).WithField("workspaceID", workspaceID).WithField("ipAddress", ipAddress).Error("failed to get workspaces by IP address")147return true, err148}149150if len(wsInfos) > 1 {151log.WithField("workspaceID", workspaceID).WithField("ipAddress", ipAddress).WithField("workspaceCount", len(wsInfos)).Warn("multiple workspaces found for IP address")152return true, nil153}154155if len(wsInfos) == 1 && wsInfos[0].WorkspaceID != workspaceID {156log.WithField("workspaceID", workspaceID).WithField("ipAddress", ipAddress).WithField("foundWorkspaceID", wsInfos[0].WorkspaceID).Warn("workspace IP address conflict detected")157return true, nil158}159160return false, nil161}162163func (r *CRDWorkspaceInfoProvider) workspacesInfoByIPAddress(ipAddress string) ([]*common.WorkspaceInfo, error) {164workspaces := make([]*common.WorkspaceInfo, 0)165166objs, err := r.store.ByIndex(ipAddressIndex, ipAddress)167if err != nil {168return nil, err169}170for _, w := range objs {171workspaces = append(workspaces, w.(*common.WorkspaceInfo))172}173174return workspaces, nil175}176177func (r *CRDWorkspaceInfoProvider) AcquireContext(ctx context.Context, workspaceID string, port string) (context.Context, string, error) {178ws := r.WorkspaceInfo(workspaceID)179if ws == nil {180return ctx, "", xerrors.Errorf("workspace %s not found", workspaceID)181}182id := string(uuid.NewUUID())183ctx, cancel := context.WithCancelCause(ctx)184connCtx := &ConnectionContext{185WorkspaceID: workspaceID,186Port: port,187CancelFunc: cancel,188UUID: id,189}190191r.contextStore.Add(id, connCtx)192return ctx, id, nil193}194195func (r *CRDWorkspaceInfoProvider) ReleaseContext(id string) {196r.contextStore.Delete(id)197}198199func (r *CRDWorkspaceInfoProvider) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {200var ws workspacev1.Workspace201err := r.Client.Get(context.Background(), req.NamespacedName, &ws)202if errors.IsNotFound(err) {203// workspace is gone - that's ok204r.store.Delete(req.Name)205log.WithField("workspacepod", req.Name).Debug("removing workspace from store")206207return reconcile.Result{}, nil208}209210var podIP string211if ws.Status.Runtime != nil {212podIP = ws.Status.Runtime.PodIP213}214215ports := make([]*wsapi.PortSpec, 0, len(ws.Spec.Ports))216for _, p := range ws.Spec.Ports {217v := wsapi.PortVisibility_PORT_VISIBILITY_PRIVATE218protocol := wsapi.PortProtocol_PORT_PROTOCOL_HTTP219if p.Visibility == workspacev1.AdmissionLevelEveryone {220v = wsapi.PortVisibility_PORT_VISIBILITY_PUBLIC221}222if p.Protocol == workspacev1.PortProtocolHttps {223protocol = wsapi.PortProtocol_PORT_PROTOCOL_HTTPS224}225ports = append(ports, &wsapi.PortSpec{226Port: p.Port,227Visibility: v,228Protocol: protocol,229})230}231232admission := wsapi.AdmissionLevel_ADMIT_OWNER_ONLY233if ws.Spec.Admission.Level == workspacev1.AdmissionLevelEveryone {234admission = wsapi.AdmissionLevel_ADMIT_EVERYONE235}236managedByMk2 := true237if managedBy, ok := ws.Labels[wsk8s.WorkspaceManagedByLabel]; ok && managedBy != "ws-manager-mk2" {238managedByMk2 = false239}240241wsinfo := &common.WorkspaceInfo{242WorkspaceID: ws.Spec.Ownership.WorkspaceID,243InstanceID: ws.Name,244URL: ws.Status.URL,245IDEImage: ws.Spec.Image.IDE.Web,246SupervisorImage: ws.Spec.Image.IDE.Supervisor,247IDEPublicPort: getPortStr(ws.Status.URL),248IPAddress: podIP,249Ports: ports,250Auth: &wsapi.WorkspaceAuthentication{Admission: admission, OwnerToken: ws.Status.OwnerToken},251StartedAt: ws.CreationTimestamp.Time,252OwnerUserId: ws.Spec.Ownership.Owner,253SSHPublicKeys: ws.Spec.SshPublicKeys,254IsRunning: ws.Status.Phase == workspacev1.WorkspacePhaseRunning,255IsEnabledSSHCA: ws.Spec.SSHGatewayCAPublicKey != "",256IsManagedByMk2: managedByMk2,257}258259r.store.Update(req.Name, wsinfo)260r.invalidateConnectionContext(wsinfo)261log.WithField("workspace", req.Name).WithField("details", wsinfo).Debug("adding/updating workspace details")262263return ctrl.Result{}, nil264}265266func (r *CRDWorkspaceInfoProvider) invalidateConnectionContext(ws *common.WorkspaceInfo) {267connCtxs, err := r.contextStore.ByIndex(workspaceIndex, ws.WorkspaceID)268if err != nil {269return270}271if len(connCtxs) == 0 {272return273}274275if ws.Auth != nil && ws.Auth.Admission == wsapi.AdmissionLevel_ADMIT_EVERYONE {276return277}278publicPorts := make(map[string]struct{})279for _, p := range ws.Ports {280if p.Visibility == api.PortVisibility_PORT_VISIBILITY_PUBLIC {281publicPorts[strconv.FormatUint(uint64(p.Port), 10)] = struct{}{}282}283}284285for _, _connCtx := range connCtxs {286connCtx, ok := _connCtx.(*ConnectionContext)287if !ok {288continue289}290if _, ok := publicPorts[connCtx.Port]; ok {291continue292}293connCtx.CancelFunc(xerrors.Errorf("workspace %s is no longer public", ws.WorkspaceID))294r.contextStore.Delete(connCtx.UUID)295}296}297298// SetupWithManager sets up the controller with the Manager.299func (r *CRDWorkspaceInfoProvider) SetupWithManager(mgr ctrl.Manager) error {300return ctrl.NewControllerManagedBy(mgr).301Named("workspacecrd").302WithEventFilter(predicate.ResourceVersionChangedPredicate{}).303For(304&workspacev1.Workspace{},305).306Complete(r)307}308309310