GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-mk2/controllers/workspace_controller.go

// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package controllers

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/version"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	wsk8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
	"github.com/gitpod-io/gitpod/common-go/tracing"
	"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/constants"
	"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/maintenance"
	config "github.com/gitpod-io/gitpod/ws-manager/api/config"
	workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus"
)

const (
	metricsNamespace          = "gitpod"
	metricsWorkspaceSubsystem = "ws_manager_mk2"
	// kubernetesOperationTimeout is the time we give Kubernetes operations in general.
	kubernetesOperationTimeout = 5 * time.Second
	maintenanceRequeue         = 1 * time.Minute
)

func NewWorkspaceReconciler(c client.Client, restConfig *rest.Config, scheme *runtime.Scheme, recorder record.EventRecorder, cfg *config.Configuration, reg prometheus.Registerer, maintenance maintenance.Maintenance) (*WorkspaceReconciler, error) {
	// Create kubernetes clientset
	kubeClient, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
	}

	reconciler := &WorkspaceReconciler{
		Client:      c,
		Scheme:      scheme,
		Config:      cfg,
		maintenance: maintenance,
		Recorder:    recorder,
		kubeClient:  kubeClient,
	}

	metrics, err := newControllerMetrics(reconciler)
	if err != nil {
		return nil, err
	}
	reg.MustRegister(metrics)
	reconciler.metrics = metrics

	return reconciler, nil
}

// WorkspaceReconciler reconciles a Workspace object
type WorkspaceReconciler struct {
	client.Client
	Scheme *runtime.Scheme

	Config      *config.Configuration
	metrics     *controllerMetrics
	maintenance maintenance.Maintenance
	Recorder    record.EventRecorder

	kubeClient kubernetes.Interface
}

//+kubebuilder:rbac:groups=workspace.gitpod.io,resources=workspaces,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=workspace.gitpod.io,resources=workspaces/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=workspace.gitpod.io,resources=workspaces/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=pod,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pod/status,verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// It compares the state specified by the Workspace object against the actual
// cluster state, and then performs operations to make the cluster state
// reflect the state specified by the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile
func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
	span, ctx := tracing.FromContext(ctx, "WorkspaceReconciler.Reconcile")
	defer tracing.FinishSpan(span, &err)
	log := log.FromContext(ctx)

	var workspace workspacev1.Workspace
	err = r.Get(ctx, req.NamespacedName, &workspace)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			log.Error(err, "unable to fetch workspace")
		}
		// we'll ignore not-found errors, since they can't be fixed by an immediate
		// requeue (we'll need to wait for a new notification), and we can get them
		// on deleted requests.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	if workspace.Status.Conditions == nil {
		workspace.Status.Conditions = []metav1.Condition{}
	}

	log = log.WithValues("owi", workspace.OWI())
	ctx = logr.NewContext(ctx, log)
	log.V(2).Info("reconciling workspace", "workspace", req.NamespacedName, "phase", workspace.Status.Phase)

	workspacePods, err := r.listWorkspacePods(ctx, &workspace)
	if err != nil {
		log.Error(err, "unable to list workspace pods")
		return ctrl.Result{}, fmt.Errorf("failed to list workspace pods: %w", err)
	}

	oldStatus := workspace.Status.DeepCopy()
	err = r.updateWorkspaceStatus(ctx, &workspace, workspacePods, r.Config)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to compute latest workspace status: %w", err)
	}

	r.updateMetrics(ctx, &workspace)
	r.emitPhaseEvents(ctx, &workspace, oldStatus)

	var podStatus *corev1.PodStatus
	if len(workspacePods.Items) > 0 {
		podStatus = &workspacePods.Items[0].Status
	}

	if !equality.Semantic.DeepDerivative(oldStatus, workspace.Status) {
		log.Info("updating workspace status", "status", workspace.Status, "podStatus", podStatus, "pods", len(workspacePods.Items))
	}

	err = r.Status().Update(ctx, &workspace)
	if err != nil {
		return errorResultLogConflict(log, fmt.Errorf("failed to update workspace status: %w", err))
	}

	result, err = r.actOnStatus(ctx, &workspace, workspacePods)
	if err != nil {
		return errorResultLogConflict(log, fmt.Errorf("failed to act on status: %w", err))
	}

	return result, nil
}
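
// Illustrative note (editorial addition, not part of the upstream file): in normal
// operation the controller-runtime manager invokes Reconcile; a test could also drive
// it directly, assuming a constructed reconciler and a hypothetical workspace name
// "ws-example":
//
//	res, err := reconciler.Reconcile(ctx, ctrl.Request{
//		NamespacedName: types.NamespacedName{Namespace: "default", Name: "ws-example"},
//	})
//
// A non-zero res.RequeueAfter (or res.Requeue) asks the manager to schedule another pass.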

func (r *WorkspaceReconciler) listWorkspacePods(ctx context.Context, ws *workspacev1.Workspace) (list *corev1.PodList, err error) {
	span, ctx := tracing.FromContext(ctx, "listWorkspacePods")
	defer tracing.FinishSpan(span, &err)

	var workspacePods corev1.PodList
	err = r.List(ctx, &workspacePods, client.InNamespace(ws.Namespace), client.MatchingFields{wsOwnerKey: ws.Name})
	if err != nil {
		return nil, err
	}

	return &workspacePods, nil
}

func (r *WorkspaceReconciler) actOnStatus(ctx context.Context, workspace *workspacev1.Workspace, workspacePods *corev1.PodList) (result ctrl.Result, err error) {
	span, ctx := tracing.FromContext(ctx, "actOnStatus")
	defer tracing.FinishSpan(span, &err)
	log := log.FromContext(ctx)

	if workspace.Status.Phase != workspacev1.WorkspacePhaseStopped && !r.metrics.containsWorkspace(workspace) {
		// If the workspace hasn't stopped yet, and we don't know about this workspace yet, remember it.
		r.metrics.rememberWorkspace(workspace, nil)
	}

	if len(workspacePods.Items) == 0 {
		// if there isn't a workspace pod and we're not currently deleting this workspace, create one.
		switch {
		case workspace.Status.PodStarts == 0 || workspace.Status.PodStarts-workspace.Status.PodRecreated < 1:
			serverVersion := r.getServerVersion(ctx)
			sctx, err := newStartWorkspaceContext(ctx, r.Config, workspace, serverVersion)
			if err != nil {
				log.Error(err, "unable to create startWorkspace context")
				return ctrl.Result{Requeue: true}, err
			}

			pod, err := r.createWorkspacePod(sctx)
			if err != nil {
				log.Error(err, "unable to produce workspace pod")
				return ctrl.Result{}, err
			}

			if err := ctrl.SetControllerReference(workspace, pod, r.Scheme); err != nil {
				return ctrl.Result{}, err
			}

			err = r.Create(ctx, pod)
			if apierrors.IsAlreadyExists(err) {
				// pod exists, we're good
			} else if err != nil {
				log.Error(err, "unable to create Pod for Workspace", "pod", pod)
				return ctrl.Result{Requeue: true}, err
			} else {
				// Must increment and persist the pod starts, and ensure we retry on conflict.
				// If we fail to persist this value, it's possible that the Pod gets recreated
				// when the workspace stops, due to PodStarts still being 0 when the original Pod
				// disappears.
				// Use a Patch instead of an Update, to prevent conflicts.
				patch := client.MergeFrom(workspace.DeepCopy())
				workspace.Status.PodStarts++
				if err := r.Status().Patch(ctx, workspace, patch); err != nil {
					log.Error(err, "Failed to patch PodStarts in workspace status")
					return ctrl.Result{}, err
				}

				r.Recorder.Event(workspace, corev1.EventTypeNormal, "Creating", "")
			}

		case workspace.Status.Phase == workspacev1.WorkspacePhaseStopped && workspace.IsConditionTrue(workspacev1.WorkspaceConditionPodRejected):
			if workspace.Status.PodRecreated > r.Config.PodRecreationMaxRetries {
				workspace.Status.SetCondition(workspacev1.NewWorkspaceConditionPodRejected(fmt.Sprintf("Pod reached maximum recreations %d, failing", workspace.Status.PodRecreated), metav1.ConditionFalse))
				return ctrl.Result{Requeue: true}, nil // requeue so we end up in the "Stopped" case below
			}
			log = log.WithValues("PodStarts", workspace.Status.PodStarts, "PodRecreated", workspace.Status.PodRecreated, "Phase", workspace.Status.Phase)

			// Make sure to wait for "recreationTimeout" before creating the pod again
			if workspace.Status.PodDeletionTime == nil {
				log.Info("pod recreation: waiting for pod deletion time to be populated...")
				return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, nil
			}

			recreationTimeout := r.podRecreationTimeout()
			podDeletionTime := workspace.Status.PodDeletionTime.Time
			waitTime := time.Until(podDeletionTime.Add(recreationTimeout))
			log = log.WithValues("waitTime", waitTime.String(), "recreationTimeout", recreationTimeout.String(), "podDeletionTime", podDeletionTime.String())
			if waitTime > 0 {
				log.Info("pod recreation: waiting for timeout...")
				return ctrl.Result{Requeue: true, RequeueAfter: waitTime}, nil
			}
			log.Info("trigger pod recreation")

			// Reset status
			sc := workspace.Status.DeepCopy()
			workspace.Status = workspacev1.WorkspaceStatus{}
			workspace.Status.Phase = workspacev1.WorkspacePhasePending
			workspace.Status.OwnerToken = sc.OwnerToken
			workspace.Status.PodStarts = sc.PodStarts
			workspace.Status.PodRecreated = sc.PodRecreated + 1
			workspace.Status.SetCondition(workspacev1.NewWorkspaceConditionPodRejected(fmt.Sprintf("Recreating pod... (%d retry)", workspace.Status.PodRecreated), metav1.ConditionFalse))

			if err := r.Status().Update(ctx, workspace); err != nil {
				log.Error(err, "Failed to update workspace status-reset")
				return ctrl.Result{}, err
			}

			// Reset metrics cache
			r.metrics.forgetWorkspace(workspace)

			r.Recorder.Event(workspace, corev1.EventTypeNormal, "Recreating", "")
			return ctrl.Result{Requeue: true}, nil

		case workspace.Status.Phase == workspacev1.WorkspacePhaseStopped:
			if err := r.deleteWorkspaceSecrets(ctx, workspace); err != nil {
				return ctrl.Result{}, err
			}

			// Done stopping workspace - remove finalizer.
			if controllerutil.ContainsFinalizer(workspace, workspacev1.GitpodFinalizerName) {
				controllerutil.RemoveFinalizer(workspace, workspacev1.GitpodFinalizerName)
				if err := r.Update(ctx, workspace); err != nil {
					if apierrors.IsNotFound(err) {
						return ctrl.Result{}, nil
					} else {
						return ctrl.Result{}, fmt.Errorf("failed to remove gitpod finalizer from workspace: %w", err)
					}
				}
			}

			// Workspace might have already been in a deleting state,
			// but not guaranteed, so try deleting anyway.
			r.Recorder.Event(workspace, corev1.EventTypeNormal, "Deleting", "")
			err := r.Client.Delete(ctx, workspace)
			return ctrl.Result{}, client.IgnoreNotFound(err)
		}

		return ctrl.Result{}, nil
	}

	// all actions below assume there is a pod
	if len(workspacePods.Items) == 0 {
		return ctrl.Result{}, nil
	}
	pod := &workspacePods.Items[0]

	switch {
	// if there is a pod, and it's failed, delete it
	case workspace.IsConditionTrue(workspacev1.WorkspaceConditionFailed) && !isPodBeingDeleted(pod):
		return r.deleteWorkspacePod(ctx, pod, "workspace failed")

	// if the pod was stopped by request, delete it
	case workspace.IsConditionTrue(workspacev1.WorkspaceConditionStoppedByRequest) && !isPodBeingDeleted(pod):
		var gracePeriodSeconds *int64
		if c := wsk8s.GetCondition(workspace.Status.Conditions, string(workspacev1.WorkspaceConditionStoppedByRequest)); c != nil {
			if dt, err := time.ParseDuration(c.Message); err == nil {
				s := int64(dt.Seconds())
				gracePeriodSeconds = &s
			}
		}
		err := r.Client.Delete(ctx, pod, &client.DeleteOptions{
			GracePeriodSeconds: gracePeriodSeconds,
		})
		if apierrors.IsNotFound(err) {
			// pod is gone - nothing to do here
		} else {
			return ctrl.Result{Requeue: true}, err
		}

	// if the node disappeared, delete the pod.
	case workspace.IsConditionTrue(workspacev1.WorkspaceConditionNodeDisappeared) && !isPodBeingDeleted(pod):
		return r.deleteWorkspacePod(ctx, pod, "node disappeared")

	// if the workspace timed out, delete it
	case workspace.IsConditionTrue(workspacev1.WorkspaceConditionTimeout) && !isPodBeingDeleted(pod):
		return r.deleteWorkspacePod(ctx, pod, "timed out")

	// if the content initialization failed, delete the pod
	case wsk8s.ConditionWithStatusAndReason(workspace.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady), false, workspacev1.ReasonInitializationFailure) && !isPodBeingDeleted(pod):
		return r.deleteWorkspacePod(ctx, pod, "init failed")

	case isWorkspaceBeingDeleted(workspace) && !isPodBeingDeleted(pod):
		return r.deleteWorkspacePod(ctx, pod, "workspace deleted")

	case workspace.IsHeadless() && workspace.Status.Phase == workspacev1.WorkspacePhaseStopped && !isPodBeingDeleted(pod):
		// Workspace was requested to be deleted, propagate by deleting the Pod.
		// The Pod deletion will then trigger workspace disposal steps.
		err := r.Client.Delete(ctx, pod)
		if apierrors.IsNotFound(err) {
			// pod is gone - nothing to do here
		} else {
			return ctrl.Result{Requeue: true}, err
		}

	case workspace.Status.Phase == workspacev1.WorkspacePhaseRunning:
		err := r.deleteWorkspaceSecrets(ctx, workspace)
		if err != nil {
			log.Error(err, "could not delete workspace secrets")
		}

	// we've disposed already - try to remove the finalizer and call it a day
	case workspace.Status.Phase == workspacev1.WorkspacePhaseStopped:
		hadFinalizer := controllerutil.ContainsFinalizer(pod, workspacev1.GitpodFinalizerName)
		controllerutil.RemoveFinalizer(pod, workspacev1.GitpodFinalizerName)
		if err := r.Client.Update(ctx, pod); err != nil {
			return ctrl.Result{}, fmt.Errorf("failed to remove gitpod finalizer from pod: %w", err)
		}

		if hadFinalizer {
			// Requeue to remove workspace.
			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
		}
	}

	return ctrl.Result{}, nil
}

func (r *WorkspaceReconciler) podRecreationTimeout() time.Duration {
	recreationTimeout := 15 * time.Second // waiting less time creates issues with ws-daemon's pod-centric control loop ("Dispatch") if the workspace ends up on the same node again
	if r.Config.PodRecreationBackoff != 0 {
		recreationTimeout = time.Duration(r.Config.PodRecreationBackoff)
	}
	return recreationTimeout
}

func (r *WorkspaceReconciler) updateMetrics(ctx context.Context, workspace *workspacev1.Workspace) {
	log := log.FromContext(ctx)

	ok, lastState := r.metrics.getWorkspace(&log, workspace)
	if !ok {
		return
	}

	if !lastState.recordedInitFailure && wsk8s.ConditionWithStatusAndReason(workspace.Status.Conditions, string(workspacev1.WorkspaceConditionContentReady), false, workspacev1.ReasonInitializationFailure) {
		r.metrics.countTotalRestoreFailures(&log, workspace)
		lastState.recordedInitFailure = true
	}

	if !lastState.recordedFailure && workspace.IsConditionTrue(workspacev1.WorkspaceConditionFailed) {
		r.metrics.countWorkspaceFailure(&log, workspace)
		lastState.recordedFailure = true
	}

	if lastState.pendingStartTime.IsZero() && workspace.Status.Phase == workspacev1.WorkspacePhasePending {
		lastState.pendingStartTime = time.Now()
	} else if !lastState.pendingStartTime.IsZero() && workspace.Status.Phase != workspacev1.WorkspacePhasePending {
		r.metrics.recordWorkspacePendingTime(&log, workspace, lastState.pendingStartTime)
		lastState.pendingStartTime = time.Time{}
	}

	if lastState.creatingStartTime.IsZero() && workspace.Status.Phase == workspacev1.WorkspacePhaseCreating {
		lastState.creatingStartTime = time.Now()
	} else if !lastState.creatingStartTime.IsZero() && workspace.Status.Phase != workspacev1.WorkspacePhaseCreating {
		r.metrics.recordWorkspaceCreatingTime(&log, workspace, lastState.creatingStartTime)
		lastState.creatingStartTime = time.Time{}
	}

	if !lastState.recordedContentReady && workspace.IsConditionTrue(workspacev1.WorkspaceConditionContentReady) {
		r.metrics.countTotalRestores(&log, workspace)
		lastState.recordedContentReady = true
	}

	if !lastState.recordedBackupFailed && workspace.IsConditionTrue(workspacev1.WorkspaceConditionBackupFailure) {
		r.metrics.countTotalBackups(&log, workspace)
		r.metrics.countTotalBackupFailures(&log, workspace)
		lastState.recordedBackupFailed = true
	}

	if !lastState.recordedBackupCompleted && workspace.IsConditionTrue(workspacev1.WorkspaceConditionBackupComplete) {
		r.metrics.countTotalBackups(&log, workspace)
		lastState.recordedBackupCompleted = true
	}

	if !lastState.recordedStartTime && workspace.Status.Phase == workspacev1.WorkspacePhaseRunning {
		r.metrics.recordWorkspaceStartupTime(&log, workspace)
		lastState.recordedStartTime = true
	}

	if lastState.recordedRecreations < workspace.Status.PodRecreated {
		r.metrics.countWorkspaceRecreations(&log, workspace)
		lastState.recordedRecreations = workspace.Status.PodRecreated
	}

	if workspace.Status.Phase == workspacev1.WorkspacePhaseStopped {
		r.metrics.countWorkspaceStop(&log, workspace)

		if !lastState.recordedStartFailure && isStartFailure(workspace) {
			// Workspace never became ready, count as a startup failure.
			r.metrics.countWorkspaceStartFailures(&log, workspace)
			// No need to record in metricState, as we're forgetting the workspace state next anyway.
		}

		// Forget about this workspace, no more state updates will be recorded after this.
		r.metrics.forgetWorkspace(workspace)
		return
	}

	r.metrics.rememberWorkspace(workspace, &lastState)
}

func isStartFailure(ws *workspacev1.Workspace) bool {
	// Consider workspaces that never became ready as start failures.
	everReady := ws.IsConditionTrue(workspacev1.WorkspaceConditionEverReady)
	// Except for aborted prebuilds, as they can get aborted before becoming ready, which shouldn't be counted
	// as a start failure.
	isAborted := ws.IsConditionTrue(workspacev1.WorkspaceConditionAborted)
	// Also ignore workspaces that are requested to be stopped before they became ready.
	isStoppedByRequest := ws.IsConditionTrue(workspacev1.WorkspaceConditionStoppedByRequest)
	// Also ignore pods that got rejected by the node
	isPodRejected := ws.IsConditionTrue(workspacev1.WorkspaceConditionPodRejected)
	return !everReady && !isAborted && !isStoppedByRequest && !isPodRejected
}

func (r *WorkspaceReconciler) emitPhaseEvents(ctx context.Context, ws *workspacev1.Workspace, old *workspacev1.WorkspaceStatus) {
	if ws.Status.Phase == workspacev1.WorkspacePhaseInitializing && old.Phase != workspacev1.WorkspacePhaseInitializing {
		r.Recorder.Event(ws, corev1.EventTypeNormal, "Initializing", "")
	}

	if ws.Status.Phase == workspacev1.WorkspacePhaseRunning && old.Phase != workspacev1.WorkspacePhaseRunning {
		r.Recorder.Event(ws, corev1.EventTypeNormal, "Running", "")
	}

	if ws.Status.Phase == workspacev1.WorkspacePhaseStopping && old.Phase != workspacev1.WorkspacePhaseStopping {
		r.Recorder.Event(ws, corev1.EventTypeNormal, "Stopping", "")
	}
}

func (r *WorkspaceReconciler) deleteWorkspacePod(ctx context.Context, pod *corev1.Pod, reason string) (result ctrl.Result, err error) {
	span, ctx := tracing.FromContext(ctx, "deleteWorkspacePod")
	defer tracing.FinishSpan(span, &err)

	// Workspace was requested to be deleted, propagate by deleting the Pod.
	// The Pod deletion will then trigger workspace disposal steps.
	err = r.Client.Delete(ctx, pod)
	if apierrors.IsNotFound(err) {
		// pod is gone - nothing to do here
	} else {
		return ctrl.Result{Requeue: true}, err
	}

	return ctrl.Result{}, nil
}

func (r *WorkspaceReconciler) deleteWorkspaceSecrets(ctx context.Context, ws *workspacev1.Workspace) (err error) {
	span, ctx := tracing.FromContext(ctx, "deleteWorkspaceSecrets")
	defer tracing.FinishSpan(span, &err)
	log := log.FromContext(ctx).WithValues("owi", ws.OWI())

	// if a secret cannot be deleted we do not return early because we want to attempt
	// the deletion of the remaining secrets
	var errs []string
	err = r.deleteSecret(ctx, fmt.Sprintf("%s-%s", ws.Name, "env"), r.Config.Namespace)
	if err != nil {
		errs = append(errs, err.Error())
		log.Error(err, "could not delete environment secret", "workspace", ws.Name)
	}

	err = r.deleteSecret(ctx, fmt.Sprintf("%s-%s", ws.Name, "tokens"), r.Config.SecretsNamespace)
	if err != nil {
		errs = append(errs, err.Error())
		log.Error(err, "could not delete token secret", "workspace", ws.Name)
	}

	if len(errs) != 0 {
		return fmt.Errorf("%s", strings.Join(errs, ":"))
	}

	return nil
}

func (r *WorkspaceReconciler) deleteSecret(ctx context.Context, name, namespace string) error {
	log := log.FromContext(ctx)

	err := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
		Duration: 100 * time.Millisecond,
		Factor:   1.5,
		Jitter:   0.2,
		Steps:    3,
	}, func(ctx context.Context) (bool, error) {
		var secret corev1.Secret
		err := r.Client.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, &secret)
		if apierrors.IsNotFound(err) {
			// nothing to delete
			return true, nil
		}

		if err != nil {
			log.Error(err, "cannot retrieve secret scheduled for deletion", "secret", name)
			return false, nil
		}

		err = r.Client.Delete(ctx, &secret)
		if err != nil && !apierrors.IsNotFound(err) {
			log.Error(err, "cannot delete secret", "secret", name)
			return false, nil
		}

		return true, nil
	})

	return err
}

// errorResultLogConflict drops the error if it's a conflict, instead of returning it as a reconciler error.
// This is to reduce noise in our error logging, as conflicts are to be expected.
// For conflicts, a result with a short RequeueAfter is returned instead, which has a similar
// requeuing behaviour to returning an error.
func errorResultLogConflict(log logr.Logger, err error) (ctrl.Result, error) {
	if apierrors.IsConflict(err) {
		return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, nil
	} else {
		return ctrl.Result{}, err
	}
}

var (
	wsOwnerKey = ".metadata.controller"
	apiGVStr   = workspacev1.GroupVersion.String()
)

// SetupWithManager sets up the controller with the Manager.
func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		Named("workspace").
		WithOptions(controller.Options{
			MaxConcurrentReconciles: r.Config.WorkspaceMaxConcurrentReconciles,
		}).
		For(&workspacev1.Workspace{}).
		WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
			_, ok := object.(*corev1.Node)
			if ok {
				return true
			}

			for k, v := range object.GetLabels() {
				if k == wsk8s.WorkspaceManagedByLabel {
					switch v {
					case constants.ManagedBy:
						return true
					default:
						return false
					}
				}
			}

			return true
		})).
		Owns(&corev1.Pod{}).
		// Add a watch for Nodes, so that they're cached in memory and don't require calling the k8s API
		// when reconciling workspaces.
		Watches(&corev1.Node{}, &handler.Funcs{
			// Only enqueue events for workspaces when the node gets deleted,
			// such that we can trigger their cleanup.
			DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
				if e.Object == nil {
					return
				}

				var wsList workspacev1.WorkspaceList
				err := r.List(ctx, &wsList)
				if err != nil {
					log.FromContext(ctx).Error(err, "cannot list workspaces")
					return
				}
				for _, ws := range wsList.Items {
					if ws.Status.Runtime == nil || ws.Status.Runtime.NodeName != e.Object.GetName() {
						continue
					}
					queue.Add(ctrl.Request{NamespacedName: types.NamespacedName{
						Namespace: ws.Namespace,
						Name:      ws.Name,
					}})
				}
			},
		}).
		Complete(r)
}

func (r *WorkspaceReconciler) getServerVersion(ctx context.Context) *version.Info {
	log := log.FromContext(ctx)

	serverVersion, err := r.kubeClient.Discovery().ServerVersion()
	if err != nil {
		log.Error(err, "cannot get server version! Assuming 1.30 going forward")
		serverVersion = &version.Info{
			Major: "1",
			Minor: "30",
		}
	}
	return serverVersion
}

func SetupIndexer(mgr ctrl.Manager) error {
	var err error
	var once sync.Once
	once.Do(func() {
		idx := func(rawObj client.Object) []string {
			// grab the pod object, extract the owner...
			pod := rawObj.(*corev1.Pod)
			owner := metav1.GetControllerOf(pod)
			if owner == nil {
				return nil
			}
			// ...make sure it's a workspace...
			if owner.APIVersion != apiGVStr || owner.Kind != "Workspace" {
				return nil
			}

			// ...and if so, return it
			return []string{owner.Name}
		}

		err = mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, wsOwnerKey, idx)
	})

	return err
}
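
// The sketch below is an editorial illustration and not part of the upstream file.
// It shows how the exported pieces above (SetupIndexer, NewWorkspaceReconciler,
// SetupWithManager) are assumed to be wired together by a controller-runtime manager;
// the real entrypoint lives elsewhere in ws-manager-mk2 and may differ. The name
// exampleWireUp is hypothetical.
func exampleWireUp(mgr ctrl.Manager, cfg *config.Configuration, maint maintenance.Maintenance) error {
	// Register the pod-owner field index first, so listWorkspacePods can filter pods by owning workspace.
	if err := SetupIndexer(mgr); err != nil {
		return fmt.Errorf("failed to set up pod owner index: %w", err)
	}

	// Build the reconciler from the manager's client, REST config, scheme and event recorder,
	// registering its metrics with the default Prometheus registerer.
	reconciler, err := NewWorkspaceReconciler(
		mgr.GetClient(),
		mgr.GetConfig(),
		mgr.GetScheme(),
		mgr.GetEventRecorderFor("workspace"),
		cfg,
		prometheus.DefaultRegisterer,
		maint,
	)
	if err != nil {
		return fmt.Errorf("failed to create workspace reconciler: %w", err)
	}

	// Attach the reconciler (watching Workspaces, owned Pods, and Nodes) to the manager.
	return reconciler.SetupWithManager(mgr)
}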