GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-mk2/service/manager.go
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package service

import (
    "context"
    "crypto/sha256"
    "fmt"
    "sort"
    "strconv"
    "strings"
    "sync"
    "time"

    validation "github.com/go-ozzo/ozzo-validation"
    "github.com/opentracing/opentracing-go"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/sirupsen/logrus"
    "golang.org/x/xerrors"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/peer"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/durationpb"
    "google.golang.org/protobuf/types/known/timestamppb"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/selection"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/retry"
    "k8s.io/utils/pointer"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

    wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
    "github.com/gitpod-io/gitpod/common-go/log"
    "github.com/gitpod-io/gitpod/common-go/tracing"
    "github.com/gitpod-io/gitpod/common-go/util"
    csapi "github.com/gitpod-io/gitpod/content-service/api"
    "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/activity"
    "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/constants"
    "github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/maintenance"
    wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
    "github.com/gitpod-io/gitpod/ws-manager/api/config"
    workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
)

const (
    // stopWorkspaceNormallyGracePeriod is the grace period we use when stopping a pod with StopWorkspaceNormally policy
    stopWorkspaceNormallyGracePeriod = 30 * time.Second
    // stopWorkspaceImmediatelyGracePeriod is the grace period we use when stopping a pod as soon as possible
    stopWorkspaceImmediatelyGracePeriod = 1 * time.Second
)

var (
    // retryParams are custom backoff parameters used to modify a workspace.
    // These params retry more quickly than the default retry.DefaultBackoff.
    retryParams = wait.Backoff{
        Steps:    10,
        Duration: 10 * time.Millisecond,
        Factor:   2.0,
        Jitter:   0.2,
    }
)

// NewWorkspaceManagerServer creates a workspace manager gRPC server backed by the given
// Kubernetes client and registers its metrics with reg.
func NewWorkspaceManagerServer(clnt client.Client, cfg *config.Configuration, reg prometheus.Registerer, maintenance maintenance.Maintenance) *WorkspaceManagerServer {
    metrics := newWorkspaceMetrics(cfg.Namespace, clnt)
    reg.MustRegister(metrics)

    return &WorkspaceManagerServer{
        Client:      clnt,
        Config:      cfg,
        metrics:     metrics,
        maintenance: maintenance,
        subs: subscriptions{
            subscribers: make(map[string]chan *wsmanapi.SubscribeResponse),
        },
    }
}
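
// Illustrative wiring sketch (not part of the original file): exposing the server over gRPC.
// It assumes the generated wsmanapi.RegisterWorkspaceManagerServer helper, a
// controller-runtime manager providing the Kubernetes client, and a maintenance.Maintenance
// implementation called maint.
//
//     srv := grpc.NewServer()
//     wsman := NewWorkspaceManagerServer(mgr.GetClient(), cfg, prometheus.DefaultRegisterer, maint)
//     wsmanapi.RegisterWorkspaceManagerServer(srv, wsman)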

// WorkspaceManagerServer implements the ws-manager gRPC service on top of Workspace custom resources.
type WorkspaceManagerServer struct {
    Client      client.Client
    Config      *config.Configuration
    metrics     *workspaceMetrics
    maintenance maintenance.Maintenance

    subs subscriptions
    wsmanapi.UnimplementedWorkspaceManagerServer
}

// OnWorkspaceReconcile is called by the controller whenever it reconciles a workspace.
// This function then publishes to subscribers.
func (wsm *WorkspaceManagerServer) OnWorkspaceReconcile(ctx context.Context, ws *workspacev1.Workspace) {
    wsm.subs.PublishToSubscribers(ctx, &wsmanapi.SubscribeResponse{
        Status: wsm.extractWorkspaceStatus(ws),
    })
}
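
// StartWorkspace creates a Workspace custom resource for the requested workspace: it
// validates the request, maps the gRPC spec onto the Workspace CRD (type, class, ports,
// timeouts, environment variables, initializer), creates the env and token secrets,
// creates the Workspace object, and then polls until the workspace reports a URL and
// owner token, which are returned to the caller.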
func (wsm *WorkspaceManagerServer) StartWorkspace(ctx context.Context, req *wsmanapi.StartWorkspaceRequest) (resp *wsmanapi.StartWorkspaceResponse, err error) {
    owi := log.OWI(req.Metadata.Owner, req.Metadata.MetaId, req.Id)
    span, ctx := tracing.FromContext(ctx, "StartWorkspace")
    tracing.LogRequestSafe(span, req)
    tracing.ApplyOWI(span, owi)
    defer tracing.FinishSpan(span, &err)

    if wsm.maintenance.IsEnabled(ctx) {
        return &wsmanapi.StartWorkspaceResponse{}, status.Error(codes.FailedPrecondition, "under maintenance")
    }

    if err := validateStartWorkspaceRequest(req); err != nil {
        return nil, err
    }

    var workspaceType workspacev1.WorkspaceType
    switch req.Type {
    case wsmanapi.WorkspaceType_IMAGEBUILD:
        workspaceType = workspacev1.WorkspaceTypeImageBuild
    case wsmanapi.WorkspaceType_PREBUILD:
        workspaceType = workspacev1.WorkspaceTypePrebuild
    case wsmanapi.WorkspaceType_REGULAR:
        workspaceType = workspacev1.WorkspaceTypeRegular
    default:
        return nil, status.Errorf(codes.InvalidArgument, "unsupported workspace type: %v", req.Type)
    }

    var git *workspacev1.GitSpec
    if req.Spec.Git != nil {
        git = &workspacev1.GitSpec{
            Username: req.Spec.Git.Username,
            Email:    req.Spec.Git.Email,
        }
    }

    timeout, err := parseTimeout(req.Spec.Timeout)
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, err.Error())
    }

    closedTimeout, err := parseTimeout(req.Spec.ClosedTimeout)
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, err.Error())
    }

    maximumLifetime, err := parseTimeout(req.Spec.MaximumLifetime)
    if err != nil {
        return nil, status.Error(codes.InvalidArgument, err.Error())
    }

    var admissionLevel workspacev1.AdmissionLevel
    switch req.Spec.Admission {
    case wsmanapi.AdmissionLevel_ADMIT_EVERYONE:
        admissionLevel = workspacev1.AdmissionLevelEveryone
    case wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY:
        admissionLevel = workspacev1.AdmissionLevelOwner
    default:
        return nil, status.Errorf(codes.InvalidArgument, "unsupported admission level: %v", req.Spec.Admission)
    }

    ports := make([]workspacev1.PortSpec, 0, len(req.Spec.Ports))
    for _, p := range req.Spec.Ports {
        v := workspacev1.AdmissionLevelOwner
        if p.Visibility == wsmanapi.PortVisibility_PORT_VISIBILITY_PUBLIC {
            v = workspacev1.AdmissionLevelEveryone
        }
        protocol := workspacev1.PortProtocolHttp
        if p.Protocol == wsmanapi.PortProtocol_PORT_PROTOCOL_HTTPS {
            protocol = workspacev1.PortProtocolHttps
        }
        ports = append(ports, workspacev1.PortSpec{
            Port:       p.Port,
            Visibility: v,
            Protocol:   protocol,
        })
    }

    var classID string
    _, ok := wsm.Config.WorkspaceClasses[req.Spec.Class]
    if !ok {
        classID = config.DefaultWorkspaceClass
    } else {
        classID = req.Spec.Class
    }

    class, ok := wsm.Config.WorkspaceClasses[classID]
    if !ok {
        return nil, status.Errorf(codes.InvalidArgument, "workspace class \"%s\" is unknown", req.Spec.Class)
    }

    storage, err := class.Container.Limits.StorageQuantity()
    if err != nil {
        msg := fmt.Sprintf("workspace class %s has invalid storage quantity: %v", class.Name, err)
        return nil, status.Errorf(codes.InvalidArgument, "%s", msg)
    }

    annotations := make(map[string]string)
    for k, v := range req.Metadata.Annotations {
        annotations[k] = v
    }

    limits := class.Container.Limits
    if limits != nil && limits.CPU != nil {
        if limits.CPU.MinLimit != "" {
            annotations[wsk8s.WorkspaceCpuMinLimitAnnotation] = limits.CPU.MinLimit
        }

        if limits.CPU.BurstLimit != "" {
            annotations[wsk8s.WorkspaceCpuBurstLimitAnnotation] = limits.CPU.BurstLimit
        }
    }

    var sshGatewayCAPublicKey string
    for _, feature := range req.Spec.FeatureFlags {
        switch feature {
        case wsmanapi.WorkspaceFeatureFlag_WORKSPACE_CONNECTION_LIMITING:
            annotations[wsk8s.WorkspaceNetConnLimitAnnotation] = util.BooleanTrueString
        case wsmanapi.WorkspaceFeatureFlag_WORKSPACE_PSI:
            annotations[wsk8s.WorkspacePressureStallInfoAnnotation] = util.BooleanTrueString
        case wsmanapi.WorkspaceFeatureFlag_SSH_CA:
            sshGatewayCAPublicKey = wsm.Config.SSHGatewayCAPublicKey
        }
    }

    envSecretName := fmt.Sprintf("%s-%s", req.Id, "env")
    userEnvVars, envData := extractWorkspaceUserEnv(envSecretName, req.Spec.Envvars, req.Spec.SysEnvvars)
    sysEnvVars := extractWorkspaceSysEnv(req.Spec.SysEnvvars)

    tokenData := extractWorkspaceTokenData(req.Spec)
    initializer, err := proto.Marshal(req.Spec.Initializer)
    if err != nil {
        return nil, status.Errorf(codes.InvalidArgument, "cannot serialise content initializer: %v", err)
    }

    ws := workspacev1.Workspace{
        TypeMeta: metav1.TypeMeta{
            APIVersion: workspacev1.GroupVersion.String(),
            Kind:       "Workspace",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name:        req.Id,
            Annotations: annotations,
            Namespace:   wsm.Config.Namespace,
            Labels: map[string]string{
                wsk8s.WorkspaceIDLabel:        req.Metadata.MetaId,
                wsk8s.OwnerLabel:              req.Metadata.Owner,
                wsk8s.WorkspaceManagedByLabel: constants.ManagedBy,
            },
        },
        Spec: workspacev1.WorkspaceSpec{
            Ownership: workspacev1.Ownership{
                Owner:       req.Metadata.Owner,
                WorkspaceID: req.Metadata.MetaId,
            },
            Type:  workspaceType,
            Class: classID,
            Image: workspacev1.WorkspaceImages{
                Workspace: workspacev1.WorkspaceImage{
                    Ref: pointer.String(req.Spec.WorkspaceImage),
                },
                IDE: workspacev1.IDEImages{
                    Web:        req.Spec.IdeImage.WebRef,
                    Refs:       req.Spec.IdeImageLayers,
                    Supervisor: req.Spec.IdeImage.SupervisorRef,
                },
            },
            Initializer:       initializer,
            UserEnvVars:       userEnvVars,
            SysEnvVars:        sysEnvVars,
            WorkspaceLocation: req.Spec.WorkspaceLocation,
            Git:               git,
            Timeout: workspacev1.TimeoutSpec{
                Time:            timeout,
                ClosedTimeout:   closedTimeout,
                MaximumLifetime: maximumLifetime,
            },
            Admission: workspacev1.AdmissionSpec{
                Level: admissionLevel,
            },
            Ports:                 ports,
            SshPublicKeys:         req.Spec.SshPublicKeys,
            StorageQuota:          int(storage.Value()),
            SSHGatewayCAPublicKey: sshGatewayCAPublicKey,
        },
    }
    controllerutil.AddFinalizer(&ws, workspacev1.GitpodFinalizerName)

    exists, err := wsm.workspaceExists(ctx, req.Metadata.MetaId)
    if err != nil {
        return nil, fmt.Errorf("cannot check if workspace %s exists: %w", req.Metadata.MetaId, err)
    }

    if exists {
        return nil, status.Errorf(codes.AlreadyExists, "workspace %s already exists", req.Metadata.MetaId)
    }

    err = wsm.createWorkspaceSecret(ctx, &ws, envSecretName, wsm.Config.Namespace, envData)
    if err != nil {
        return nil, fmt.Errorf("cannot create env secret for workspace %s: %w", req.Id, err)
    }

    err = wsm.createWorkspaceSecret(ctx, &ws, fmt.Sprintf("%s-%s", req.Id, "tokens"), wsm.Config.SecretsNamespace, tokenData)
    if err != nil {
        return nil, fmt.Errorf("cannot create token secret for workspace %s: %w", req.Id, err)
    }

    wsm.metrics.recordWorkspaceStart(&ws)
    err = wsm.Client.Create(ctx, &ws)
    if err != nil {
        log.WithError(err).WithFields(owi).Error("error creating workspace")
        return nil, status.Errorf(codes.FailedPrecondition, "cannot create workspace")
    }

    var wsr workspacev1.Workspace
    err = wait.PollWithContext(ctx, 100*time.Millisecond, 30*time.Second, func(c context.Context) (done bool, err error) {
        err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: ws.Name}, &wsr)
        if err != nil {
            return false, nil
        }

        if wsr.Status.OwnerToken != "" && wsr.Status.URL != "" {
            return true, nil
        }

        return false, nil
    })
    if err != nil {
        return nil, status.Errorf(codes.FailedPrecondition, "cannot wait for workspace URL: %q", err)
    }

    return &wsmanapi.StartWorkspaceResponse{
        Url:        wsr.Status.URL,
        OwnerToken: wsr.Status.OwnerToken,
    }, nil
}
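
// For illustration only (not part of the original file): a minimal StartWorkspaceRequest
// a caller might assemble. The values are made up, and the initializer type name is an
// assumption based on the content-service API.
//
//     req := &wsmanapi.StartWorkspaceRequest{
//         Id:            "ws-12345",
//         ServicePrefix: "ws-12345",
//         Type:          wsmanapi.WorkspaceType_REGULAR,
//         Metadata: &wsmanapi.WorkspaceMetadata{
//             Owner:  "user-id",
//             MetaId: "meta-id",
//         },
//         Spec: &wsmanapi.StartWorkspaceSpec{
//             WorkspaceImage:    "registry.example.com/workspace:tag",
//             WorkspaceLocation: "/workspace/project",
//             Initializer:       &csapi.WorkspaceInitializer{},
//             Admission:         wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY,
//         },
//     }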

// workspaceExists returns true if a workspace with the given (meta) ID exists which is not stopped.
func (wsm *WorkspaceManagerServer) workspaceExists(ctx context.Context, id string) (bool, error) {
    var workspaces workspacev1.WorkspaceList
    err := wsm.Client.List(ctx, &workspaces, client.MatchingLabels{wsk8s.WorkspaceIDLabel: id})
    if err != nil {
        return false, err
    }

    for _, ws := range workspaces.Items {
        if ws.Status.Phase != workspacev1.WorkspacePhaseStopped {
            return true, nil
        }
    }

    return false, nil
}

// isProtectedEnvVar reports whether a user-supplied environment variable must be stored in
// the workspace's env secret rather than inline on the workspace spec.
func isProtectedEnvVar(name string, sysEnvvars []*wsmanapi.EnvironmentVariable) bool {
    switch name {
    case "THEIA_SUPERVISOR_TOKENS":
        return true
    default:
        if isGitpodInternalEnvVar(name) {
            return false
        }
        for _, env := range sysEnvvars {
            if env.Name == name {
                return false
            }
        }
        return true
    }
}

// isGitpodInternalEnvVar reports whether the variable name belongs to Gitpod's own tooling
// and is therefore not treated as a protected user value.
func isGitpodInternalEnvVar(name string) bool {
    return strings.HasPrefix(name, "GITPOD_") ||
        strings.HasPrefix(name, "SUPERVISOR_") ||
        strings.HasPrefix(name, "BOB_") ||
        strings.HasPrefix(name, "THEIA_") ||
        name == "NODE_EXTRA_CA_CERTS" ||
        name == "VSX_REGISTRY_URL"
}
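
// Behaviour sketch (illustrative, derived from the two helpers above):
//
//     isProtectedEnvVar("THEIA_SUPERVISOR_TOKENS", nil) // true  - always secret-backed
//     isProtectedEnvVar("GITPOD_TASKS", nil)            // false - Gitpod-internal prefix
//     isProtectedEnvVar("MY_API_TOKEN", nil)            // true  - arbitrary user value, protected
//     isProtectedEnvVar("FOO", []*wsmanapi.EnvironmentVariable{{Name: "FOO"}}) // false - declared sys env var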

// createWorkspaceSecret creates a secret with the given data; an already existing secret is treated as success.
func (wsm *WorkspaceManagerServer) createWorkspaceSecret(ctx context.Context, owner client.Object, name, namespace string, data map[string]string) error {
    secret := corev1.Secret{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: namespace,
        },
        StringData: data,
    }

    err := wsm.Client.Create(ctx, &secret)
    if err != nil && !errors.IsAlreadyExists(err) {
        return err
    }

    return nil
}
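
// StopWorkspace marks a workspace as stopped-by-request so the controller tears it down.
// The grace period depends on the policy: NORMALLY uses stopWorkspaceNormallyGracePeriod,
// IMMEDIATELY and ABORT use stopWorkspaceImmediatelyGracePeriod, and ABORT additionally
// sets the Aborted condition.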
func (wsm *WorkspaceManagerServer) StopWorkspace(ctx context.Context, req *wsmanapi.StopWorkspaceRequest) (res *wsmanapi.StopWorkspaceResponse, err error) {
    owi := log.OWI("", "", req.Id)
    span, ctx := tracing.FromContext(ctx, "StopWorkspace")
    tracing.LogRequestSafe(span, req)
    tracing.ApplyOWI(span, owi)
    defer tracing.FinishSpan(span, &err)

    if wsm.maintenance.IsEnabled(ctx) {
        return &wsmanapi.StopWorkspaceResponse{}, status.Error(codes.FailedPrecondition, "under maintenance")
    }

    gracePeriod := stopWorkspaceNormallyGracePeriod
    if req.Policy == wsmanapi.StopWorkspacePolicy_IMMEDIATELY {
        span.LogKV("policy", "immediately")
        gracePeriod = stopWorkspaceImmediatelyGracePeriod
    } else if req.Policy == wsmanapi.StopWorkspacePolicy_ABORT {
        span.LogKV("policy", "abort")
        gracePeriod = stopWorkspaceImmediatelyGracePeriod
        if err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
            ws.Status.SetCondition(workspacev1.NewWorkspaceConditionAborted("StopWorkspaceRequest"))
            return nil
        }); err != nil {
            log.WithError(err).WithFields(owi).Error("failed to add Aborted condition to workspace")
        }
    }
    err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
        ws.Status.SetCondition(workspacev1.NewWorkspaceConditionStoppedByRequest(gracePeriod.String()))
        return nil
    })
    // Ignore NotFound errors, workspace has already been stopped.
    if err != nil && status.Code(err) != codes.NotFound {
        return nil, err
    }
    return &wsmanapi.StopWorkspaceResponse{}, nil
}

// GetWorkspaces lists all workspaces matching the given metadata filter.
func (wsm *WorkspaceManagerServer) GetWorkspaces(ctx context.Context, req *wsmanapi.GetWorkspacesRequest) (*wsmanapi.GetWorkspacesResponse, error) {
    labelSelector, err := metadataFilterToLabelSelector(req.MustMatch)
    if err != nil {
        return nil, status.Errorf(codes.FailedPrecondition, "cannot convert metadata filter: %v", err)
    }

    var workspaces workspacev1.WorkspaceList
    err = wsm.Client.List(ctx, &workspaces, &client.ListOptions{
        LabelSelector: labelSelector,
    })
    if err != nil {
        return nil, status.Errorf(codes.FailedPrecondition, "cannot list workspaces: %v", err)
    }

    res := make([]*wsmanapi.WorkspaceStatus, 0, len(workspaces.Items))
    for _, ws := range workspaces.Items {
        if !matchesMetadataAnnotations(&ws, req.MustMatch) {
            continue
        }

        res = append(res, wsm.extractWorkspaceStatus(&ws))
    }

    return &wsmanapi.GetWorkspacesResponse{Status: res}, nil
}

// DescribeWorkspace returns the status of a single workspace, including its last activity time.
func (wsm *WorkspaceManagerServer) DescribeWorkspace(ctx context.Context, req *wsmanapi.DescribeWorkspaceRequest) (*wsmanapi.DescribeWorkspaceResponse, error) {
    var ws workspacev1.Workspace
    err := wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
    if errors.IsNotFound(err) {
        return nil, status.Errorf(codes.NotFound, "workspace %s not found", req.Id)
    }
    if err != nil {
        return nil, status.Errorf(codes.Internal, "cannot lookup workspace: %v", err)
    }

    result := &wsmanapi.DescribeWorkspaceResponse{
        Status: wsm.extractWorkspaceStatus(&ws),
    }

    lastActivity := activity.Last(&ws)
    if lastActivity != nil {
        result.LastActivity = lastActivity.UTC().Format(time.RFC3339Nano)
    }
    return result, nil
}

// Subscribe streams all status updates to a client
func (m *WorkspaceManagerServer) Subscribe(req *wsmanapi.SubscribeRequest, srv wsmanapi.WorkspaceManager_SubscribeServer) (err error) {
    var sub subscriber = srv
    if req.MustMatch != nil {
        sub = &filteringSubscriber{srv, req.MustMatch}
    }

    return m.subs.Subscribe(srv.Context(), sub)
}

// MarkActive records a workspace as being active which prevents it from timing out
func (wsm *WorkspaceManagerServer) MarkActive(ctx context.Context, req *wsmanapi.MarkActiveRequest) (res *wsmanapi.MarkActiveResponse, err error) {
    span, ctx := tracing.FromContext(ctx, "MarkActive")
    tracing.ApplyOWI(span, log.OWI("", "", req.Id))
    defer tracing.FinishSpan(span, &err)

    workspaceID := req.Id

    var ws workspacev1.Workspace
    err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
    if errors.IsNotFound(err) {
        return nil, status.Errorf(codes.NotFound, "workspace %s does not exist", req.Id)
    }
    if err != nil {
        return nil, status.Errorf(codes.Internal, "cannot mark workspace: %v", err)
    }

    var firstUserActivity *timestamppb.Timestamp
    if c := wsk8s.GetCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionFirstUserActivity)); c != nil {
        firstUserActivity = timestamppb.New(c.LastTransitionTime.Time)
    }

    // if the user already marked the workspace as active and this request has the IgnoreIfActive flag, simply ignore it
    if firstUserActivity != nil && req.IgnoreIfActive {
        return &wsmanapi.MarkActiveResponse{}, nil
    }

    now := time.Now().UTC()
    lastActivityStatus := metav1.NewTime(now)
    ws.Status.LastActivity = &lastActivityStatus

    err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
        ws.Status.LastActivity = &lastActivityStatus
        return nil
    })
    if err != nil {
        log.WithError(err).WithFields(log.OWI("", "", workspaceID)).Warn("was unable to update status")
    }

    // We do however maintain the "closed" flag as condition on the workspace. This flag should not change
    // very often and provides a better UX if it persists across ws-manager restarts.
    isMarkedClosed := ws.IsConditionTrue(workspacev1.WorkspaceConditionClosed)
    if req.Closed && !isMarkedClosed {
        err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
            ws.Status.SetCondition(workspacev1.NewWorkspaceConditionClosed(metav1.ConditionTrue, "MarkActiveRequest"))
            return nil
        })
    } else if !req.Closed && isMarkedClosed {
        err = wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
            ws.Status.SetCondition(workspacev1.NewWorkspaceConditionClosed(metav1.ConditionFalse, "MarkActiveRequest"))
            return nil
        })
    }
    if err != nil {
        logFields := logrus.Fields{
            "closed":         req.Closed,
            "isMarkedClosed": isMarkedClosed,
        }
        log.WithError(err).WithFields(log.OWI("", "", workspaceID)).WithFields(logFields).Warn("was unable to mark workspace properly")
    }

    // If it's the first call: Mark the pod with FirstUserActivity condition.
    if firstUserActivity == nil {
        err := wsm.modifyWorkspace(ctx, req.Id, true, func(ws *workspacev1.Workspace) error {
            ws.Status.SetCondition(workspacev1.NewWorkspaceConditionFirstUserActivity("MarkActiveRequest"))
            return nil
        })
        if err != nil {
            log.WithError(err).WithFields(log.OWI("", "", workspaceID)).Warn("was unable to set FirstUserActivity condition on workspace")
            return nil, err
        }
    }

    return &wsmanapi.MarkActiveResponse{}, nil
}

func (wsm *WorkspaceManagerServer) SetTimeout(ctx context.Context, req *wsmanapi.SetTimeoutRequest) (*wsmanapi.SetTimeoutResponse, error) {
    duration, err := time.ParseDuration(req.Duration)
    if err != nil {
        return nil, status.Errorf(codes.InvalidArgument, "invalid duration: %v", err)
    }

    if req.Type == wsmanapi.TimeoutType_WORKSPACE_TIMEOUT {
        err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
            ws.Spec.Timeout.Time = &metav1.Duration{Duration: duration}
            ws.Spec.Timeout.ClosedTimeout = &metav1.Duration{Duration: time.Duration(0)}
            return nil
        })
    } else if req.Type == wsmanapi.TimeoutType_CLOSED_TIMEOUT {
        err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
            ws.Spec.Timeout.ClosedTimeout = &metav1.Duration{Duration: duration}
            return nil
        })
    }
    if err != nil {
        return nil, err
    }

    return &wsmanapi.SetTimeoutResponse{}, nil
}
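
// Note: req.Duration is parsed with time.ParseDuration, so callers pass Go duration strings
// such as "30m" or "2h45m". Illustrative call (not part of the original file):
//
//     _, err := wsman.SetTimeout(ctx, &wsmanapi.SetTimeoutRequest{
//         Id:       "ws-12345",
//         Duration: "90m",
//         Type:     wsmanapi.TimeoutType_WORKSPACE_TIMEOUT,
//     })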

// ControlPort exposes or un-exposes a workspace port by rewriting the workspace's port spec.
func (wsm *WorkspaceManagerServer) ControlPort(ctx context.Context, req *wsmanapi.ControlPortRequest) (res *wsmanapi.ControlPortResponse, err error) {
    span, ctx := tracing.FromContext(ctx, "ControlPort")
    tracing.ApplyOWI(span, log.OWI("", "", req.Id))
    defer tracing.FinishSpan(span, &err)

    if req.Spec == nil {
        return nil, status.Errorf(codes.InvalidArgument, "missing spec")
    }

    port := req.Spec.Port
    err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
        n := 0
        for _, x := range ws.Spec.Ports {
            if x.Port != port {
                ws.Spec.Ports[n] = x
                n++
            }
        }
        ws.Spec.Ports = ws.Spec.Ports[:n]

        if req.Expose {
            visibility := workspacev1.AdmissionLevelOwner
            protocol := workspacev1.PortProtocolHttp
            if req.Spec.Visibility == wsmanapi.PortVisibility_PORT_VISIBILITY_PUBLIC {
                visibility = workspacev1.AdmissionLevelEveryone
            }
            if req.Spec.Protocol == wsmanapi.PortProtocol_PORT_PROTOCOL_HTTPS {
                protocol = workspacev1.PortProtocolHttps
            }
            ws.Spec.Ports = append(ws.Spec.Ports, workspacev1.PortSpec{
                Port:       port,
                Visibility: visibility,
                Protocol:   protocol,
            })
        }

        return nil
    })
    if err != nil {
        return nil, err
    }
    return &wsmanapi.ControlPortResponse{}, nil
}
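
// TakeSnapshot creates a Snapshot custom resource for a running workspace, owned by the
// workspace object, and waits until the snapshot controller reports a URL. Unless
// ReturnImmediately is set, it also waits for the snapshot to complete.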
func (wsm *WorkspaceManagerServer) TakeSnapshot(ctx context.Context, req *wsmanapi.TakeSnapshotRequest) (res *wsmanapi.TakeSnapshotResponse, err error) {
    span, ctx := tracing.FromContext(ctx, "TakeSnapshot")
    tracing.ApplyOWI(span, log.OWI("", "", req.Id))
    defer tracing.FinishSpan(span, &err)

    if wsm.maintenance.IsEnabled(ctx) {
        return &wsmanapi.TakeSnapshotResponse{}, status.Error(codes.FailedPrecondition, "under maintenance")
    }

    var ws workspacev1.Workspace
    err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: req.Id}, &ws)
    if errors.IsNotFound(err) {
        return nil, status.Errorf(codes.NotFound, "workspace %s not found", req.Id)
    }
    if err != nil {
        return nil, status.Errorf(codes.Internal, "cannot lookup workspace: %v", err)
    }

    if ws.Status.Phase != workspacev1.WorkspacePhaseRunning {
        return nil, status.Errorf(codes.FailedPrecondition, "snapshots can only be taken of running workspaces, not %s workspaces", ws.Status.Phase)
    }

    snapshot := workspacev1.Snapshot{
        TypeMeta: metav1.TypeMeta{
            APIVersion: workspacev1.GroupVersion.String(),
            Kind:       "Snapshot",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%s-%d", req.Id, time.Now().UnixNano()),
            Namespace: wsm.Config.Namespace,
        },
        Spec: workspacev1.SnapshotSpec{
            NodeName:    ws.Status.Runtime.NodeName,
            WorkspaceID: ws.Name,
        },
    }

    err = controllerutil.SetOwnerReference(&ws, &snapshot, wsm.Client.Scheme())
    if err != nil {
        return nil, status.Errorf(codes.Internal, "cannot set owner for snapshot: %q", err)
    }

    err = wsm.Client.Create(ctx, &snapshot)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "cannot create snapshot object: %q", err)
    }

    var sso workspacev1.Snapshot
    err = wait.PollWithContext(ctx, 100*time.Millisecond, 10*time.Second, func(c context.Context) (done bool, err error) {
        err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: snapshot.Name}, &sso)
        if err != nil {
            return false, err
        }

        if sso.Status.Error != "" {
            return true, fmt.Errorf("%s", sso.Status.Error)
        }

        if sso.Status.URL != "" {
            return true, nil
        }

        return false, nil
    })

    if err != nil {
        return nil, status.Errorf(codes.Internal, "cannot wait for snapshot URL: %v", err)
    }

    if !req.ReturnImmediately {
        err = wait.PollWithContext(ctx, 100*time.Millisecond, 0, func(c context.Context) (done bool, err error) {
            err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: ws.Name}, &sso)
            if err != nil {
                return false, nil
            }

            if sso.Status.Completed {
                return true, nil
            }

            return false, nil
        })

        if err != nil {
            return nil, status.Errorf(codes.Internal, "cannot wait for snapshot: %q", err)
        }

        if sso.Status.Error != "" {
            return nil, status.Errorf(codes.Internal, "cannot take snapshot: %q", sso.Status.Error)
        }
    }

    return &wsmanapi.TakeSnapshotResponse{
        Url: sso.Status.URL,
    }, nil
}

// ControlAdmission sets who may access a workspace: its owner only, or everyone.
func (wsm *WorkspaceManagerServer) ControlAdmission(ctx context.Context, req *wsmanapi.ControlAdmissionRequest) (*wsmanapi.ControlAdmissionResponse, error) {
    err := wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
        switch req.Level {
        case wsmanapi.AdmissionLevel_ADMIT_EVERYONE:
            ws.Spec.Admission.Level = workspacev1.AdmissionLevelEveryone
        case wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY:
            ws.Spec.Admission.Level = workspacev1.AdmissionLevelOwner
        default:
            return status.Errorf(codes.InvalidArgument, "unsupported admission level: %v", req.Level)
        }
        return nil
    })
    if err != nil {
        return nil, err
    }
    return &wsmanapi.ControlAdmissionResponse{}, nil
}

// UpdateSSHKey replaces the SSH public keys of a workspace.
func (wsm *WorkspaceManagerServer) UpdateSSHKey(ctx context.Context, req *wsmanapi.UpdateSSHKeyRequest) (res *wsmanapi.UpdateSSHKeyResponse, err error) {
    span, ctx := tracing.FromContext(ctx, "UpdateSSHKey")
    tracing.ApplyOWI(span, log.OWI("", "", req.Id))
    defer tracing.FinishSpan(span, &err)

    if err = validateUpdateSSHKeyRequest(req); err != nil {
        return &wsmanapi.UpdateSSHKeyResponse{}, err
    }

    err = wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
        ws.Spec.SshPublicKeys = req.Keys
        return nil
    })

    return &wsmanapi.UpdateSSHKeyResponse{}, err
}

// DescribeCluster reports the workspace classes offered by this cluster.
func (wsm *WorkspaceManagerServer) DescribeCluster(ctx context.Context, req *wsmanapi.DescribeClusterRequest) (res *wsmanapi.DescribeClusterResponse, err error) {
    //nolint:ineffassign
    span, ctx := tracing.FromContext(ctx, "DescribeCluster")
    defer tracing.FinishSpan(span, &err)

    classes := make([]*wsmanapi.WorkspaceClass, 0, len(wsm.Config.WorkspaceClasses))
    for id, class := range wsm.Config.WorkspaceClasses {
        var cpu, ram, disk resource.Quantity
        desc := class.Description
        if desc == "" {
            if class.Container.Limits != nil {
                cpu, _ = resource.ParseQuantity(class.Container.Limits.CPU.BurstLimit)
                ram, _ = resource.ParseQuantity(class.Container.Limits.Memory)
                disk, _ = resource.ParseQuantity(class.Container.Limits.Storage)
            }
            if cpu.Value() == 0 && class.Container.Requests != nil {
                cpu, _ = resource.ParseQuantity(class.Container.Requests.CPU)
            }
            if ram.Value() == 0 && class.Container.Requests != nil {
                ram, _ = resource.ParseQuantity(class.Container.Requests.Memory)
            }
            desc = fmt.Sprintf("%d vCPU, %dGB memory, %dGB disk", cpu.Value(), ram.ScaledValue(resource.Giga), disk.ScaledValue(resource.Giga))
        }
        classes = append(classes, &wsmanapi.WorkspaceClass{
            Id:               id,
            DisplayName:      class.Name,
            Description:      desc,
            CreditsPerMinute: class.CreditsPerMinute,
        })
    }
    sort.Slice(classes, func(i, j int) bool {
        return classes[i].Id < classes[j].Id
    })

    return &wsmanapi.DescribeClusterResponse{
        WorkspaceClasses:        classes,
        PreferredWorkspaceClass: wsm.Config.PreferredWorkspaceClass,
    }, nil
}

// modifyWorkspace modifies a workspace object using the mod function. If the mod function returns a gRPC status error, that error
// is returned directly. If mod returns a non-gRPC error it is turned into one.
func (wsm *WorkspaceManagerServer) modifyWorkspace(ctx context.Context, id string, updateStatus bool, mod func(ws *workspacev1.Workspace) error) (err error) {
    span, ctx := tracing.FromContext(ctx, "modifyWorkspace")
    tracing.ApplyOWI(span, log.OWI("", "", id))
    defer tracing.FinishSpan(span, &err)

    err = retry.RetryOnConflict(retryParams, func() (err error) {
        span, ctx := tracing.FromContext(ctx, "modifyWorkspaceRetryFn")
        defer tracing.FinishSpan(span, &err)

        var ws workspacev1.Workspace
        err = wsm.Client.Get(ctx, types.NamespacedName{Namespace: wsm.Config.Namespace, Name: id}, &ws)
        if err != nil {
            return err
        }

        err = mod(&ws)
        if err != nil {
            return err
        }

        if updateStatus {
            err = wsm.Client.Status().Update(ctx, &ws)
        } else {
            err = wsm.Client.Update(ctx, &ws)
        }
        return err
    })
    if errors.IsNotFound(err) {
        return status.Errorf(codes.NotFound, "workspace %s not found", id)
    }
    if c := status.Code(err); c != codes.Unknown && c != codes.OK {
        return err
    }
    if err != nil {
        return status.Errorf(codes.Internal, "cannot modify workspace: %v", err)
    }
    return nil
}
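
// Typical usage (sketch, mirroring the calls elsewhere in this file): apply a small mutation
// with conflict retries, targeting either the spec or, with updateStatus set, the status
// subresource.
//
//     err := wsm.modifyWorkspace(ctx, req.Id, false, func(ws *workspacev1.Workspace) error {
//         ws.Spec.SshPublicKeys = req.Keys
//         return nil
//     })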

// validateStartWorkspaceRequest ensures that acting on this request will not leave the system in an invalid state
func validateStartWorkspaceRequest(req *wsmanapi.StartWorkspaceRequest) error {
    err := validation.ValidateStruct(req.Spec,
        validation.Field(&req.Spec.WorkspaceImage, validation.Required),
        validation.Field(&req.Spec.WorkspaceLocation, validation.Required),
        validation.Field(&req.Spec.Ports, validation.By(areValidPorts)),
        validation.Field(&req.Spec.Initializer, validation.Required),
        validation.Field(&req.Spec.FeatureFlags, validation.By(areValidFeatureFlags)),
    )
    if err != nil {
        return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
    }

    rules := make([]*validation.FieldRules, 0)
    rules = append(rules, validation.Field(&req.Id, validation.Required))
    rules = append(rules, validation.Field(&req.Spec, validation.Required))
    rules = append(rules, validation.Field(&req.Type, validation.By(isValidWorkspaceType)))
    if req.Type == wsmanapi.WorkspaceType_REGULAR {
        rules = append(rules, validation.Field(&req.ServicePrefix, validation.Required))
    }
    err = validation.ValidateStruct(req, rules...)
    if err != nil {
        return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
    }

    return nil
}

func validateUpdateSSHKeyRequest(req *wsmanapi.UpdateSSHKeyRequest) error {
    err := validation.ValidateStruct(req,
        validation.Field(&req.Id, validation.Required),
        validation.Field(&req.Keys, validation.Required),
    )

    if err != nil {
        return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
    }

    return nil
}

func isValidWorkspaceType(value interface{}) error {
    s, ok := value.(wsmanapi.WorkspaceType)
    if !ok {
        return xerrors.Errorf("value is not a workspace type")
    }

    _, ok = wsmanapi.WorkspaceType_name[int32(s)]
    if !ok {
        return xerrors.Errorf("value %d is out of range", s)
    }

    return nil
}

func areValidPorts(value interface{}) error {
    s, ok := value.([]*wsmanapi.PortSpec)
    if !ok {
        return xerrors.Errorf("value is not a port spec list")
    }

    idx := make(map[uint32]struct{})
    for _, p := range s {
        if _, exists := idx[p.Port]; exists {
            return xerrors.Errorf("port %d is not unique", p.Port)
        }
        idx[p.Port] = struct{}{}

        // TODO [cw]: probably the target should be unique as well.
        //            I don't want to introduce new issues with too
        //            tight validation though.
    }

    return nil
}

func areValidFeatureFlags(value interface{}) error {
    s, ok := value.([]wsmanapi.WorkspaceFeatureFlag)
    if !ok {
        return xerrors.Errorf("value not a feature flag list")
    }

    idx := make(map[wsmanapi.WorkspaceFeatureFlag]struct{}, len(s))
    for _, k := range s {
        idx[k] = struct{}{}
    }

    return nil
}

// extractWorkspaceUserEnv converts user-provided environment variables into pod env vars.
// Protected values are not inlined; they are stored in the workspace's env secret under the
// SHA-256 hash of the variable name and referenced via a SecretKeySelector. The returned map
// holds the secret's contents.
func extractWorkspaceUserEnv(secretName string, userEnvs, sysEnvs []*wsmanapi.EnvironmentVariable) ([]corev1.EnvVar, map[string]string) {
    envVars := make([]corev1.EnvVar, 0, len(userEnvs))
    secrets := make(map[string]string)
    for _, e := range userEnvs {
        switch {
        case e.Secret != nil:
            securedEnv := corev1.EnvVar{
                Name: e.Name,
                ValueFrom: &corev1.EnvVarSource{
                    SecretKeyRef: &corev1.SecretKeySelector{
                        LocalObjectReference: corev1.LocalObjectReference{Name: e.Secret.SecretName},
                        Key:                  e.Secret.Key,
                    },
                },
            }

            envVars = append(envVars, securedEnv)

        case e.Value == "":
            continue

        case !isProtectedEnvVar(e.Name, sysEnvs):
            unprotectedEnv := corev1.EnvVar{
                Name:  e.Name,
                Value: e.Value,
            }

            envVars = append(envVars, unprotectedEnv)

        default:
            name := fmt.Sprintf("%x", sha256.Sum256([]byte(e.Name)))
            protectedEnv := corev1.EnvVar{
                Name: e.Name,
                ValueFrom: &corev1.EnvVarSource{
                    SecretKeyRef: &corev1.SecretKeySelector{
                        LocalObjectReference: corev1.LocalObjectReference{Name: secretName},
                        Key:                  name,
                    },
                },
            }

            envVars = append(envVars, protectedEnv)
            secrets[name] = e.Value
        }
    }

    return envVars, secrets
}

// extractWorkspaceSysEnv converts system environment variables into plain pod env vars.
func extractWorkspaceSysEnv(sysEnvs []*wsmanapi.EnvironmentVariable) []corev1.EnvVar {
    envs := make([]corev1.EnvVar, 0, len(sysEnvs))
    for _, e := range sysEnvs {
        envs = append(envs, corev1.EnvVar{
            Name:  e.Name,
            Value: e.Value,
        })
    }

    return envs
}

// extractWorkspaceTokenData collects the secrets extracted from the content initializer so
// they can be stored in the workspace's token secret.
func extractWorkspaceTokenData(spec *wsmanapi.StartWorkspaceSpec) map[string]string {
    secrets := make(map[string]string)
    for k, v := range csapi.ExtractAndReplaceSecretsFromInitializer(spec.Initializer) {
        secrets[k] = v
    }
    return secrets
}
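
// Illustrative example: for a protected variable FOO=bar and secretName "ws-12345-env",
// extractWorkspaceUserEnv emits an EnvVar whose SecretKeyRef points at the "ws-12345-env"
// secret under the key sha256("FOO"), and the returned map carries that key with value
// "bar"; StartWorkspace later writes the map into the secret via createWorkspaceSecret.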

// extractWorkspaceStatus converts a Workspace custom resource into its wsmanapi.WorkspaceStatus wire representation.
func (wsm *WorkspaceManagerServer) extractWorkspaceStatus(ws *workspacev1.Workspace) *wsmanapi.WorkspaceStatus {
    log := log.WithFields(log.OWI(ws.Spec.Ownership.Owner, ws.Spec.Ownership.WorkspaceID, ws.Name))
    version, _ := strconv.ParseUint(ws.ResourceVersion, 10, 64)

    var tpe wsmanapi.WorkspaceType
    switch ws.Spec.Type {
    case workspacev1.WorkspaceTypeImageBuild:
        tpe = wsmanapi.WorkspaceType_IMAGEBUILD
    case workspacev1.WorkspaceTypePrebuild:
        tpe = wsmanapi.WorkspaceType_PREBUILD
    case workspacev1.WorkspaceTypeRegular:
        tpe = wsmanapi.WorkspaceType_REGULAR
    }

    timeout := wsm.Config.Timeouts.RegularWorkspace.String()
    if ws.Spec.Timeout.Time != nil {
        timeout = ws.Spec.Timeout.Time.Duration.String()
    }

    closedTimeout := wsm.Config.Timeouts.AfterClose.String()
    if ws.Spec.Timeout.ClosedTimeout != nil {
        closedTimeout = ws.Spec.Timeout.ClosedTimeout.Duration.String()
    }

    var phase wsmanapi.WorkspacePhase
    switch ws.Status.Phase {
    case workspacev1.WorkspacePhasePending:
        phase = wsmanapi.WorkspacePhase_PENDING
    case workspacev1.WorkspacePhaseImageBuild:
        // TODO(cw): once we have an imagebuild phase on the protocol, map this properly
        phase = wsmanapi.WorkspacePhase_PENDING
    case workspacev1.WorkspacePhaseCreating:
        phase = wsmanapi.WorkspacePhase_CREATING
    case workspacev1.WorkspacePhaseInitializing:
        phase = wsmanapi.WorkspacePhase_INITIALIZING
    case workspacev1.WorkspacePhaseRunning:
        phase = wsmanapi.WorkspacePhase_RUNNING
    case workspacev1.WorkspacePhaseStopping:
        phase = wsmanapi.WorkspacePhase_STOPPING
    case workspacev1.WorkspacePhaseStopped:
        phase = wsmanapi.WorkspacePhase_STOPPED
    case workspacev1.WorkspacePhaseUnknown:
        phase = wsmanapi.WorkspacePhase_UNKNOWN
    }

    var firstUserActivity *timestamppb.Timestamp
    for _, c := range ws.Status.Conditions {
        if c.Type == string(workspacev1.WorkspaceConditionFirstUserActivity) {
            firstUserActivity = timestamppb.New(c.LastTransitionTime.Time)
        }
    }

    var runtime *wsmanapi.WorkspaceRuntimeInfo
    if rt := ws.Status.Runtime; rt != nil {
        runtime = &wsmanapi.WorkspaceRuntimeInfo{
            NodeName: rt.NodeName,
            NodeIp:   rt.HostIP,
            PodName:  rt.PodName,
        }
    }

    var admissionLevel wsmanapi.AdmissionLevel
    switch ws.Spec.Admission.Level {
    case workspacev1.AdmissionLevelEveryone:
        admissionLevel = wsmanapi.AdmissionLevel_ADMIT_EVERYONE
    case workspacev1.AdmissionLevelOwner:
        admissionLevel = wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY
    }

    ports := make([]*wsmanapi.PortSpec, 0, len(ws.Spec.Ports))
    for _, p := range ws.Spec.Ports {
        v := wsmanapi.PortVisibility_PORT_VISIBILITY_PRIVATE
        if p.Visibility == workspacev1.AdmissionLevelEveryone {
            v = wsmanapi.PortVisibility_PORT_VISIBILITY_PUBLIC
        }
        protocol := wsmanapi.PortProtocol_PORT_PROTOCOL_HTTP
        if p.Protocol == workspacev1.PortProtocolHttps {
            protocol = wsmanapi.PortProtocol_PORT_PROTOCOL_HTTPS
        }
        url, err := config.RenderWorkspacePortURL(wsm.Config.WorkspacePortURLTemplate, config.PortURLContext{
            Host:          wsm.Config.GitpodHostURL,
            ID:            ws.Name,
            IngressPort:   fmt.Sprint(p.Port),
            Prefix:        ws.Spec.Ownership.WorkspaceID,
            WorkspacePort: fmt.Sprint(p.Port),
        })
        if err != nil {
            log.WithError(err).WithField("port", p.Port).Error("cannot render public URL for port, excluding the port from the workspace status")
            continue
        }
        ports = append(ports, &wsmanapi.PortSpec{
            Port:       p.Port,
            Visibility: v,
            Url:        url,
            Protocol:   protocol,
        })
    }

    var metrics *wsmanapi.WorkspaceMetadata_Metrics
    if ws.Status.ImageInfo != nil {
        metrics = &wsmanapi.WorkspaceMetadata_Metrics{
            Image: &wsmanapi.WorkspaceMetadata_ImageInfo{
                TotalSize:          ws.Status.ImageInfo.TotalSize,
                WorkspaceImageSize: ws.Status.ImageInfo.WorkspaceImageSize,
            },
        }
    }

    var initializerMetrics *wsmanapi.InitializerMetrics
    if ws.Status.InitializerMetrics != nil {
        initializerMetrics = mapInitializerMetrics(ws.Status.InitializerMetrics)
    }

    res := &wsmanapi.WorkspaceStatus{
        Id:            ws.Name,
        StatusVersion: version,
        Metadata: &wsmanapi.WorkspaceMetadata{
            Owner:       ws.Spec.Ownership.Owner,
            MetaId:      ws.Spec.Ownership.WorkspaceID,
            StartedAt:   timestamppb.New(ws.CreationTimestamp.Time),
            Annotations: ws.Annotations,
            Metrics:     metrics,
        },
        Spec: &wsmanapi.WorkspaceSpec{
            Class:          ws.Spec.Class,
            ExposedPorts:   ports,
            WorkspaceImage: pointer.StringDeref(ws.Spec.Image.Workspace.Ref, ""),
            IdeImage: &wsmanapi.IDEImage{
                WebRef:        ws.Spec.Image.IDE.Web,
                SupervisorRef: ws.Spec.Image.IDE.Supervisor,
            },
            IdeImageLayers: ws.Spec.Image.IDE.Refs,
            Headless:       ws.IsHeadless(),
            Url:            ws.Status.URL,
            Type:           tpe,
            Timeout:        timeout,
            ClosedTimeout:  closedTimeout,
        },
        Phase: phase,
        Conditions: &wsmanapi.WorkspaceConditions{
            Failed:              getConditionMessageIfTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionFailed)),
            Timeout:             getConditionMessageIfTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionTimeout)),
            Snapshot:            ws.Status.Snapshot,
            Deployed:            convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionDeployed)),
            FirstUserActivity:   firstUserActivity,
            HeadlessTaskFailed:  getConditionMessageIfTrue(ws.Status.Conditions, string(workspacev1.WorkspaceConditionsHeadlessTaskFailed)),
            StoppedByRequest:    convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionStoppedByRequest)),
            FinalBackupComplete: convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionBackupComplete)),
            Aborted:             convertCondition(ws.Status.Conditions, string(workspacev1.WorkspaceConditionAborted)),
        },
        Runtime: runtime,
        Auth: &wsmanapi.WorkspaceAuthentication{
            Admission:  admissionLevel,
            OwnerToken: ws.Status.OwnerToken,
        },
        Repo:               convertGitStatus(ws.Status.GitStatus),
        InitializerMetrics: initializerMetrics,
    }

    return res
}

// mapInitializerMetrics converts the CRD initializer metrics into their protobuf counterparts.
func mapInitializerMetrics(in *workspacev1.InitializerMetrics) *wsmanapi.InitializerMetrics {
    result := &wsmanapi.InitializerMetrics{}
    // Convert Git metrics
    if in.Git != nil {
        result.Git = &wsmanapi.InitializerMetric{
            Duration: durationToProto(in.Git.Duration),
            Size:     uint64(in.Git.Size),
        }
    }

    // Convert FileDownload metrics
    if in.FileDownload != nil {
        result.FileDownload = &wsmanapi.InitializerMetric{
            Duration: durationToProto(in.FileDownload.Duration),
            Size:     uint64(in.FileDownload.Size),
        }
    }

    // Convert Snapshot metrics
    if in.Snapshot != nil {
        result.Snapshot = &wsmanapi.InitializerMetric{
            Duration: durationToProto(in.Snapshot.Duration),
            Size:     uint64(in.Snapshot.Size),
        }
    }

    // Convert Backup metrics
    if in.Backup != nil {
        result.Backup = &wsmanapi.InitializerMetric{
            Duration: durationToProto(in.Backup.Duration),
            Size:     uint64(in.Backup.Size),
        }
    }

    // Convert Prebuild metrics
    if in.Prebuild != nil {
        result.Prebuild = &wsmanapi.InitializerMetric{
            Duration: durationToProto(in.Prebuild.Duration),
            Size:     uint64(in.Prebuild.Size),
        }
    }

    // Convert Composite metrics
    if in.Composite != nil {
        result.Composite = &wsmanapi.InitializerMetric{
            Duration: durationToProto(in.Composite.Duration),
            Size:     uint64(in.Composite.Size),
        }
    }

    return result
}

// durationToProto converts an optional Kubernetes duration into a protobuf duration, mapping nil to nil.
func durationToProto(d *metav1.Duration) *durationpb.Duration {
    if d == nil {
        return nil
    }
    return durationpb.New(d.Duration)
}

func getConditionMessageIfTrue(conds []metav1.Condition, tpe string) string {
    for _, c := range conds {
        if c.Type == tpe && c.Status == metav1.ConditionTrue {
            return c.Message
        }
    }
    return ""
}

func convertGitStatus(gs *workspacev1.GitStatus) *csapi.GitStatus {
    if gs == nil {
        return nil
    }
    return &csapi.GitStatus{
        Branch:               gs.Branch,
        LatestCommit:         gs.LatestCommit,
        UncommitedFiles:      gs.UncommitedFiles,
        TotalUncommitedFiles: gs.TotalUncommitedFiles,
        UntrackedFiles:       gs.UntrackedFiles,
        TotalUntrackedFiles:  gs.TotalUntrackedFiles,
        UnpushedCommits:      gs.UnpushedCommits,
        TotalUnpushedCommits: gs.TotalUnpushedCommits,
    }
}

func convertCondition(conds []metav1.Condition, tpe string) wsmanapi.WorkspaceConditionBool {
    res := wsk8s.GetCondition(conds, tpe)
    if res == nil {
        return wsmanapi.WorkspaceConditionBool_FALSE
    }

    switch res.Status {
    case metav1.ConditionTrue:
        return wsmanapi.WorkspaceConditionBool_TRUE
    default:
        return wsmanapi.WorkspaceConditionBool_FALSE
    }
}

func matchesMetadataAnnotations(ws *workspacev1.Workspace, filter *wsmanapi.MetadataFilter) bool {
    if filter == nil {
        return true
    }
    for k, v := range filter.Annotations {
        av, ok := ws.Annotations[k]
        if !ok || av != v {
            return false
        }
    }
    return true
}

func metadataFilterToLabelSelector(filter *wsmanapi.MetadataFilter) (labels.Selector, error) {
    if filter == nil {
        return nil, nil
    }

    res := labels.NewSelector()
    if filter.MetaId != "" {
        req, err := labels.NewRequirement(wsk8s.WorkspaceIDLabel, selection.Equals, []string{filter.MetaId})
        if err != nil {
            return nil, xerrors.Errorf("cannot create metaID filter: %w", err)
        }
        // Selector.Add returns a new selector; keep the result so the requirement is not lost.
        res = res.Add(*req)
    }
    if filter.Owner != "" {
        req, err := labels.NewRequirement(wsk8s.OwnerLabel, selection.Equals, []string{filter.Owner})
        if err != nil {
            return nil, xerrors.Errorf("cannot create owner filter: %w", err)
        }
        res = res.Add(*req)
    }
    return res, nil
}
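
// Illustrative behaviour: a filter with Owner and MetaId set becomes a label selector
// requiring wsk8s.OwnerLabel == filter.Owner and wsk8s.WorkspaceIDLabel == filter.MetaId.
// GetWorkspaces passes that selector to the Kubernetes list call; annotation constraints
// are then checked in-process by matchesMetadataAnnotations.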

// parseTimeout parses a Go duration string into an optional metav1.Duration; an empty string yields nil.
func parseTimeout(timeout string) (*metav1.Duration, error) {
    var duration *metav1.Duration
    if timeout != "" {
        d, err := time.ParseDuration(timeout)
        if err != nil {
            return nil, fmt.Errorf("invalid timeout: %v", err)
        }
        duration = &metav1.Duration{Duration: d}
    }

    return duration, nil
}
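
// Examples (illustrative):
//
//     parseTimeout("30m") // -> &metav1.Duration{Duration: 30 * time.Minute}, nil
//     parseTimeout("")    // -> nil, nil
//     parseTimeout("x")   // -> nil, error "invalid timeout: ..."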

type filteringSubscriber struct {
    Sub    subscriber
    Filter *wsmanapi.MetadataFilter
}

func matchesMetadataFilter(filter *wsmanapi.MetadataFilter, md *wsmanapi.WorkspaceMetadata) bool {
    if filter == nil {
        return true
    }

    if filter.MetaId != "" && filter.MetaId != md.MetaId {
        return false
    }
    if filter.Owner != "" && filter.Owner != md.Owner {
        return false
    }
    for k, v := range filter.Annotations {
        av, ok := md.Annotations[k]
        if !ok || av != v {
            return false
        }
    }
    return true
}

func (f *filteringSubscriber) Send(resp *wsmanapi.SubscribeResponse) error {
    var md *wsmanapi.WorkspaceMetadata
    if sts := resp.GetStatus(); sts != nil {
        md = sts.Metadata
    }
    if md == nil {
        // no metadata, no forwarding
        return nil
    }
    if !matchesMetadataFilter(f.Filter, md) {
        return nil
    }

    return f.Sub.Send(resp)
}

type subscriber interface {
    Send(*wsmanapi.SubscribeResponse) error
}

type subscriptions struct {
    mu          sync.RWMutex
    subscribers map[string]chan *wsmanapi.SubscribeResponse
}

func (subs *subscriptions) Subscribe(ctx context.Context, recv subscriber) (err error) {
    incoming := make(chan *wsmanapi.SubscribeResponse, 250)

    var key string
    peer, ok := peer.FromContext(ctx)
    if ok {
        key = fmt.Sprintf("k%s@%d", peer.Addr.String(), time.Now().UnixNano())
    }

    subs.mu.Lock()
    if key == "" {
        // if for some reason we didn't get peer information,
        // we must generate the key within the lock, otherwise we might end up with duplicate keys
        key = fmt.Sprintf("k%d@%d", len(subs.subscribers), time.Now().UnixNano())
    }
    subs.subscribers[key] = incoming
    log.WithField("subscriberKey", key).WithField("subscriberCount", len(subs.subscribers)).Info("new subscriber")
    subs.mu.Unlock()

    defer func() {
        subs.mu.Lock()
        delete(subs.subscribers, key)
        subs.mu.Unlock()
    }()

    for {
        var inc *wsmanapi.SubscribeResponse
        select {
        case <-ctx.Done():
            return ctx.Err()
        case inc = <-incoming:
        }

        if inc == nil {
            log.WithField("subscriberKey", key).Warn("subscription was canceled")
            return xerrors.Errorf("subscription was canceled")
        }

        err = recv.Send(inc)
        if err != nil {
            log.WithField("subscriberKey", key).WithError(err).Error("cannot send update - dropping subscriber")
            return err
        }
    }
}

func (subs *subscriptions) PublishToSubscribers(ctx context.Context, update *wsmanapi.SubscribeResponse) {
    subs.mu.RLock()
    var dropouts []string
    for k, sub := range subs.subscribers {
        select {
        case sub <- update:
            // all is well
        default:
            // writing to the subscriber channel blocked, which means the subscriber isn't consuming fast enough and
            // would block others. We'll drop this consumer later (do not drop here to avoid concurrency issues).
            dropouts = append(dropouts, k)
        }
    }
    // we cannot defer this call as DropSubscriber will attempt to acquire a write lock
    subs.mu.RUnlock()

    // we check if there are any dropouts here to avoid the non-inlinable DropSubscriber call.
    if len(dropouts) > 0 {
        subs.DropSubscriber(dropouts)
    }
}

func (subs *subscriptions) DropSubscriber(dropouts []string) {
    defer func() {
        err := recover()
        if err != nil {
            log.WithField("error", err).Error("caught panic in dropSubscriber")
        }
    }()

    subs.mu.Lock()
    defer subs.mu.Unlock()

    for _, k := range dropouts {
        sub, ok := subs.subscribers[k]
        if !ok {
            continue
        }

        log.WithField("subscriber", k).WithField("subscriberCount", len(subs.subscribers)).Warn("subscriber channel was full - dropping subscriber")
        // despite closing the subscriber channel, the subscriber's serve Go routine will still try to send
        // all prior updates up to this point. See https://play.golang.org/p/XR-9nLrQLQs
        close(sub)
        delete(subs.subscribers, k)
    }
}

// OnChange is the default OnChange implementation which publishes workspace status updates to subscribers
func (subs *subscriptions) OnChange(ctx context.Context, status *wsmanapi.WorkspaceStatus) {
    log := log.WithFields(log.OWI(status.Metadata.Owner, status.Metadata.MetaId, status.Id))

    header := make(map[string]string)
    span := opentracing.SpanFromContext(ctx)
    if span != nil {
        tracingHeader := make(opentracing.HTTPHeadersCarrier)
        err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, tracingHeader)
        if err != nil {
            // if the error was caused by the span coming from the Noop tracer - ignore it.
            // This can happen if the workspace doesn't have a span associated with it, then we resort to creating Noop spans.
            if _, isNoopTracer := span.Tracer().(opentracing.NoopTracer); !isNoopTracer {
                log.WithError(err).Debug("unable to extract tracing information - trace will be broken")
            }
        } else {
            for k, v := range tracingHeader {
                if len(v) != 1 {
                    continue
                }
                header[k] = v[0]
            }
        }
    }

    subs.PublishToSubscribers(ctx, &wsmanapi.SubscribeResponse{
        Status: status,
        Header: header,
    })

    // subs.metrics.OnChange(status)

    // There are some conditions we'd like to get notified about, for example while running experiments or because
    // they represent out-of-the-ordinary situations.
    // We attempt to use the GCP Error Reporting for this, hence log these situations as errors.
    if status.Conditions.Failed != "" {
        log.WithField("status", status).Error("workspace failed")
    }
    if status.Phase == 0 {
        log.WithField("status", status).Error("workspace in UNKNOWN phase")
    }
}

type workspaceMetrics struct {
    totalStartsCounterVec *prometheus.CounterVec
}

func newWorkspaceMetrics(namespace string, k8s client.Client) *workspaceMetrics {
    return &workspaceMetrics{
        totalStartsCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
            Namespace: "gitpod",
            Subsystem: "ws_manager_mk2",
            Name:      "workspace_starts_total",
            Help:      "total number of workspaces started",
        }, []string{"type", "class"}),
    }
}
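
// The counter above is exported as gitpod_ws_manager_mk2_workspace_starts_total with
// "type" and "class" labels; recordWorkspaceStart below increments it once per
// StartWorkspace call.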

func (m *workspaceMetrics) recordWorkspaceStart(ws *workspacev1.Workspace) {
    tpe := string(ws.Spec.Type)
    class := ws.Spec.Class

    counter, err := m.totalStartsCounterVec.GetMetricWithLabelValues(tpe, class)
    if err != nil {
        log.WithError(err).WithField("type", tpe).WithField("class", class).Warn("could not retrieve workspace start counter")
        return
    }
    counter.Inc()
}

// Describe implements Collector. It will send exactly one Desc to the provided channel.
func (m *workspaceMetrics) Describe(ch chan<- *prometheus.Desc) {
    m.totalStartsCounterVec.Describe(ch)
}

// Collect implements Collector.
func (m *workspaceMetrics) Collect(ch chan<- prometheus.Metric) {
    m.totalStartsCounterVec.Collect(ch)
}