Path: blob/main/components/ws-manager-mk2/service/manager.go
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package service

import (
	"context"
	"crypto/sha256"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	validation "github.com/go-ozzo/ozzo-validation"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/xerrors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/durationpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
	"k8s.io/utils/pointer"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/common-go/tracing"
	"github.com/gitpod-io/gitpod/common-go/util"
	csapi "github.com/gitpod-io/gitpod/content-service/api"
	"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
	"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/constants"
	"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/maintenance"
	wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
	"github.com/gitpod-io/gitpod/ws-manager/api/config"
	workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

const (
	// stopWorkspaceNormallyGracePeriod is the grace period we use when stopping a pod with StopWorkspaceNormally policy
	stopWorkspaceNormallyGracePeriod = 30 * time.Second
	// stopWorkspaceImmediatelyGracePeriod is the grace period we use when stopping a pod as soon as possible
	stopWorkspaceImmediatelyGracePeriod = 1 * time.Second
)

var (
	// retryParams are custom backoff parameters used to modify a workspace.
	// These params retry more quickly than the default retry.DefaultBackoff.
	retryParams = wait.Backoff{
		Steps:    10,
		Duration: 10 * time.Millisecond,
		Factor:   2.0,
		Jitter:   0.2,
	}
)

func NewWorkspaceManagerServer(clnt client.Client, cfg *config.Configuration, reg prometheus.Registerer, maintenance maintenance.Maintenance) *WorkspaceManagerServer {
	metrics := newWorkspaceMetrics(cfg.Namespace, clnt)
	reg.MustRegister(metrics)

	return &WorkspaceManagerServer{
		Client:      clnt,
		Config:      cfg,
		metrics:     metrics,
		maintenance: maintenance,
		subs: subscriptions{
			subscribers: make(map[string]chan *wsmanapi.SubscribeResponse),
		},
	}
}

type WorkspaceManagerServer struct {
	Client      client.Client
	Config      *config.Configuration
	metrics     *workspaceMetrics
	maintenance maintenance.Maintenance

	subs subscriptions
	wsmanapi.UnimplementedWorkspaceManagerServer
}

// OnWorkspaceReconcile is called by the controller whenever it reconciles a workspace.
// This function then publishes to subscribers.
func (wsm *WorkspaceManagerServer) OnWorkspaceReconcile(ctx context.Context, ws *workspacev1.Workspace) {
	wsm.subs.PublishToSubscribers(ctx, &wsmanapi.SubscribeResponse{
		Status: wsm.extractWorkspaceStatus(ws),
	})
}
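
// StartWorkspace creates the Workspace resource for the requested workspace
// and waits until the controller has populated the workspace URL and owner
// token on its status. The flow below: validate the request, translate it into
// a workspacev1.Workspace, create the backing env/token secrets, create the
// resource, then poll for the status fields.
//
// A minimal sketch of a client-side call (illustrative only; the client
// variable and all field values are assumptions, not part of this file):
//
//	resp, err := wsclient.StartWorkspace(ctx, &wsmanapi.StartWorkspaceRequest{
//		Id:            "my-workspace",
//		ServicePrefix: "my-workspace",
//		Type:          wsmanapi.WorkspaceType_REGULAR,
//		Metadata:      &wsmanapi.WorkspaceMetadata{Owner: "owner-id", MetaId: "meta-id"},
//		Spec:          &wsmanapi.StartWorkspaceSpec{ /* image, initializer, ports, ... */ },
//	})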
func (wsm *WorkspaceManagerServer) StartWorkspace(ctx context.Context, req *wsmanapi.StartWorkspaceRequest) (resp *wsmanapi.StartWorkspaceResponse, err error) {
	owi := log.OWI(req.Metadata.Owner, req.Metadata.MetaId, req.Id)
	span, ctx := tracing.FromContext(ctx, "StartWorkspace")
	tracing.LogRequestSafe(span, req)
	tracing.ApplyOWI(span, owi)
	defer tracing.FinishSpan(span, &err)

	if wsm.maintenance.IsEnabled(ctx) {
		return &wsmanapi.StartWorkspaceResponse{}, status.Error(codes.FailedPrecondition, "under maintenance")
	}

	if err := validateStartWorkspaceRequest(req); err != nil {
		return nil, err
	}

	var workspaceType workspacev1.WorkspaceType
	switch req.Type {
	case wsmanapi.WorkspaceType_IMAGEBUILD:
		workspaceType = workspacev1.WorkspaceTypeImageBuild
	case wsmanapi.WorkspaceType_PREBUILD:
		workspaceType = workspacev1.WorkspaceTypePrebuild
	case wsmanapi.WorkspaceType_REGULAR:
		workspaceType = workspacev1.WorkspaceTypeRegular
	default:
		return nil, status.Errorf(codes.InvalidArgument, "unsupported workspace type: %v", req.Type)
	}

	var git *workspacev1.GitSpec
	if req.Spec.Git != nil {
		git = &workspacev1.GitSpec{
			Username: req.Spec.Git.Username,
			Email:    req.Spec.Git.Email,
		}
	}

	timeout, err := parseTimeout(req.Spec.Timeout)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	closedTimeout, err := parseTimeout(req.Spec.ClosedTimeout)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	maximumLifetime, err := parseTimeout(req.Spec.MaximumLifetime)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	var admissionLevel workspacev1.AdmissionLevel
	switch req.Spec.Admission {
	case wsmanapi.AdmissionLevel_ADMIT_EVERYONE:
		admissionLevel = workspacev1.AdmissionLevelEveryone
	case wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY:
		admissionLevel = workspacev1.AdmissionLevelOwner
	default:
		return nil, status.Errorf(codes.InvalidArgument, "unsupported admission level: %v", req.Spec.Admission)
	}

	ports := make([]workspacev1.PortSpec, 0, len(req.Spec.Ports))
	for _, p := range req.Spec.Ports {
		v := workspacev1.AdmissionLevelOwner
		if p.Visibility == wsmanapi.PortVisibility_PORT_VISIBILITY_PUBLIC {
			v = workspacev1.AdmissionLevelEveryone
		}
		protocol := workspacev1.PortProtocolHttp
		if p.Protocol == wsmanapi.PortProtocol_PORT_PROTOCOL_HTTPS {
			protocol = workspacev1.PortProtocolHttps
		}
		ports = append(ports, workspacev1.PortSpec{
			Port:       p.Port,
			Visibility: v,
			Protocol:   protocol,
		})
	}

	var classID string
	_, ok := wsm.Config.WorkspaceClasses[req.Spec.Class]
	if !ok {
		classID = config.DefaultWorkspaceClass
	} else {
		classID = req.Spec.Class
	}

	class, ok := wsm.Config.WorkspaceClasses[classID]
	if !ok {
		return nil, status.Errorf(codes.InvalidArgument, "workspace class \"%s\" is unknown", req.Spec.Class)
	}

	storage, err := class.Container.Limits.StorageQuantity()
	if err != nil {
		msg := fmt.Sprintf("workspace class %s has invalid storage quantity: %v", class.Name, err)
		return nil, status.Errorf(codes.InvalidArgument, "%s", msg)
	}

	annotations := make(map[string]string)
	for k, v := range req.Metadata.Annotations {
		annotations[k] = v
	}

	limits := class.Container.Limits
	if limits != nil && limits.CPU != nil {
		if limits.CPU.MinLimit != "" {
			annotations[wsk8s.WorkspaceCpuMinLimitAnnotation] = limits.CPU.MinLimit
		}

		if limits.CPU.BurstLimit != "" {
			annotations[wsk8s.WorkspaceCpuBurstLimitAnnotation] = limits.CPU.BurstLimit
		}
	}

	var sshGatewayCAPublicKey string
	for _, feature := range req.Spec.FeatureFlags {
		switch feature {
		case wsmanapi.WorkspaceFeatureFlag_WORKSPACE_CONNECTION_LIMITING:
			annotations[wsk8s.WorkspaceNetConnLimitAnnotation] = util.BooleanTrueString
		case wsmanapi.WorkspaceFeatureFlag_WORKSPACE_PSI:
			annotations[wsk8s.WorkspacePressureStallInfoAnnotation] = util.BooleanTrueString
		case wsmanapi.WorkspaceFeatureFlag_SSH_CA:
			sshGatewayCAPublicKey = wsm.Config.SSHGatewayCAPublicKey
		}
	}

	envSecretName := fmt.Sprintf("%s-%s", req.Id, "env")
	userEnvVars, envData := extractWorkspaceUserEnv(envSecretName, req.Spec.Envvars, req.Spec.SysEnvvars)
	sysEnvVars := extractWorkspaceSysEnv(req.Spec.SysEnvvars)

	tokenData := extractWorkspaceTokenData(req.Spec)
	initializer, err := proto.Marshal(req.Spec.Initializer)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "cannot serialise content initializer: %v", err)
	}

	ws := workspacev1.Workspace{
		TypeMeta: metav1.TypeMeta{
			APIVersion: workspacev1.GroupVersion.String(),
			Kind:       "Workspace",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:        req.Id,
			Annotations: annotations,
			Namespace:   wsm.Config.Namespace,
			Labels: map[string]string{
				wsk8s.WorkspaceIDLabel:        req.Metadata.MetaId,
				wsk8s.OwnerLabel:              req.Metadata.Owner,
				wsk8s.WorkspaceManagedByLabel: constants.ManagedBy,
			},
		},
		Spec: workspacev1.WorkspaceSpec{
			Ownership: workspacev1.Ownership{
				Owner:       req.Metadata.Owner,
				WorkspaceID: req.Metadata.MetaId,
			},
			Type:  workspaceType,
			Class: classID,
			Image: workspacev1.WorkspaceImages{
				Workspace: workspacev1.WorkspaceImage{
					Ref: pointer.String(req.Spec.WorkspaceImage),
				},
				IDE: workspacev1.IDEImages{
					Web:        req.Spec.IdeImage.WebRef,
					Refs:       req.Spec.IdeImageLayers,
					Supervisor: req.Spec.IdeImage.SupervisorRef,
				},
			},
			Initializer:       initializer,
			UserEnvVars:       userEnvVars,
			SysEnvVars:        sysEnvVars,
			WorkspaceLocation: req.Spec.WorkspaceLocation,
			Git:               git,
			Timeout: workspacev1.TimeoutSpec{
				Time:            timeout,
				ClosedTimeout:   closedTimeout,
				MaximumLifetime: maximumLifetime,
			},
			Admission: workspacev1.AdmissionSpec{
				Level: admissionLevel,
			},
			Ports:                 ports,
			SshPublicKeys:         req.Spec.SshPublicKeys,
			StorageQuota:          int(storage.Value()),
			SSHGatewayCAPublicKey: sshGatewayCAPublicKey,
		},
	}
	controllerutil.AddFinalizer(&ws, workspacev1.GitpodFinalizerName)

	exists, err := wsm.workspaceExists(ctx, req.Metadata.MetaId)
	if err != nil {
		return nil, fmt.Errorf("cannot check if workspace %s exists: %w", req.Metadata.MetaId, err)
	}

	if exists {
		return nil, status.Errorf(codes.AlreadyExists, "workspace %s already exists", req.Metadata.MetaId)
	}

	err = wsm.createWorkspaceSecret(ctx, &ws, envSecretName, wsm.Config.Namespace, envData)
	if err != nil {
		return nil, fmt.Errorf("cannot create env secret for workspace %s: %w", req.Id, err)
	}

	err = wsm.createWorkspaceSecret(ctx, &ws, fmt.Sprintf("%s-%s", req.Id, "tokens"), wsm.Config.SecretsNamespace, tokenData)
	if err != nil {
		return nil, fmt.Errorf("cannot create token secret for workspace %s: %w", req.Id, err)
	}

	wsm.metrics.recordWorkspaceStart(&ws)
	err = wsm.Client.Create(ctx, &ws)
	if err != nil {
		log.WithError(err).WithFields(owi).Error("error creating workspace")
		return nil, status.Errorf(codes.FailedPrecondition, "cannot create workspace")
	}
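
	// The controller sets the workspace URL and owner token on the status once
	// the workspace is scheduled; poll until both are present so the response
	// below is immediately usable by the caller.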
	var wsr workspacev1.Workspace
	err = wait.PollWithContext(ctx, 100*time.Millisecond, 30*time.Second, func(c context.Context) (done bool, err error) {
		err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: ws.Name}, &wsr)
		if err != nil {
			return false, nil
		}

		if wsr.Status.OwnerToken != "" && wsr.Status.URL != "" {
			return true, nil
		}

		return false, nil
	})
	if err != nil {
		return nil, status.Errorf(codes.FailedPrecondition, "cannot wait for workspace URL: %q", err)
	}

	return &wsmanapi.StartWorkspaceResponse{
		Url:        wsr.Status.URL,
		OwnerToken: wsr.Status.OwnerToken,
	}, nil
}

func (wsm *WorkspaceManagerServer) workspaceExists(ctx context.Context, id string) (bool, error) {
	var workspaces workspacev1.WorkspaceList
	err := wsm.Client.List(ctx, &workspaces, client.MatchingLabels{wsk8s.WorkspaceIDLabel: id})
	if err != nil {
		return false, err
	}

	for _, ws := range workspaces.Items {
		if ws.Status.Phase != workspacev1.WorkspacePhaseStopped {
			return true, nil
		}
	}

	return false, nil
}

func isProtectedEnvVar(name string, sysEnvvars []*wsmanapi.EnvironmentVariable) bool {
	switch name {
	case "THEIA_SUPERVISOR_TOKENS":
		return true
	default:
		if isGitpodInternalEnvVar(name) {
			return false
		}
		for _, env := range sysEnvvars {
			if env.Name == name {
				return false
			}
		}
		return true
	}
}

func isGitpodInternalEnvVar(name string) bool {
	return strings.HasPrefix(name, "GITPOD_") ||
		strings.HasPrefix(name, "SUPERVISOR_") ||
		strings.HasPrefix(name, "BOB_") ||
		strings.HasPrefix(name, "THEIA_") ||
		name == "NODE_EXTRA_CA_CERTS" ||
		name == "VSX_REGISTRY_URL"
}

func (wsm *WorkspaceManagerServer) createWorkspaceSecret(ctx context.Context, owner client.Object, name, namespace string, data map[string]string) error {
	secret := corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		StringData: data,
	}

	err := wsm.Client.Create(ctx, &secret)
	if err != nil && !errors.IsAlreadyExists(err) {
		return err
	}

	return nil
}

func (wsm *WorkspaceManagerServer) StopWorkspace(ctx context.Context, req *wsmanapi.StopWorkspaceRequest) (res *wsmanapi.StopWorkspaceResponse, err error) {
	owi := log.OWI("", "", req.Id)
	span, ctx := tracing.FromContext(ctx, "StopWorkspace")
	tracing.LogRequestSafe(span, req)
	tracing.ApplyOWI(span, owi)
	defer tracing.FinishSpan(span, &err)

	if wsm.maintenance.IsEnabled(ctx) {
		return &wsmanapi.StopWorkspaceResponse{}, status.Error(codes.FailedPrecondition, "under maintenance")
	}

	gracePeriod := stopWorkspaceNormallyGracePeriod
	if req.Policy == wsmanapi.StopWorkspacePolicy_IMMEDIATELY {
		span.LogKV("policy", "immediately")
		gracePeriod = stopWorkspaceImmediatelyGracePeriod
	} else if req.Policy == wsmanapi.StopWorkspacePolicy_ABORT {
		span.LogKV("policy", "abort")
		gracePeriod = stopWorkspaceImmediatelyGracePeriod
		if err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
			ws.Status.SetCondition(workspacev1.NewWorkspaceConditionAborted("StopWorkspaceRequest"))
			return nil
		}); err != nil {
			log.WithError(err).WithFields(owi).Error("failed to add Aborted condition to workspace")
		}
	}

	err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
		ws.Status.SetCondition(workspacev1.NewWorkspaceConditionStoppedByRequest(gracePeriod.String()))
		return nil
	})
	// Ignore NotFound errors, workspace has already been stopped.
	if err != nil && status.Code(err) != codes.NotFound {
		return nil, err
	}
	return &wsmanapi.StopWorkspaceResponse{}, nil
}
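
// GetWorkspaces lists all workspaces managed by this instance whose labels and
// annotations match the request's MustMatch metadata filter.
//
// Illustrative request (field values are made up for the example):
//
//	resp, err := wsclient.GetWorkspaces(ctx, &wsmanapi.GetWorkspacesRequest{
//		MustMatch: &wsmanapi.MetadataFilter{Owner: "owner-id"},
//	})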
func (wsm *WorkspaceManagerServer) GetWorkspaces(ctx context.Context, req *wsmanapi.GetWorkspacesRequest) (*wsmanapi.GetWorkspacesResponse, error) {
	labelSelector, err := metadataFilterToLabelSelector(req.MustMatch)
	if err != nil {
		return nil, status.Errorf(codes.FailedPrecondition, "cannot convert metadata filter: %v", err)
	}

	var workspaces workspacev1.WorkspaceList
	err = wsm.Client.List(ctx, &workspaces, &client.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		return nil, status.Errorf(codes.FailedPrecondition, "cannot list workspaces: %v", err)
	}

	res := make([]*wsmanapi.WorkspaceStatus, 0, len(workspaces.Items))
	for _, ws := range workspaces.Items {
		if !matchesMetadataAnnotations(&ws, req.MustMatch) {
			continue
		}

		res = append(res, wsm.extractWorkspaceStatus(&ws))
	}

	return &wsmanapi.GetWorkspacesResponse{Status: res}, nil
}

func (wsm *WorkspaceManagerServer) DescribeWorkspace(ctx context.Context, req *wsmanapi.DescribeWorkspaceRequest) (*wsmanapi.DescribeWorkspaceResponse, error) {
	var ws workspacev1.Workspace
	err := wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
	if errors.IsNotFound(err) {
		return nil, status.Errorf(codes.NotFound, "workspace %s not found", req.Id)
	}
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot lookup workspace: %v", err)
	}

	result := &wsmanapi.DescribeWorkspaceResponse{
		Status: wsm.extractWorkspaceStatus(&ws),
	}

	lastActivity := activity.Last(&ws)
	if lastActivity != nil {
		result.LastActivity = lastActivity.UTC().Format(time.RFC3339Nano)
	}
	return result, nil
}

// Subscribe streams all status updates to a client
func (m *WorkspaceManagerServer) Subscribe(req *wsmanapi.SubscribeRequest, srv wsmanapi.WorkspaceManager_SubscribeServer) (err error) {
	var sub subscriber = srv
	if req.MustMatch != nil {
		sub = &filteringSubscriber{srv, req.MustMatch}
	}

	return m.subs.Subscribe(srv.Context(), sub)
}

// MarkActive records a workspace as being active which prevents it from timing out
func (wsm *WorkspaceManagerServer) MarkActive(ctx context.Context, req *wsmanapi.MarkActiveRequest) (res *wsmanapi.MarkActiveResponse, err error) {
	span, ctx := tracing.FromContext(ctx, "MarkActive")
	tracing.ApplyOWI(span, log.OWI("", "", req.Id))
	defer tracing.FinishSpan(span, &err)

	workspaceID := req.Id

	var ws workspacev1.Workspace
	err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
	if errors.IsNotFound(err) {
		return nil, status.Errorf(codes.NotFound, "workspace %s does not exist", req.Id)
	}
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot mark workspace: %v", err)
	}

	var firstUserActivity *timestamppb.Timestamp
	if c := wsk8s.GetCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionFirstUserActivity)); c != nil {
		firstUserActivity = timestamppb.New(c.LastTransitionTime.Time)
	}

	// If the user already marked the workspace as active and this request has
	// the IgnoreIfActive flag set, simply ignore it.
	if firstUserActivity != nil && req.IgnoreIfActive {
		return &wsmanapi.MarkActiveResponse{}, nil
	}

	now := time.Now().UTC()
	lastActivityStatus := metav1.NewTime(now)
	ws.Status.LastActivity = &lastActivityStatus

	err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
		ws.Status.LastActivity = &lastActivityStatus
		return nil
	})
	if err != nil {
		log.WithError(err).WithFields(log.OWI("", "", workspaceID)).Warn("was unable to update status")
	}

	// We do however maintain the "closed" flag as a condition on the workspace. This flag should not change
	// very often and provides a better UX if it persists across ws-manager restarts.
	isMarkedClosed := ws.IsConditionTrue(workspacev1.WorkspaceConditionClosed)
	if req.Closed && !isMarkedClosed {
		err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
			ws.Status.SetCondition(workspacev1.NewWorkspaceConditionClosed(metav1.ConditionTrue, "MarkActiveRequest"))
			return nil
		})
	} else if !req.Closed && isMarkedClosed {
		err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
			ws.Status.SetCondition(workspacev1.NewWorkspaceConditionClosed(metav1.ConditionFalse, "MarkActiveRequest"))
			return nil
		})
	}
	if err != nil {
		logFields := logrus.Fields{
			"closed":         req.Closed,
			"isMarkedClosed": isMarkedClosed,
		}
		log.WithError(err).WithFields(log.OWI("", "", workspaceID)).WithFields(logFields).Warn("was unable to mark workspace properly")
	}

	// If it's the first call: Mark the pod with FirstUserActivity condition.
	if firstUserActivity == nil {
		err := wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
			ws.Status.SetCondition(workspacev1.NewWorkspaceConditionFirstUserActivity("MarkActiveRequest"))
			return nil
		})
		if err != nil {
			log.WithError(err).WithFields(log.OWI("", "", workspaceID)).Warn("was unable to set FirstUserActivity condition on workspace")
			return nil, err
		}
	}

	return &wsmanapi.MarkActiveResponse{}, nil
}

func (wsm *WorkspaceManagerServer) SetTimeout(ctx context.Context, req *wsmanapi.SetTimeoutRequest) (*wsmanapi.SetTimeoutResponse, error) {
	duration, err := time.ParseDuration(req.Duration)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "invalid duration: %v", err)
	}

	if req.Type == wsmanapi.TimeoutType_WORKSPACE_TIMEOUT {
		err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
			ws.Spec.Timeout.Time = &metav1.Duration{Duration: duration}
			ws.Spec.Timeout.ClosedTimeout = &metav1.Duration{Duration: time.Duration(0)}
			return nil
		})
	} else if req.Type == wsmanapi.TimeoutType_CLOSED_TIMEOUT {
		err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
			ws.Spec.Timeout.ClosedTimeout = &metav1.Duration{Duration: duration}
			return nil
		})
	}
	if err != nil {
		return nil, err
	}

	return &wsmanapi.SetTimeoutResponse{}, nil
}

func (wsm *WorkspaceManagerServer) ControlPort(ctx context.Context, req *wsmanapi.ControlPortRequest) (res *wsmanapi.ControlPortResponse, err error) {
	span, ctx := tracing.FromContext(ctx, "ControlPort")
	tracing.ApplyOWI(span, log.OWI("", "", req.Id))
	defer tracing.FinishSpan(span, &err)

	if req.Spec == nil {
		return nil, status.Errorf(codes.InvalidArgument, "missing spec")
	}

	port := req.Spec.Port
	err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
		// Remove any existing spec for this port, then re-add it if it should be exposed.
		n := 0
		for _, x := range ws.Spec.Ports {
			if x.Port != port {
				ws.Spec.Ports[n] = x
				n++
			}
		}
		ws.Spec.Ports = ws.Spec.Ports[:n]

		if req.Expose {
			visibility := workspacev1.AdmissionLevelOwner
			protocol := workspacev1.PortProtocolHttp
			if req.Spec.Visibility == wsmanapi.PortVisibility_PORT_VISIBILITY_PUBLIC {
				visibility = workspacev1.AdmissionLevelEveryone
			}
			if req.Spec.Protocol == wsmanapi.PortProtocol_PORT_PROTOCOL_HTTPS {
				protocol = workspacev1.PortProtocolHttps
			}
			ws.Spec.Ports = append(ws.Spec.Ports, workspacev1.PortSpec{
				Port:       port,
				Visibility: visibility,
				Protocol:   protocol,
			})
		}

		return nil
	})
	if err != nil {
		return nil, err
	}
	return &wsmanapi.ControlPortResponse{}, nil
}

func (wsm *WorkspaceManagerServer) TakeSnapshot(ctx context.Context, req *wsmanapi.TakeSnapshotRequest) (res *wsmanapi.TakeSnapshotResponse, err error) {
	span, ctx := tracing.FromContext(ctx, "TakeSnapshot")
	tracing.ApplyOWI(span, log.OWI("", "", req.Id))
	defer tracing.FinishSpan(span, &err)

	if wsm.maintenance.IsEnabled(ctx) {
		return &wsmanapi.TakeSnapshotResponse{}, status.Error(codes.FailedPrecondition, "under maintenance")
	}

	var ws workspacev1.Workspace
	err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
	if errors.IsNotFound(err) {
		return nil, status.Errorf(codes.NotFound, "workspace %s not found", req.Id)
	}
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot lookup workspace: %v", err)
	}

	if ws.Status.Phase != workspacev1.WorkspacePhaseRunning {
		return nil, status.Errorf(codes.FailedPrecondition, "snapshots can only be taken of running workspaces, not %s workspaces", ws.Status.Phase)
	}

	snapshot := workspacev1.Snapshot{
		TypeMeta: metav1.TypeMeta{
			APIVersion: workspacev1.GroupVersion.String(),
			Kind:       "Snapshot",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-%d", req.Id, time.Now().UnixNano()),
			Namespace: wsm.Config.Namespace,
		},
		Spec: workspacev1.SnapshotSpec{
			NodeName:    ws.Status.Runtime.NodeName,
			WorkspaceID: ws.Name,
		},
	}

	err = controllerutil.SetOwnerReference(&ws, &snapshot, wsm.Client.Scheme())
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot set owner for snapshot: %q", err)
	}

	err = wsm.Client.Create(ctx, &snapshot)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot create snapshot object: %q", err)
	}

	var sso workspacev1.Snapshot
	err = wait.PollWithContext(ctx, 100*time.Millisecond, 10*time.Second, func(c context.Context) (done bool, err error) {
		err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: snapshot.Name}, &sso)
		if err != nil {
			return false, err
		}

		if sso.Status.Error != "" {
			return true, fmt.Errorf("%s", sso.Status.Error)
		}

		if sso.Status.URL != "" {
			return true, nil
		}

		return false, nil
	})

	if err != nil {
		return nil, status.Errorf(codes.Internal, "cannot wait for snapshot URL: %v", err)
	}

	if !req.ReturnImmediately {
		// Poll the snapshot object until the backup has completed.
		err = wait.PollWithContext(ctx, 100*time.Millisecond, 0, func(c context.Context) (done bool, err error) {
			err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: snapshot.Name}, &sso)
			if err != nil {
				return false, nil
			}

			if sso.Status.Completed {
				return true, nil
			}

			return false, nil
		})

		if err != nil {
			return nil, status.Errorf(codes.Internal, "cannot wait for snapshot: %q", err)
		}

		if sso.Status.Error != "" {
			return nil, status.Errorf(codes.Internal, "cannot take snapshot: %q", sso.Status.Error)
		}
	}

	return &wsmanapi.TakeSnapshotResponse{
		Url: sso.Status.URL,
	}, nil
}
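
// ControlAdmission changes who may access a workspace: everyone, or the owner
// only. The change is applied to the workspace spec and picked up by the
// controller on its next reconciliation.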
func (wsm *WorkspaceManagerServer) ControlAdmission(ctx context.Context, req *wsmanapi.ControlAdmissionRequest) (*wsmanapi.ControlAdmissionResponse, error) {
	err := wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
		switch req.Level {
		case wsmanapi.AdmissionLevel_ADMIT_EVERYONE:
			ws.Spec.Admission.Level = workspacev1.AdmissionLevelEveryone
		case wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY:
			ws.Spec.Admission.Level = workspacev1.AdmissionLevelOwner
		default:
			return status.Errorf(codes.InvalidArgument, "unsupported admission level: %v", req.Level)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return &wsmanapi.ControlAdmissionResponse{}, nil
}

func (wsm *WorkspaceManagerServer) UpdateSSHKey(ctx context.Context, req *wsmanapi.UpdateSSHKeyRequest) (res *wsmanapi.UpdateSSHKeyResponse, err error) {
	span, ctx := tracing.FromContext(ctx, "UpdateSSHKey")
	tracing.ApplyOWI(span, log.OWI("", "", req.Id))
	defer tracing.FinishSpan(span, &err)

	if err = validateUpdateSSHKeyRequest(req); err != nil {
		return &wsmanapi.UpdateSSHKeyResponse{}, err
	}

	err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
		ws.Spec.SshPublicKeys = req.Keys
		return nil
	})

	return &wsmanapi.UpdateSSHKeyResponse{}, err
}

func (wsm *WorkspaceManagerServer) DescribeCluster(ctx context.Context, req *wsmanapi.DescribeClusterRequest) (res *wsmanapi.DescribeClusterResponse, err error) {
	//nolint:ineffassign
	span, ctx := tracing.FromContext(ctx, "DescribeCluster")
	defer tracing.FinishSpan(span, &err)

	classes := make([]*wsmanapi.WorkspaceClass, 0, len(wsm.Config.WorkspaceClasses))
	for id, class := range wsm.Config.WorkspaceClasses {
		var cpu, ram, disk resource.Quantity
		desc := class.Description
		if desc == "" {
			if class.Container.Limits != nil {
				cpu, _ = resource.ParseQuantity(class.Container.Limits.CPU.BurstLimit)
				ram, _ = resource.ParseQuantity(class.Container.Limits.Memory)
				disk, _ = resource.ParseQuantity(class.Container.Limits.Storage)
			}
			if cpu.Value() == 0 && class.Container.Requests != nil {
				cpu, _ = resource.ParseQuantity(class.Container.Requests.CPU)
			}
			if ram.Value() == 0 && class.Container.Requests != nil {
				ram, _ = resource.ParseQuantity(class.Container.Requests.Memory)
			}
			desc = fmt.Sprintf("%d vCPU, %dGB memory, %dGB disk", cpu.Value(), ram.ScaledValue(resource.Giga), disk.ScaledValue(resource.Giga))
		}
		classes = append(classes, &wsmanapi.WorkspaceClass{
			Id:               id,
			DisplayName:      class.Name,
			Description:      desc,
			CreditsPerMinute: class.CreditsPerMinute,
		})
	}
	sort.Slice(classes, func(i, j int) bool {
		return classes[i].Id < classes[j].Id
	})

	return &wsmanapi.DescribeClusterResponse{
		WorkspaceClasses:        classes,
		PreferredWorkspaceClass: wsm.Config.PreferredWorkspaceClass,
	}, nil
}

// modifyWorkspace modifies a workspace object using the mod function. If the mod function returns a gRPC status error, that error
// is returned directly. If mod returns a non-gRPC error it is turned into one.
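//
// Illustrative use (this particular closure is an example, not a call made
// elsewhere in this file):
//
//	err := wsm.modifyWorkspace(ctx, "my-workspace", false, func(ws *workspacev1.Workspace) error {
//		ws.Spec.Admission.Level = workspacev1.AdmissionLevelOwner
//		return nil
//	})
//
// Updates are retried with retryParams on conflict; updateStatus selects
// between updating the status subresource and the main resource.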
func (wsm *WorkspaceManagerServer) modifyWorkspace(ctx context.Context, id string, updateStatus bool, mod func(ws *workspacev1.Workspace) error) (err error) {
	span, ctx := tracing.FromContext(ctx, "modifyWorkspace")
	tracing.ApplyOWI(span, log.OWI("", "", id))
	defer tracing.FinishSpan(span, &err)

	err = retry.RetryOnConflict(retryParams, func() (err error) {
		span, ctx := tracing.FromContext(ctx, "modifyWorkspaceRetryFn")
		defer tracing.FinishSpan(span, &err)

		var ws workspacev1.Workspace
		err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: id}, &ws)
		if err != nil {
			return err
		}

		err = mod(&ws)
		if err != nil {
			return err
		}

		if updateStatus {
			err = wsm.Client.Status().Update(ctx, &ws)
		} else {
			err = wsm.Client.Update(ctx, &ws)
		}
		return err
	})
	if errors.IsNotFound(err) {
		return status.Errorf(codes.NotFound, "workspace %s not found", id)
	}
	if c := status.Code(err); c != codes.Unknown && c != codes.OK {
		return err
	}
	if err != nil {
		return status.Errorf(codes.Internal, "cannot modify workspace: %v", err)
	}
	return nil
}

// validateStartWorkspaceRequest ensures that acting on this request will not leave the system in an invalid state
func validateStartWorkspaceRequest(req *wsmanapi.StartWorkspaceRequest) error {
	err := validation.ValidateStruct(req.Spec,
		validation.Field(&req.Spec.WorkspaceImage, validation.Required),
		validation.Field(&req.Spec.WorkspaceLocation, validation.Required),
		validation.Field(&req.Spec.Ports, validation.By(areValidPorts)),
		validation.Field(&req.Spec.Initializer, validation.Required),
		validation.Field(&req.Spec.FeatureFlags, validation.By(areValidFeatureFlags)),
	)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
	}

	rules := make([]*validation.FieldRules, 0)
	rules = append(rules, validation.Field(&req.Id, validation.Required))
	rules = append(rules, validation.Field(&req.Spec, validation.Required))
	rules = append(rules, validation.Field(&req.Type, validation.By(isValidWorkspaceType)))
	if req.Type == wsmanapi.WorkspaceType_REGULAR {
		rules = append(rules, validation.Field(&req.ServicePrefix, validation.Required))
	}
	err = validation.ValidateStruct(req, rules...)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
	}

	return nil
}

func validateUpdateSSHKeyRequest(req *wsmanapi.UpdateSSHKeyRequest) error {
	err := validation.ValidateStruct(req,
		validation.Field(&req.Id, validation.Required),
		validation.Field(&req.Keys, validation.Required),
	)

	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
	}

	return nil
}

func isValidWorkspaceType(value interface{}) error {
	s, ok := value.(wsmanapi.WorkspaceType)
	if !ok {
		return xerrors.Errorf("value is not a workspace type")
	}

	_, ok = wsmanapi.WorkspaceType_name[int32(s)]
	if !ok {
		return xerrors.Errorf("value %d is out of range", s)
	}

	return nil
}

func areValidPorts(value interface{}) error {
	s, ok := value.([]*wsmanapi.PortSpec)
	if !ok {
		return xerrors.Errorf("value is not a port spec list")
	}

	idx := make(map[uint32]struct{})
	for _, p := range s {
		if _, exists := idx[p.Port]; exists {
			return xerrors.Errorf("port %d is not unique", p.Port)
		}
		idx[p.Port] = struct{}{}

		// TODO [cw]: probably the target should be unique as well.
		// I don't want to introduce new issues with too
		// tight validation though.
	}

	return nil
}

func areValidFeatureFlags(value interface{}) error {
	s, ok := value.([]wsmanapi.WorkspaceFeatureFlag)
	if !ok {
		return xerrors.Errorf("value not a feature flag list")
	}

	idx := make(map[wsmanapi.WorkspaceFeatureFlag]struct{}, len(s))
	for _, k := range s {
		idx[k] = struct{}{}
	}

	return nil
}

func extractWorkspaceUserEnv(secretName string, userEnvs, sysEnvs []*wsmanapi.EnvironmentVariable) ([]corev1.EnvVar, map[string]string) {
	envVars := make([]corev1.EnvVar, 0, len(userEnvs))
	secrets := make(map[string]string)
	for _, e := range userEnvs {
		switch {
		case e.Secret != nil:
			securedEnv := corev1.EnvVar{
				Name: e.Name,
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{Name: e.Secret.SecretName},
						Key:                  e.Secret.Key,
					},
				},
			}

			envVars = append(envVars, securedEnv)

		case e.Value == "":
			continue

		case !isProtectedEnvVar(e.Name, sysEnvs):
			unprotectedEnv := corev1.EnvVar{
				Name:  e.Name,
				Value: e.Value,
			}

			envVars = append(envVars, unprotectedEnv)

		default:
			name := fmt.Sprintf("%x", sha256.Sum256([]byte(e.Name)))
			protectedEnv := corev1.EnvVar{
				Name: e.Name,
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{Name: secretName},
						Key:                  name,
					},
				},
			}

			envVars = append(envVars, protectedEnv)
			secrets[name] = e.Value
		}
	}

	return envVars, secrets
}

func extractWorkspaceSysEnv(sysEnvs []*wsmanapi.EnvironmentVariable) []corev1.EnvVar {
	envs := make([]corev1.EnvVar, 0, len(sysEnvs))
	for _, e := range sysEnvs {
		envs = append(envs, corev1.EnvVar{
			Name:  e.Name,
			Value: e.Value,
		})
	}

	return envs
}

func extractWorkspaceTokenData(spec *wsmanapi.StartWorkspaceSpec) map[string]string {
	secrets := make(map[string]string)
	for k, v := range csapi.ExtractAndReplaceSecretsFromInitializer(spec.Initializer) {
		secrets[k] = v
	}
	return secrets
}
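
// extractWorkspaceStatus converts a workspacev1.Workspace custom resource into
// the wsmanapi.WorkspaceStatus representation served over gRPC, mapping type,
// phase, admission level, ports, conditions, and metrics.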
func (wsm *WorkspaceManagerServer) extractWorkspaceStatus(ws *workspacev1.Workspace) *wsmanapi.WorkspaceStatus {
	log := log.WithFields(log.OWI(ws.Spec.Ownership.Owner, ws.Spec.Ownership.WorkspaceID, ws.Name))
	version, _ := strconv.ParseUint(ws.ResourceVersion, 10, 64)

	var tpe wsmanapi.WorkspaceType
	switch ws.Spec.Type {
	case workspacev1.WorkspaceTypeImageBuild:
		tpe = wsmanapi.WorkspaceType_IMAGEBUILD
	case workspacev1.WorkspaceTypePrebuild:
		tpe = wsmanapi.WorkspaceType_PREBUILD
	case workspacev1.WorkspaceTypeRegular:
		tpe = wsmanapi.WorkspaceType_REGULAR
	}

	timeout := wsm.Config.Timeouts.RegularWorkspace.String()
	if ws.Spec.Timeout.Time != nil {
		timeout = ws.Spec.Timeout.Time.Duration.String()
	}

	closedTimeout := wsm.Config.Timeouts.AfterClose.String()
	if ws.Spec.Timeout.ClosedTimeout != nil {
		closedTimeout = ws.Spec.Timeout.ClosedTimeout.Duration.String()
	}

	var phase wsmanapi.WorkspacePhase
	switch ws.Status.Phase {
	case workspacev1.WorkspacePhasePending:
		phase = wsmanapi.WorkspacePhase_PENDING
	case workspacev1.WorkspacePhaseImageBuild:
		// TODO(cw): once we have an imagebuild phase on the protocol, map this properly
		phase = wsmanapi.WorkspacePhase_PENDING
	case workspacev1.WorkspacePhaseCreating:
		phase = wsmanapi.WorkspacePhase_CREATING
	case workspacev1.WorkspacePhaseInitializing:
		phase = wsmanapi.WorkspacePhase_INITIALIZING
	case workspacev1.WorkspacePhaseRunning:
		phase = wsmanapi.WorkspacePhase_RUNNING
	case workspacev1.WorkspacePhaseStopping:
		phase = wsmanapi.WorkspacePhase_STOPPING
	case workspacev1.WorkspacePhaseStopped:
		phase = wsmanapi.WorkspacePhase_STOPPED
	case workspacev1.WorkspacePhaseUnknown:
		phase = wsmanapi.WorkspacePhase_UNKNOWN
	}

	var firstUserActivity *timestamppb.Timestamp
	for _, c := range ws.Status.Conditions {
		if c.Type == string(workspacev1.WorkspaceConditionFirstUserActivity) {
			firstUserActivity = timestamppb.New(c.LastTransitionTime.Time)
		}
	}

	var runtime *wsmanapi.WorkspaceRuntimeInfo
	if rt := ws.Status.Runtime; rt != nil {
		runtime = &wsmanapi.WorkspaceRuntimeInfo{
			NodeName: rt.NodeName,
			NodeIp:   rt.HostIP,
			PodName:  rt.PodName,
		}
	}

	var admissionLevel wsmanapi.AdmissionLevel
	switch ws.Spec.Admission.Level {
	case workspacev1.AdmissionLevelEveryone:
		admissionLevel = wsmanapi.AdmissionLevel_ADMIT_EVERYONE
	case workspacev1.AdmissionLevelOwner:
		admissionLevel = wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY
	}

	ports := make([]*wsmanapi.PortSpec, 0, len(ws.Spec.Ports))
	for _, p := range ws.Spec.Ports {
		v := wsmanapi.PortVisibility_PORT_VISIBILITY_PRIVATE
		if p.Visibility == workspacev1.AdmissionLevelEveryone {
			v = wsmanapi.PortVisibility_PORT_VISIBILITY_PUBLIC
		}
		protocol := wsmanapi.PortProtocol_PORT_PROTOCOL_HTTP
		if p.Protocol == workspacev1.PortProtocolHttps {
			protocol = wsmanapi.PortProtocol_PORT_PROTOCOL_HTTPS
		}
		url, err := config.RenderWorkspacePortURL(wsm.Config.WorkspacePortURLTemplate, config.PortURLContext{
			Host:          wsm.Config.GitpodHostURL,
			ID:            ws.Name,
			IngressPort:   fmt.Sprint(p.Port),
			Prefix:        ws.Spec.Ownership.WorkspaceID,
			WorkspacePort: fmt.Sprint(p.Port),
		})
		if err != nil {
			log.WithError(err).WithField("port", p.Port).Error("cannot render public URL for port, excluding the port from the workspace status")
			continue
		}
		ports = append(ports, &wsmanapi.PortSpec{
			Port:       p.Port,
			Visibility: v,
			Url:        url,
			Protocol:   protocol,
		})
	}

	var metrics *wsmanapi.WorkspaceMetadata_Metrics
	if ws.Status.ImageInfo != nil {
		metrics = &wsmanapi.WorkspaceMetadata_Metrics{
			Image: &wsmanapi.WorkspaceMetadata_ImageInfo{
				TotalSize:          ws.Status.ImageInfo.TotalSize,
				WorkspaceImageSize: ws.Status.ImageInfo.WorkspaceImageSize,
			},
		}
	}

	var initializerMetrics *wsmanapi.InitializerMetrics
	if ws.Status.InitializerMetrics != nil {
		initializerMetrics = mapInitializerMetrics(ws.Status.InitializerMetrics)
	}

	res := &wsmanapi.WorkspaceStatus{
		Id:            ws.Name,
		StatusVersion: version,
		Metadata: &wsmanapi.WorkspaceMetadata{
			Owner:       ws.Spec.Ownership.Owner,
			MetaId:      ws.Spec.Ownership.WorkspaceID,
			StartedAt:   timestamppb.New(ws.CreationTimestamp.Time),
			Annotations: ws.Annotations,
			Metrics:     metrics,
		},
		Spec: &wsmanapi.WorkspaceSpec{
			Class:          ws.Spec.Class,
			ExposedPorts:   ports,
			WorkspaceImage: pointer.StringDeref(ws.Spec.Image.Workspace.Ref, ""),
			IdeImage: &wsmanapi.IDEImage{
				WebRef:        ws.Spec.Image.IDE.Web,
				SupervisorRef: ws.Spec.Image.IDE.Supervisor,
			},
			IdeImageLayers: ws.Spec.Image.IDE.Refs,
			Headless:       ws.IsHeadless(),
			Url:            ws.Status.URL,
			Type:           tpe,
			Timeout:        timeout,
			ClosedTimeout:  closedTimeout,
		},
		Phase: phase,
		Conditions: &wsmanapi.WorkspaceConditions{
			Failed:              getConditionMessageIfTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionFailed)),
			Timeout:             getConditionMessageIfTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionTimeout)),
			Snapshot:            ws.Status.Snapshot,
			Deployed:            convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionDeployed)),
			FirstUserActivity:   firstUserActivity,
			HeadlessTaskFailed:  getConditionMessageIfTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionsHeadlessTaskFailed)),
			StoppedByRequest:    convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionStoppedByRequest)),
			FinalBackupComplete: convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupComplete)),
			Aborted:             convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionAborted)),
		},
		Runtime: runtime,
		Auth: &wsmanapi.WorkspaceAuthentication{
			Admission:  admissionLevel,
			OwnerToken: ws.Status.OwnerToken,
		},
		Repo:               convertGitStatus(ws.Status.GitStatus),
		InitializerMetrics: initializerMetrics,
	}

	return res
}

func mapInitializerMetrics(in *workspacev1.InitializerMetrics) *wsmanapi.InitializerMetrics {
	result := &wsmanapi.InitializerMetrics{}
	// Convert Git metrics
	if in.Git != nil {
		result.Git = &wsmanapi.InitializerMetric{
			Duration: durationToProto(in.Git.Duration),
			Size:     uint64(in.Git.Size),
		}
	}

	// Convert FileDownload metrics
	if in.FileDownload != nil {
		result.FileDownload = &wsmanapi.InitializerMetric{
			Duration: durationToProto(in.FileDownload.Duration),
			Size:     uint64(in.FileDownload.Size),
		}
	}

	// Convert Snapshot metrics
	if in.Snapshot != nil {
		result.Snapshot = &wsmanapi.InitializerMetric{
			Duration: durationToProto(in.Snapshot.Duration),
			Size:     uint64(in.Snapshot.Size),
		}
	}

	// Convert Backup metrics
	if in.Backup != nil {
		result.Backup = &wsmanapi.InitializerMetric{
			Duration: durationToProto(in.Backup.Duration),
			Size:     uint64(in.Backup.Size),
		}
	}

	// Convert Prebuild metrics
	if in.Prebuild != nil {
		result.Prebuild = &wsmanapi.InitializerMetric{
			Duration: durationToProto(in.Prebuild.Duration),
			Size:     uint64(in.Prebuild.Size),
		}
	}

	// Convert Composite metrics
	if in.Composite != nil {
		result.Composite = &wsmanapi.InitializerMetric{
			Duration: durationToProto(in.Composite.Duration),
			Size:     uint64(in.Composite.Size),
		}
	}

	return result
}

func durationToProto(d *metav1.Duration) *durationpb.Duration {
	if d == nil {
		return nil
	}
	return durationpb.New(d.Duration)
}

func getConditionMessageIfTrue(conds []metav1.Condition, tpe string) string {
	for _, c := range conds {
		if c.Type == tpe && c.Status == metav1.ConditionTrue {
			return c.Message
		}
	}
	return ""
}

func convertGitStatus(gs *workspacev1.GitStatus) *csapi.GitStatus {
	if gs == nil {
		return nil
	}
	return &csapi.GitStatus{
		Branch:               gs.Branch,
		LatestCommit:         gs.LatestCommit,
		UncommitedFiles:      gs.UncommitedFiles,
		TotalUncommitedFiles: gs.TotalUncommitedFiles,
		UntrackedFiles:       gs.UntrackedFiles,
		TotalUntrackedFiles:  gs.TotalUntrackedFiles,
		UnpushedCommits:      gs.UnpushedCommits,
		TotalUnpushedCommits: gs.TotalUnpushedCommits,
	}
}
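
// convertCondition maps a Kubernetes condition to the wsmanapi condition
// boolean: a missing condition, or any status other than True, is reported as
// FALSE.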
func convertCondition(conds []metav1.Condition, tpe string) wsmanapi.WorkspaceConditionBool {
	res := wsk8s.GetCondition(conds, tpe)
	if res == nil {
		return wsmanapi.WorkspaceConditionBool_FALSE
	}

	switch res.Status {
	case metav1.ConditionTrue:
		return wsmanapi.WorkspaceConditionBool_TRUE
	default:
		return wsmanapi.WorkspaceConditionBool_FALSE
	}
}

func matchesMetadataAnnotations(ws *workspacev1.Workspace, filter *wsmanapi.MetadataFilter) bool {
	if filter == nil {
		return true
	}
	for k, v := range filter.Annotations {
		av, ok := ws.Annotations[k]
		if !ok || av != v {
			return false
		}
	}
	return true
}

func metadataFilterToLabelSelector(filter *wsmanapi.MetadataFilter) (labels.Selector, error) {
	if filter == nil {
		return nil, nil
	}

	res := labels.NewSelector()
	if filter.MetaId != "" {
		req, err := labels.NewRequirement(wsk8s.WorkspaceIDLabel, selection.Equals, []string{filter.MetaId})
		if err != nil {
			return nil, xerrors.Errorf("cannot create metaID filter: %w", err)
		}
		// Selector.Add returns a new selector; keep the result.
		res = res.Add(*req)
	}
	if filter.Owner != "" {
		req, err := labels.NewRequirement(wsk8s.OwnerLabel, selection.Equals, []string{filter.Owner})
		if err != nil {
			return nil, xerrors.Errorf("cannot create owner filter: %w", err)
		}
		res = res.Add(*req)
	}
	return res, nil
}

func parseTimeout(timeout string) (*metav1.Duration, error) {
	var duration *metav1.Duration
	if timeout != "" {
		d, err := time.ParseDuration(timeout)
		if err != nil {
			return nil, fmt.Errorf("invalid timeout: %v", err)
		}
		duration = &metav1.Duration{Duration: d}
	}

	return duration, nil
}

type filteringSubscriber struct {
	Sub    subscriber
	Filter *wsmanapi.MetadataFilter
}

func matchesMetadataFilter(filter *wsmanapi.MetadataFilter, md *wsmanapi.WorkspaceMetadata) bool {
	if filter == nil {
		return true
	}

	if filter.MetaId != "" && filter.MetaId != md.MetaId {
		return false
	}
	if filter.Owner != "" && filter.Owner != md.Owner {
		return false
	}
	for k, v := range filter.Annotations {
		av, ok := md.Annotations[k]
		if !ok || av != v {
			return false
		}
	}
	return true
}

func (f *filteringSubscriber) Send(resp *wsmanapi.SubscribeResponse) error {
	var md *wsmanapi.WorkspaceMetadata
	if sts := resp.GetStatus(); sts != nil {
		md = sts.Metadata
	}
	if md == nil {
		// no metadata, no forwarding
		return nil
	}
	if !matchesMetadataFilter(f.Filter, md) {
		return nil
	}

	return f.Sub.Send(resp)
}

type subscriber interface {
	Send(*wsmanapi.SubscribeResponse) error
}

type subscriptions struct {
	mu          sync.RWMutex
	subscribers map[string]chan *wsmanapi.SubscribeResponse
}
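
// Subscribe registers recv as a subscriber and forwards published updates to
// it until its context is canceled or it falls behind. Every subscriber gets a
// buffered channel of 250 updates; PublishToSubscribers drops subscribers
// whose buffer is full instead of blocking everyone else.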
func (subs *subscriptions) Subscribe(ctx context.Context, recv subscriber) (err error) {
	incoming := make(chan *wsmanapi.SubscribeResponse, 250)

	var key string
	peer, ok := peer.FromContext(ctx)
	if ok {
		key = fmt.Sprintf("k%s@%d", peer.Addr.String(), time.Now().UnixNano())
	}

	subs.mu.Lock()
	if key == "" {
		// if for some reason we didn't get peer information,
		// we must generate the key within the lock, otherwise we might end up with duplicate keys
		key = fmt.Sprintf("k%d@%d", len(subs.subscribers), time.Now().UnixNano())
	}
	subs.subscribers[key] = incoming
	log.WithField("subscriberKey", key).WithField("subscriberCount", len(subs.subscribers)).Info("new subscriber")
	subs.mu.Unlock()

	defer func() {
		subs.mu.Lock()
		delete(subs.subscribers, key)
		subs.mu.Unlock()
	}()

	for {
		var inc *wsmanapi.SubscribeResponse
		select {
		case <-ctx.Done():
			return ctx.Err()
		case inc = <-incoming:
		}

		if inc == nil {
			log.WithField("subscriberKey", key).Warn("subscription was canceled")
			return xerrors.Errorf("subscription was canceled")
		}

		err = recv.Send(inc)
		if err != nil {
			log.WithField("subscriberKey", key).WithError(err).Error("cannot send update - dropping subscriber")
			return err
		}
	}
}

func (subs *subscriptions) PublishToSubscribers(ctx context.Context, update *wsmanapi.SubscribeResponse) {
	subs.mu.RLock()
	var dropouts []string
	for k, sub := range subs.subscribers {
		select {
		case sub <- update:
			// all is well
		default:
			// Writing to the subscriber channel blocked, which means the subscriber isn't consuming fast enough and
			// would block others. We'll drop this consumer later (do not drop here to avoid concurrency issues).
			dropouts = append(dropouts, k)
		}
	}
	// we cannot defer this call as dropSubscriber will attempt to acquire a write lock
	subs.mu.RUnlock()

	// we check if there are any dropouts here to avoid the non-inlinable dropSubscriber call.
	if len(dropouts) > 0 {
		subs.DropSubscriber(dropouts)
	}
}
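
// DropSubscriber closes and removes the given subscriber channels. The closed
// channel makes the corresponding Subscribe loop drain the updates buffered so
// far and then return.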
func (subs *subscriptions) DropSubscriber(dropouts []string) {
	defer func() {
		err := recover()
		if err != nil {
			log.WithField("error", err).Error("caught panic in dropSubscriber")
		}
	}()

	subs.mu.Lock()
	defer subs.mu.Unlock()

	for _, k := range dropouts {
		sub, ok := subs.subscribers[k]
		if !ok {
			continue
		}

		log.WithField("subscriber", k).WithField("subscriberCount", len(subs.subscribers)).Warn("subscriber channel was full - dropping subscriber")
		// despite closing the subscriber channel, the subscriber's serve Go routine will still try to send
		// all prior updates up to this point. See https://play.golang.org/p/XR-9nLrQLQs
		close(sub)
		delete(subs.subscribers, k)
	}
}

// onChange is the default OnChange implementation which publishes workspace status updates to subscribers
func (subs *subscriptions) OnChange(ctx context.Context, status *wsmanapi.WorkspaceStatus) {
	log := log.WithFields(log.OWI(status.Metadata.Owner, status.Metadata.MetaId, status.Id))

	header := make(map[string]string)
	span := opentracing.SpanFromContext(ctx)
	if span != nil {
		tracingHeader := make(opentracing.HTTPHeadersCarrier)
		err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, tracingHeader)
		if err != nil {
			// if the error was caused by the span coming from the Noop tracer - ignore it.
			// This can happen if the workspace doesn't have a span associated with it, then we resort to creating Noop spans.
			if _, isNoopTracer := span.Tracer().(opentracing.NoopTracer); !isNoopTracer {
				log.WithError(err).Debug("unable to extract tracing information - trace will be broken")
			}
		} else {
			for k, v := range tracingHeader {
				if len(v) != 1 {
					continue
				}
				header[k] = v[0]
			}
		}
	}

	subs.PublishToSubscribers(ctx, &wsmanapi.SubscribeResponse{
		Status: status,
		Header: header,
	})

	// subs.metrics.OnChange(status)

	// There are some conditions we'd like to get notified about, for example while running experiments or because
	// they represent out-of-the-ordinary situations.
	// We attempt to use the GCP Error Reporting for this, hence log these situations as errors.
	if status.Conditions.Failed != "" {
		log.WithField("status", status).Error("workspace failed")
	}
	if status.Phase == 0 {
		log.WithField("status", status).Error("workspace in UNKNOWN phase")
	}
}

type workspaceMetrics struct {
	totalStartsCounterVec *prometheus.CounterVec
}

func newWorkspaceMetrics(namespace string, k8s client.Client) *workspaceMetrics {
	return &workspaceMetrics{
		totalStartsCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "gitpod",
			Subsystem: "ws_manager_mk2",
			Name:      "workspace_starts_total",
			Help:      "total number of workspaces started",
		}, []string{"type", "class"}),
	}
}

func (m *workspaceMetrics) recordWorkspaceStart(ws *workspacev1.Workspace) {
	tpe := string(ws.Spec.Type)
	class := ws.Spec.Class

	counter, err := m.totalStartsCounterVec.GetMetricWithLabelValues(tpe, class)
	if err != nil {
		// Guard against a nil counter: log the failure instead of panicking on Inc.
		log.WithError(err).WithField("type", tpe).WithField("class", class).Error("cannot record workspace start")
		return
	}
	counter.Inc()
}

// Describe implements Collector. It will send exactly one Desc to the provided channel.
func (m *workspaceMetrics) Describe(ch chan<- *prometheus.Desc) {
	m.totalStartsCounterVec.Describe(ch)
}

// Collect implements Collector.
func (m *workspaceMetrics) Collect(ch chan<- prometheus.Metric) {
	m.totalStartsCounterVec.Collect(ch)
}