Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-mk2/controllers/subscriber_controller.go
2498 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License-AGPL.txt in the project root for license information.
4
5
package controllers
6
7
import (
8
"context"
9
"os"
10
11
"github.com/google/go-cmp/cmp"
12
"github.com/google/go-cmp/cmp/cmpopts"
13
"k8s.io/apimachinery/pkg/api/errors"
14
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15
ctrl "sigs.k8s.io/controller-runtime"
16
"sigs.k8s.io/controller-runtime/pkg/client"
17
"sigs.k8s.io/controller-runtime/pkg/controller"
18
"sigs.k8s.io/controller-runtime/pkg/event"
19
"sigs.k8s.io/controller-runtime/pkg/handler"
20
"sigs.k8s.io/controller-runtime/pkg/log"
21
"sigs.k8s.io/controller-runtime/pkg/predicate"
22
"sigs.k8s.io/controller-runtime/pkg/source"
23
24
"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/constants"
25
config "github.com/gitpod-io/gitpod/ws-manager/api/config"
26
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
27
28
k8s "github.com/gitpod-io/gitpod/common-go/kubernetes"
29
)
30
31
func NewSubscriberReconciler(c client.Client, cfg *config.Configuration) (*SubscriberReconciler, error) {
32
reconciler := &SubscriberReconciler{
33
Client: c,
34
Config: cfg,
35
}
36
37
return reconciler, nil
38
}
39
40
type SubscriberReconciler struct {
41
client.Client
42
43
Config *config.Configuration
44
45
OnReconcile func(ctx context.Context, ws *workspacev1.Workspace)
46
}
47
48
func (r *SubscriberReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
49
log := log.FromContext(ctx)
50
51
var workspace workspacev1.Workspace
52
if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil {
53
if !errors.IsNotFound(err) {
54
log.Error(err, "unable to fetch workspace")
55
}
56
57
return ctrl.Result{}, client.IgnoreNotFound(err)
58
}
59
60
if workspace.Status.Conditions == nil {
61
workspace.Status.Conditions = []metav1.Condition{}
62
}
63
64
if workspace.IsConditionTrue(workspacev1.WorkspaceConditionPodRejected) {
65
// In this situation, we are about to re-create the pod. We don't want clients to see all the "stopping, stopped, starting" chatter, so we hide it here.
66
// TODO(gpl) Is this a sane approach?
67
return ctrl.Result{}, nil
68
}
69
70
if r.OnReconcile != nil {
71
r.OnReconcile(ctx, &workspace)
72
}
73
74
return ctrl.Result{}, nil
75
}
76
77
// SetupWithManager sets up the controller with the Manager.
78
func (r *SubscriberReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
79
c, err := controller.NewUnmanaged("subscribers-controller", mgr, controller.Options{Reconciler: r})
80
if err != nil {
81
return err
82
}
83
84
go func() {
85
err = c.Start(ctx)
86
if err != nil {
87
log.FromContext(ctx).Error(err, "cannot start Subscriber reconciler")
88
os.Exit(1)
89
}
90
}()
91
92
filterByManagedBy := func(ws *workspacev1.Workspace) bool {
93
mgr, ok := ws.Labels[k8s.WorkspaceManagedByLabel]
94
if !ok {
95
return true
96
}
97
98
return mgr == constants.ManagedBy
99
}
100
101
// we need several reconciliation loops during a workspace creation until it reaches a stable state.
102
// this introduces the side effect of multiple notifications to the subscribers with partial information.
103
// the filterByUpdate predicate acts as a filter to avoid this
104
filterByUpdate := predicate.TypedFuncs[*workspacev1.Workspace]{
105
CreateFunc: func(ce event.TypedCreateEvent[*workspacev1.Workspace]) bool {
106
ws := ce.Object
107
return filterByManagedBy(ws)
108
},
109
UpdateFunc: func(e event.TypedUpdateEvent[*workspacev1.Workspace]) bool {
110
old := e.ObjectOld
111
new := e.ObjectNew
112
113
mgr, ok := new.Labels[k8s.WorkspaceManagedByLabel]
114
if !ok {
115
return true
116
}
117
118
if mgr != constants.ManagedBy {
119
return false
120
}
121
122
if !cmp.Equal(old.Spec.Ports, new.Spec.Ports) {
123
return true
124
}
125
126
return !cmp.Equal(old.Status, new.Status, cmpopts.IgnoreFields(workspacev1.WorkspaceStatus{}, "LastActivity"))
127
},
128
129
DeleteFunc: func(de event.TypedDeleteEvent[*workspacev1.Workspace]) bool {
130
ws := de.Object
131
return filterByManagedBy(ws)
132
},
133
}
134
135
return c.Watch(source.Kind(mgr.GetCache(), &workspacev1.Workspace{}, &handler.TypedEnqueueRequestForObject[*workspacev1.Workspace]{}, filterByUpdate))
136
}
137
138