Path: blob/main/component/loki/source/podlogs/controller.go
5302 views
package podlogs

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	monitoringv1alpha2 "github.com/grafana/agent/component/loki/source/podlogs/internal/apis/monitoring/v1alpha2"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	toolscache "k8s.io/client-go/tools/cache"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// controller watches Kubernetes objects through a set of informers and asks
// its reconciler to reconcile whenever one of the watched objects changes.
type controller struct {
	log        log.Logger
	reconciler *reconciler

	// mut guards informers and client, which are replaced by UpdateConfig.
	mut       sync.RWMutex
	informers cache.Informers
	client    client.Client
	reloadCh  chan struct{} // Written to when informers or client changes

	reconcileCh chan struct{} // Written to by RequestReconcile; buffered so requests coalesce
	doneCh      chan struct{}
}

// Generous timeout period for configuring all informers
const informerSyncTimeout = 10 * time.Second

// newController creates a new, unstarted controller. The controller will
// request a reconcile when the state of Kubernetes changes.
func newController(l log.Logger, reconciler *reconciler) *controller {
	return &controller{
		log:        l,
		reconciler: reconciler,
		reloadCh:   make(chan struct{}, 1),

		reconcileCh: make(chan struct{}, 1),
		doneCh:      make(chan struct{}),
	}
}

// UpdateConfig builds a new client and informer cache from cfg, stores them on
// the controller, and schedules a reload so that Run picks them up. It returns
// an error if the scheme, client, or cache cannot be created; in that case the
// previously stored informers and client are left untouched.
func (ctrl *controller) UpdateConfig(cfg *rest.Config) error {
	scheme := runtime.NewScheme()
	for _, add := range []func(*runtime.Scheme) error{
		corev1.AddToScheme,
		monitoringv1alpha2.AddToScheme,
	} {
		if err := add(scheme); err != nil {
			return fmt.Errorf("unable to register scheme: %w", err)
		}
	}

	cli, err := client.New(cfg, client.Options{Scheme: scheme})
	if err != nil {
		return err
	}

	// Named informerCache rather than cache so the imported cache package is
	// not shadowed for the rest of the function.
	informerCache, err := cache.New(cfg, cache.Options{Scheme: scheme})
	if err != nil {
		return err
	}

	delegateCli, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
		CacheReader: informerCache,
		Client:      cli,
	})
	if err != nil {
		return err
	}

	// Update the stored informers and client and schedule a reload.
	ctrl.mut.Lock()
	ctrl.informers = informerCache
	ctrl.client = delegateCli
	ctrl.mut.Unlock()

	select {
	case ctrl.reloadCh <- struct{}{}:
	default:
		// Reload is already scheduled
	}
	return nil
}

// Run the controller. Run blocks until ctx is canceled and then returns nil.
// Each time a reload is scheduled by UpdateConfig, the previously started
// informers are stopped and a new goroutine is started with the most recently
// stored informers and client.
func (ctrl *controller) Run(ctx context.Context) error {
	// cancel stops the currently running informers; nil until the first reload.
	var cancel context.CancelFunc
	defer func() {
		// Release the last informer context when Run returns.
		if cancel != nil {
			cancel()
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ctrl.reloadCh:
			ctrl.mut.RLock()
			var (
				newInformers = ctrl.informers
				newClient    = ctrl.client
			)
			ctrl.mut.RUnlock()

			// Stop old informers.
			if cancel != nil {
				cancel()
			}

			informerContext, informerCancel := context.WithCancel(ctx)

			go func() {
				if err := ctrl.run(informerContext, newInformers, newClient); err != nil {
					level.Error(ctrl.log).Log("msg", "failed to run controller", "err", err)
				}
			}()

			cancel = informerCancel
		}
	}
}

// run starts the given informers, waits for their caches to sync, registers
// the change handlers, and then reconciles with the given client every time a
// reconcile is requested. It returns nil once ctx is canceled, or an error if
// the caches fail to sync or the informers cannot be configured.
func (ctrl *controller) run(ctx context.Context, informers cache.Informers, client client.Client) error {
	level.Info(ctrl.log).Log("msg", "starting controller")
	defer level.Info(ctrl.log).Log("msg", "controller exiting")

	go func() {
		err := informers.Start(ctx)
		// Only report errors which were not caused by our own cancellation;
		// an error seen after ctx is done is part of a normal shutdown.
		if err != nil && ctx.Err() == nil {
			level.Error(ctrl.log).Log("msg", "failed to start informers", "err", err)
		}
	}()

	if !informers.WaitForCacheSync(ctx) {
		return fmt.Errorf("informer caches failed to sync")
	}

	if err := ctrl.configureInformers(ctx, informers); err != nil {
		return fmt.Errorf("failed to configure informers: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ctrl.reconcileCh:
			// A failed reconcile is logged and does not stop the loop.
			if err := ctrl.reconciler.Reconcile(ctx, client); err != nil {
				level.Error(ctrl.log).Log("msg", "reconcile failed", "err", err)
			}
		}
	}
}

// configureInformers starts the informers used by this controller to perform reconciles.
// All informers must be obtained within informerSyncTimeout; if that deadline
// is exceeded, an error describing likely causes is returned instead of the
// underlying error.
func (ctrl *controller) configureInformers(ctx context.Context, informers cache.Informers) error {
	// We want to re-reconcile the set of PodLogs whenever namespaces, pods, or
	// PodLogs changes. Reconciling on namespaces and pods is important so that
	// we can reevaluate selectors defined in PodLogs.
	types := []client.Object{
		&corev1.Namespace{},
		&corev1.Pod{},
		&monitoringv1alpha2.PodLogs{},
	}

	// The timeout is shared by all types rather than applied per type.
	informerCtx, cancel := context.WithTimeout(ctx, informerSyncTimeout)
	defer cancel()

	for _, ty := range types {
		informer, err := informers.GetInformer(informerCtx, ty)
		if err != nil {
			if errors.Is(informerCtx.Err(), context.DeadlineExceeded) { // Check the context to prevent GetInformer returning a fake timeout
				return fmt.Errorf("Timeout exceeded while configuring informers. Check the connection"+
					" to the Kubernetes API is stable and that the Agent has appropriate RBAC permissions for %v", ty)
			}

			return err
		}
		_, err = informer.AddEventHandler(onChangeEventHandler{ChangeFunc: ctrl.RequestReconcile})
		if err != nil {
			return err
		}
	}
	return nil
}

// RequestReconcile queues a reconcile without blocking. If a reconcile is
// already queued, the request is dropped.
func (ctrl *controller) RequestReconcile() {
	select {
	case ctrl.reconcileCh <- struct{}{}:
	default:
		// Reconcile is already queued; do nothing.
	}
}

// onChangeEventHandler implements [toolscache.ResourceEventHandler], calling
// ChangeFunc when any change occurs. Objects are not sent to the handler.
type onChangeEventHandler struct {
	ChangeFunc func()
}

var _ toolscache.ResourceEventHandler = onChangeEventHandler{}

func (h onChangeEventHandler) OnAdd(_ interface{})       { h.ChangeFunc() }
func (h onChangeEventHandler) OnUpdate(_, _ interface{}) { h.ChangeFunc() }
func (h onChangeEventHandler) OnDelete(_ interface{})    { h.ChangeFunc() }