Path: blob/main/component/mimir/rules/kubernetes/rules.go
4096 views
package rules12import (3"context"4"fmt"5"sync"6"time"78"github.com/go-kit/log"9"github.com/go-kit/log/level"10"github.com/grafana/agent/component"11mimirClient "github.com/grafana/agent/pkg/mimir/client"12promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1"13"github.com/prometheus/client_golang/prometheus"14"github.com/weaveworks/common/instrument"15metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"16"k8s.io/apimachinery/pkg/labels"17"k8s.io/client-go/informers"18"k8s.io/client-go/kubernetes"19coreListers "k8s.io/client-go/listers/core/v1"20"k8s.io/client-go/tools/cache"21"k8s.io/client-go/util/workqueue"22_ "k8s.io/component-base/metrics/prometheus/workqueue"23controller "sigs.k8s.io/controller-runtime"2425promExternalVersions "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions"26promVersioned "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"27)2829func init() {30component.Register(component.Registration{31Name: "mimir.rules.kubernetes",32Args: Arguments{},33Exports: nil,34Build: func(o component.Options, c component.Arguments) (component.Component, error) {35return NewComponent(o, c.(Arguments))36},37})38}3940type Component struct {41log log.Logger42opts component.Options43args Arguments4445mimirClient mimirClient.Interface46k8sClient kubernetes.Interface47promClient promVersioned.Interface48ruleLister promListers.PrometheusRuleLister49ruleInformer cache.SharedIndexInformer5051namespaceLister coreListers.NamespaceLister52namespaceInformer cache.SharedIndexInformer53informerStopChan chan struct{}54ticker *time.Ticker5556queue workqueue.RateLimitingInterface57configUpdates chan ConfigUpdate5859namespaceSelector labels.Selector60ruleSelector labels.Selector6162currentState ruleGroupsByNamespace6364metrics *metrics65healthMut sync.RWMutex66health component.Health67}6869type metrics struct {70configUpdatesTotal prometheus.Counter7172eventsTotal *prometheus.CounterVec73eventsFailed *prometheus.CounterVec74eventsRetried *prometheus.CounterVec7576mimirClientTiming *prometheus.HistogramVec77}7879func (m *metrics) Register(r prometheus.Registerer) error {80r.MustRegister(81m.configUpdatesTotal,82m.eventsTotal,83m.eventsFailed,84m.eventsRetried,85m.mimirClientTiming,86)87return nil88}8990func newMetrics() *metrics {91return &metrics{92configUpdatesTotal: prometheus.NewCounter(prometheus.CounterOpts{93Subsystem: "mimir_rules",94Name: "config_updates_total",95Help: "Total number of times the configuration has been updated.",96}),97eventsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{98Subsystem: "mimir_rules",99Name: "events_total",100Help: "Total number of events processed, partitioned by event type.",101}, []string{"type"}),102eventsFailed: prometheus.NewCounterVec(prometheus.CounterOpts{103Subsystem: "mimir_rules",104Name: "events_failed_total",105Help: "Total number of events that failed to be processed, even after retries, partitioned by event type.",106}, []string{"type"}),107eventsRetried: prometheus.NewCounterVec(prometheus.CounterOpts{108Subsystem: "mimir_rules",109Name: "events_retried_total",110Help: "Total number of retries across all events, partitioned by event type.",111}, []string{"type"}),112mimirClientTiming: prometheus.NewHistogramVec(prometheus.HistogramOpts{113Subsystem: "mimir_rules",114Name: "mimir_client_request_duration_seconds",115Help: "Duration of requests to the Mimir API.",116Buckets: instrument.DefBuckets,117}, instrument.HistogramCollectorBuckets),118}119}120121type ConfigUpdate struct {122args Arguments123err chan error124}125126var _ component.Component = (*Component)(nil)127var _ component.DebugComponent = (*Component)(nil)128var _ component.HealthComponent = (*Component)(nil)129130func NewComponent(o component.Options, args Arguments) (*Component, error) {131metrics := newMetrics()132err := metrics.Register(o.Registerer)133if err != nil {134return nil, fmt.Errorf("registering metrics failed: %w", err)135}136137c := &Component{138log: o.Logger,139opts: o,140args: args,141configUpdates: make(chan ConfigUpdate),142ticker: time.NewTicker(args.SyncInterval),143metrics: metrics,144}145146err = c.init()147if err != nil {148return nil, fmt.Errorf("initializing component failed: %w", err)149}150151return c, nil152}153154func (c *Component) Run(ctx context.Context) error {155err := c.startup(ctx)156if err != nil {157level.Error(c.log).Log("msg", "starting up component failed", "err", err)158c.reportUnhealthy(err)159}160161for {162select {163case update := <-c.configUpdates:164c.metrics.configUpdatesTotal.Inc()165c.shutdown()166167c.args = update.args168err := c.init()169if err != nil {170level.Error(c.log).Log("msg", "updating configuration failed", "err", err)171c.reportUnhealthy(err)172update.err <- err173continue174}175176err = c.startup(ctx)177if err != nil {178level.Error(c.log).Log("msg", "updating configuration failed", "err", err)179c.reportUnhealthy(err)180update.err <- err181continue182}183184update.err <- nil185case <-ctx.Done():186c.shutdown()187return nil188case <-c.ticker.C:189c.queue.Add(event{190typ: eventTypeSyncMimir,191})192}193}194}195196// startup launches the informers and starts the event loop.197func (c *Component) startup(ctx context.Context) error {198c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "mimir.rules.kubernetes")199c.informerStopChan = make(chan struct{})200201if err := c.startNamespaceInformer(); err != nil {202return err203}204if err := c.startRuleInformer(); err != nil {205return err206}207err := c.syncMimir(ctx)208if err != nil {209return err210}211go c.eventLoop(ctx)212return nil213}214215func (c *Component) shutdown() {216close(c.informerStopChan)217c.queue.ShutDownWithDrain()218}219220func (c *Component) Update(newConfig component.Arguments) error {221errChan := make(chan error)222c.configUpdates <- ConfigUpdate{223args: newConfig.(Arguments),224err: errChan,225}226return <-errChan227}228229func (c *Component) init() error {230level.Info(c.log).Log("msg", "initializing with new configuration")231232// TODO: allow overriding some stuff in RestConfig and k8s client options?233restConfig, err := controller.GetConfig()234if err != nil {235return fmt.Errorf("failed to get k8s config: %w", err)236}237238c.k8sClient, err = kubernetes.NewForConfig(restConfig)239if err != nil {240return fmt.Errorf("failed to create k8s client: %w", err)241}242243c.promClient, err = promVersioned.NewForConfig(restConfig)244if err != nil {245return fmt.Errorf("failed to create prometheus operator client: %w", err)246}247248httpClient := c.args.HTTPClientConfig.Convert()249250c.mimirClient, err = mimirClient.New(c.log, mimirClient.Config{251ID: c.args.TenantID,252Address: c.args.Address,253UseLegacyRoutes: c.args.UseLegacyRoutes,254HTTPClientConfig: *httpClient,255}, c.metrics.mimirClientTiming)256if err != nil {257return err258}259260c.ticker.Reset(c.args.SyncInterval)261262c.namespaceSelector, err = convertSelectorToListOptions(c.args.RuleNamespaceSelector)263if err != nil {264return err265}266267c.ruleSelector, err = convertSelectorToListOptions(c.args.RuleSelector)268if err != nil {269return err270}271272return nil273}274275func convertSelectorToListOptions(selector LabelSelector) (labels.Selector, error) {276matchExpressions := []metav1.LabelSelectorRequirement{}277278for _, me := range selector.MatchExpressions {279matchExpressions = append(matchExpressions, metav1.LabelSelectorRequirement{280Key: me.Key,281Operator: metav1.LabelSelectorOperator(me.Operator),282Values: me.Values,283})284}285286return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{287MatchLabels: selector.MatchLabels,288MatchExpressions: matchExpressions,289})290}291292func (c *Component) startNamespaceInformer() error {293factory := informers.NewSharedInformerFactoryWithOptions(294c.k8sClient,29524*time.Hour,296informers.WithTweakListOptions(func(lo *metav1.ListOptions) {297lo.LabelSelector = c.namespaceSelector.String()298}),299)300301namespaces := factory.Core().V1().Namespaces()302c.namespaceLister = namespaces.Lister()303c.namespaceInformer = namespaces.Informer()304_, err := c.namespaceInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue))305if err != nil {306return err307}308309factory.Start(c.informerStopChan)310factory.WaitForCacheSync(c.informerStopChan)311return nil312}313314func (c *Component) startRuleInformer() error {315factory := promExternalVersions.NewSharedInformerFactoryWithOptions(316c.promClient,31724*time.Hour,318promExternalVersions.WithTweakListOptions(func(lo *metav1.ListOptions) {319lo.LabelSelector = c.ruleSelector.String()320}),321)322323promRules := factory.Monitoring().V1().PrometheusRules()324c.ruleLister = promRules.Lister()325c.ruleInformer = promRules.Informer()326_, err := c.ruleInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue))327if err != nil {328return err329}330331factory.Start(c.informerStopChan)332factory.WaitForCacheSync(c.informerStopChan)333return nil334}335336337