Path: blob/main/components/ws-manager-mk2/controllers/subscriber_controller.go
2498 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License-AGPL.txt in the project root for license information.34package controllers56import (7"context"8"os"910"github.com/google/go-cmp/cmp"11"github.com/google/go-cmp/cmp/cmpopts"12"k8s.io/apimachinery/pkg/api/errors"13metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"14ctrl "sigs.k8s.io/controller-runtime"15"sigs.k8s.io/controller-runtime/pkg/client"16"sigs.k8s.io/controller-runtime/pkg/controller"17"sigs.k8s.io/controller-runtime/pkg/event"18"sigs.k8s.io/controller-runtime/pkg/handler"19"sigs.k8s.io/controller-runtime/pkg/log"20"sigs.k8s.io/controller-runtime/pkg/predicate"21"sigs.k8s.io/controller-runtime/pkg/source"2223"github.com/gitpod-io/gitpod/ws-manager-mk2/pkg/constants"24config "github.com/gitpod-io/gitpod/ws-manager/api/config"25workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"2627k8s "github.com/gitpod-io/gitpod/common-go/kubernetes"28)2930func NewSubscriberReconciler(c client.Client, cfg *config.Configuration) (*SubscriberReconciler, error) {31reconciler := &SubscriberReconciler{32Client: c,33Config: cfg,34}3536return reconciler, nil37}3839type SubscriberReconciler struct {40client.Client4142Config *config.Configuration4344OnReconcile func(ctx context.Context, ws *workspacev1.Workspace)45}4647func (r *SubscriberReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {48log := log.FromContext(ctx)4950var workspace workspacev1.Workspace51if err := r.Get(ctx, req.NamespacedName, &workspace); err != nil {52if !errors.IsNotFound(err) {53log.Error(err, "unable to fetch workspace")54}5556return ctrl.Result{}, client.IgnoreNotFound(err)57}5859if workspace.Status.Conditions == nil {60workspace.Status.Conditions = []metav1.Condition{}61}6263if workspace.IsConditionTrue(workspacev1.WorkspaceConditionPodRejected) {64// In this situation, we are about to re-create the pod. We don't want clients to see all the "stopping, stopped, starting" chatter, so we hide it here.65// TODO(gpl) Is this a sane approach?66return ctrl.Result{}, nil67}6869if r.OnReconcile != nil {70r.OnReconcile(ctx, &workspace)71}7273return ctrl.Result{}, nil74}7576// SetupWithManager sets up the controller with the Manager.77func (r *SubscriberReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {78c, err := controller.NewUnmanaged("subscribers-controller", mgr, controller.Options{Reconciler: r})79if err != nil {80return err81}8283go func() {84err = c.Start(ctx)85if err != nil {86log.FromContext(ctx).Error(err, "cannot start Subscriber reconciler")87os.Exit(1)88}89}()9091filterByManagedBy := func(ws *workspacev1.Workspace) bool {92mgr, ok := ws.Labels[k8s.WorkspaceManagedByLabel]93if !ok {94return true95}9697return mgr == constants.ManagedBy98}99100// we need several reconciliation loops during a workspace creation until it reaches a stable state.101// this introduces the side effect of multiple notifications to the subscribers with partial information.102// the filterByUpdate predicate acts as a filter to avoid this103filterByUpdate := predicate.TypedFuncs[*workspacev1.Workspace]{104CreateFunc: func(ce event.TypedCreateEvent[*workspacev1.Workspace]) bool {105ws := ce.Object106return filterByManagedBy(ws)107},108UpdateFunc: func(e event.TypedUpdateEvent[*workspacev1.Workspace]) bool {109old := e.ObjectOld110new := e.ObjectNew111112mgr, ok := new.Labels[k8s.WorkspaceManagedByLabel]113if !ok {114return true115}116117if mgr != constants.ManagedBy {118return false119}120121if !cmp.Equal(old.Spec.Ports, new.Spec.Ports) {122return true123}124125return !cmp.Equal(old.Status, new.Status, cmpopts.IgnoreFields(workspacev1.WorkspaceStatus{}, "LastActivity"))126},127128DeleteFunc: func(de event.TypedDeleteEvent[*workspacev1.Workspace]) bool {129ws := de.Object130return filterByManagedBy(ws)131},132}133134return c.Watch(source.Kind(mgr.GetCache(), &workspacev1.Workspace{}, &handler.TypedEnqueueRequestForObject[*workspacev1.Workspace]{}, filterByUpdate))135}136137138