Path: blob/main/component/prometheus/operator/common/crdmanager.go
package common

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/prometheus"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/scrape"
	toolscache "k8s.io/client-go/tools/cache"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/grafana/agent/component/prometheus/operator"
	"github.com/grafana/agent/component/prometheus/operator/configgen"
	compscrape "github.com/grafana/agent/component/prometheus/scrape"
	promopv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
)

// Generous timeout period for configuring all informers
const informerSyncTimeout = 10 * time.Second

// crdManager holds all of the fields required to run a CRD-based component.
// On update, the entire crdManager should be recreated and restarted.
type crdManager struct {
	mut              sync.Mutex
	discoveryConfigs map[string]discovery.Configs
	scrapeConfigs    map[string]*config.ScrapeConfig
	debugInfo        map[string]*operator.DiscoveredResource
	discoveryManager *discovery.Manager
	scrapeManager    *scrape.Manager

	opts      component.Options
	logger    log.Logger
	args      *operator.Arguments
	configGen configgen.ConfigGenerator

	kind string
}

// Kinds of Prometheus Operator resources a crdManager can watch.
const (
	KindPodMonitor     string = "podMonitor"
	KindServiceMonitor string = "serviceMonitor"
)

// newCrdManager creates a crdManager for the given kind. It panics on an
// unsupported kind, since that indicates a programming error.
func newCrdManager(opts component.Options, logger log.Logger, args *operator.Arguments, kind string) *crdManager {
	switch kind {
	case KindPodMonitor, KindServiceMonitor:
	default:
		panic(fmt.Sprintf("Unknown kind for crdManager: %s", kind))
	}
	return &crdManager{
		opts:             opts,
		logger:           logger,
		args:             args,
		discoveryConfigs: map[string]discovery.Configs{},
		scrapeConfigs:    map[string]*config.ScrapeConfig{},
		debugInfo:        map[string]*operator.DiscoveredResource{},
		kind:             kind,
	}
}

func (c *crdManager) Run(ctx context.Context) error {
	c.configGen = configgen.ConfigGenerator{
		Client: &c.args.Client,
	}

	// Start prometheus service discovery manager
	c.discoveryManager = discovery.NewManager(ctx, c.logger, discovery.Name(c.opts.ID))
	go func() {
		err := c.discoveryManager.Run()
		if err != nil {
			level.Error(c.logger).Log("msg", "discovery manager stopped", "err", err)
		}
	}()

	if err := c.runInformers(ctx); err != nil {
		return err
	}
	level.Info(c.logger).Log("msg", "informers started")

	// Start prometheus scrape manager.
	flowAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer)
	opts := &scrape.Options{}
	c.scrapeManager = scrape.NewManager(opts, c.logger, flowAppendable)
	defer c.scrapeManager.Stop()
	targetSetsChan := make(chan map[string][]*targetgroup.Group)
	go func() {
		err := c.scrapeManager.Run(targetSetsChan)
		level.Info(c.logger).Log("msg", "scrape manager stopped")
		if err != nil {
			level.Error(c.logger).Log("msg", "scrape manager failed", "err", err)
		}
	}()

	// Start the target discovery loop to update the scrape manager with new targets.
	for {
		select {
		case <-ctx.Done():
			return nil
		case m := <-c.discoveryManager.SyncCh():
			targetSetsChan <- m
		}
	}
}

// DebugInfo returns debug information for the CRDManager.
func (c *crdManager) DebugInfo() interface{} {
	c.mut.Lock()
	defer c.mut.Unlock()

	var info operator.DebugInfo
	for _, pm := range c.debugInfo {
		info.DiscoveredCRDs = append(info.DiscoveredCRDs, pm)
	}
	info.Targets = compscrape.BuildTargetStatuses(c.scrapeManager.TargetsActive())
	return info
}

// runInformers starts all the informers that are required to discover CRDs.
func (c *crdManager) runInformers(ctx context.Context) error {
	config, err := c.args.Client.BuildRESTConfig(c.logger)
	if err != nil {
		return fmt.Errorf("creating rest config: %w", err)
	}

	scheme := runtime.NewScheme()
	for _, add := range []func(*runtime.Scheme) error{
		promopv1.AddToScheme,
	} {
		if err := add(scheme); err != nil {
			return fmt.Errorf("unable to register scheme: %w", err)
		}
	}

	ls, err := c.args.LabelSelector.BuildSelector()
	if err != nil {
		return fmt.Errorf("building label selector: %w", err)
	}
	for _, ns := range c.args.Namespaces {
		opts := cache.Options{
			Scheme:    scheme,
			Namespace: ns,
		}

		if ls != labels.Nothing() {
			opts.DefaultSelector.Label = ls
		}
		cache, err := cache.New(config, opts)
		if err != nil {
			return err
		}

		informers := cache

		go func() {
			err := informers.Start(ctx)
			// If the context was canceled, we don't want to log an error.
			if err != nil && ctx.Err() == nil {
				level.Error(c.logger).Log("msg", "failed to start informers", "err", err)
			}
		}()
		if !informers.WaitForCacheSync(ctx) {
			return fmt.Errorf("informer caches failed to sync")
		}
		if err := c.configureInformers(ctx, informers); err != nil {
			return fmt.Errorf("failed to configure informers: %w", err)
		}
	}

	return nil
}

// configureInformers configures the informers for the CRDManager to watch for crd changes.
func (c *crdManager) configureInformers(ctx context.Context, informers cache.Informers) error {
	var prototype client.Object
	switch c.kind {
	case KindPodMonitor:
		prototype = &promopv1.PodMonitor{}
	case KindServiceMonitor:
		prototype = &promopv1.ServiceMonitor{}
	default:
		return fmt.Errorf("unknown kind to configure Informers: %s", c.kind)
	}

	informerCtx, cancel := context.WithTimeout(ctx, informerSyncTimeout)
	defer cancel()

	informer, err := informers.GetInformer(informerCtx, prototype)
	if err != nil {
		if errors.Is(informerCtx.Err(), context.DeadlineExceeded) { // Check the context to prevent GetInformer returning a fake timeout
			return fmt.Errorf("timeout exceeded while configuring informers. Check the connection"+
				" to the Kubernetes API is stable and that the Agent has appropriate RBAC permissions for %v", prototype)
		}

		return err
	}
	switch c.kind {
	case KindPodMonitor:
		_, err = informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
			AddFunc:    c.onAddPodMonitor,
			UpdateFunc: c.onUpdatePodMonitor,
			DeleteFunc: c.onDeletePodMonitor,
		})
	case KindServiceMonitor:
		_, err = informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
			AddFunc:    c.onAddServiceMonitor,
			UpdateFunc: c.onUpdateServiceMonitor,
			DeleteFunc: c.onDeleteServiceMonitor,
		})
	default:
		return fmt.Errorf("unknown kind to configure Informers: %s", c.kind)
	}

	if err != nil {
		return err
	}
	return nil
}

// apply applies the current state of the Manager to the Prometheus discovery manager and scrape manager.
func (c *crdManager) apply() error {
	c.mut.Lock()
	defer c.mut.Unlock()
	err := c.discoveryManager.ApplyConfig(c.discoveryConfigs)
	if err != nil {
		level.Error(c.logger).Log("msg", "error applying discovery configs", "err", err)
		return err
	}
	scs := []*config.ScrapeConfig{}
	for _, sc := range c.scrapeConfigs {
		scs = append(scs, sc)
	}
	err = c.scrapeManager.ApplyConfig(&config.Config{
		ScrapeConfigs: scs,
	})
	if err != nil {
		level.Error(c.logger).Log("msg", "error applying scrape configs", "err", err)
		return err
	}
	level.Debug(c.logger).Log("msg", "scrape config was updated")
	return nil
}

// addDebugInfo records the result of the most recent reconcile for a resource.
func (c *crdManager) addDebugInfo(ns string, name string, err error) {
	c.mut.Lock()
	defer c.mut.Unlock()
	debug := &operator.DiscoveredResource{}
	debug.Namespace = ns
	debug.Name = name
	debug.LastReconcile = time.Now()
	if err != nil {
		debug.ReconcileError = err.Error()
	} else {
		debug.ReconcileError = ""
	}
	prefix := fmt.Sprintf("%s/%s/%s", c.kind, ns, name)
	c.debugInfo[prefix] = debug
}

// addPodMonitor generates scrape and discovery configs for every endpoint of
// the PodMonitor and applies them to the discovery and scrape managers.
func (c *crdManager) addPodMonitor(pm *promopv1.PodMonitor) {
	var err error
	for i, ep := range pm.Spec.PodMetricsEndpoints {
		var pmc *config.ScrapeConfig
		pmc, err = c.configGen.GeneratePodMonitorConfig(pm, ep, i)
		if err != nil {
			// TODO(jcreixell): Generate Kubernetes event to inform of this error when running `kubectl get <podmonitor>`.
			level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error generating scrapeconfig from podmonitor")
			break
		}
		c.mut.Lock()
		c.discoveryConfigs[pmc.JobName] = pmc.ServiceDiscoveryConfigs
		c.scrapeConfigs[pmc.JobName] = pmc
		c.mut.Unlock()
	}
	if err != nil {
		c.addDebugInfo(pm.Namespace, pm.Name, err)
		return
	}
	if err = c.apply(); err != nil {
		level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error applying scrape configs from "+c.kind)
	}
	c.addDebugInfo(pm.Namespace, pm.Name, err)
}

func (c *crdManager) onAddPodMonitor(obj interface{}) {
	pm := obj.(*promopv1.PodMonitor)
	level.Info(c.logger).Log("msg", "found pod monitor", "name", pm.Name)
	c.addPodMonitor(pm)
}

func (c *crdManager) onUpdatePodMonitor(oldObj, newObj interface{}) {
	pm := oldObj.(*promopv1.PodMonitor)
	c.clearConfigs("podMonitor", pm.Namespace, pm.Name)
	c.addPodMonitor(newObj.(*promopv1.PodMonitor))
}

func (c *crdManager) onDeletePodMonitor(obj interface{}) {
	pm := obj.(*promopv1.PodMonitor)
	c.clearConfigs("podMonitor", pm.Namespace, pm.Name)
	if err := c.apply(); err != nil {
		level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error applying scrape configs after deleting "+c.kind)
	}
}

// addServiceMonitor generates scrape and discovery configs for every endpoint
// of the ServiceMonitor and applies them to the discovery and scrape managers.
func (c *crdManager) addServiceMonitor(sm *promopv1.ServiceMonitor) {
	var err error
	for i, ep := range sm.Spec.Endpoints {
		var smc *config.ScrapeConfig
		smc, err = c.configGen.GenerateServiceMonitorConfig(sm, ep, i)
		if err != nil {
			// TODO(jcreixell): Generate Kubernetes event to inform of this error when running `kubectl get <servicemonitor>`.
			level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error generating scrapeconfig from serviceMonitor")
			break
		}
		c.mut.Lock()
		c.discoveryConfigs[smc.JobName] = smc.ServiceDiscoveryConfigs
		c.scrapeConfigs[smc.JobName] = smc
		c.mut.Unlock()
	}
	if err != nil {
		c.addDebugInfo(sm.Namespace, sm.Name, err)
		return
	}
	if err = c.apply(); err != nil {
		level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error applying scrape configs from "+c.kind)
	}
	c.addDebugInfo(sm.Namespace, sm.Name, err)
}

func (c *crdManager) onAddServiceMonitor(obj interface{}) {
	sm := obj.(*promopv1.ServiceMonitor)
	level.Info(c.logger).Log("msg", "found service monitor", "name", sm.Name)
	c.addServiceMonitor(sm)
}

func (c *crdManager) onUpdateServiceMonitor(oldObj, newObj interface{}) {
	sm := oldObj.(*promopv1.ServiceMonitor)
	c.clearConfigs("serviceMonitor", sm.Namespace, sm.Name)
	c.addServiceMonitor(newObj.(*promopv1.ServiceMonitor))
}

func (c *crdManager) onDeleteServiceMonitor(obj interface{}) {
	sm := obj.(*promopv1.ServiceMonitor)
	c.clearConfigs("serviceMonitor", sm.Namespace, sm.Name)
	if err := c.apply(); err != nil {
		level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error applying scrape configs after deleting "+c.kind)
	}
}

// clearConfigs removes the discovery and scrape configs (and debug info)
// previously generated for the given resource.
func (c *crdManager) clearConfigs(kind, ns, name string) {
	c.mut.Lock()
	defer c.mut.Unlock()
	prefix := fmt.Sprintf("%s/%s/%s", kind, ns, name)
	for k := range c.discoveryConfigs {
		if strings.HasPrefix(k, prefix) {
			delete(c.discoveryConfigs, k)
			delete(c.scrapeConfigs, k)
		}
	}
	delete(c.debugInfo, prefix)
}
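
For context, a minimal usage sketch follows. It is not part of crdmanager.go: the helper name runServiceMonitorManager is invented for illustration, and it assumes the Flow component framework has already populated component.Options and operator.Arguments before handing them over.

package common

import (
	"context"

	"github.com/go-kit/log"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/prometheus/operator"
)

// runServiceMonitorManager is a hypothetical helper showing the intended
// lifecycle of crdManager: build it with newCrdManager, then call Run, which
// blocks until ctx is canceled. When Arguments change, the owning component
// is expected to cancel ctx and construct a fresh crdManager rather than
// reconfigure this one in place.
func runServiceMonitorManager(ctx context.Context, opts component.Options, logger log.Logger, args *operator.Arguments) error {
	manager := newCrdManager(opts, logger, args, KindServiceMonitor)
	return manager.Run(ctx)
}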