Path: blob/main/component/mimir/rules/kubernetes/events.go
4096 views
package rules

import (
	"context"
	"fmt"
	"regexp"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/hashicorp/go-multierror"
	promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	"github.com/prometheus/prometheus/model/rulefmt"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/yaml" // Used for CRD compatibility instead of gopkg.in/yaml.v2
)

// event is a unit of work placed on the work queue.
//
// This type must be hashable, so it is kept simple. The indexer will maintain a
// cache of current state, so this is mostly used for logging.
type event struct {
	typ       eventType
	objectKey string
}

// eventType identifies what kind of work an event represents.
type eventType string

// eventTypeResourceChanged is published by queuedEventHandler whenever a
// watched object is added, updated, or deleted. eventTypeSyncMimir asks the
// event loop to refresh its cached view of the Mimir ruler before reconciling.
const (
	eventTypeResourceChanged eventType = "resource-changed"
	eventTypeSyncMimir       eventType = "sync-mimir"
)

// queuedEventHandler adapts informer callbacks into resource-changed events
// on a rate-limited work queue.
type queuedEventHandler struct {
	log   log.Logger
	queue workqueue.RateLimitingInterface
}

// newQueuedEventHandler returns a handler that logs to log and enqueues onto queue.
func newQueuedEventHandler(log log.Logger, queue workqueue.RateLimitingInterface) *queuedEventHandler {
	return &queuedEventHandler{
		log:   log,
		queue: queue,
	}
}

// OnAdd implements the cache.ResourceEventHandler interface.
func (c *queuedEventHandler) OnAdd(obj interface{}) {
	c.publishEvent(obj)
}

// OnUpdate implements the cache.ResourceEventHandler interface. Only the new
// object is used, since it is needed solely to derive the event key.
func (c *queuedEventHandler) OnUpdate(oldObj, newObj interface{}) {
	c.publishEvent(newObj)
}

// OnDelete implements the cache.ResourceEventHandler interface. obj may be a
// cache.DeletedFinalStateUnknown tombstone; publishEvent handles that case.
func (c *queuedEventHandler) OnDelete(obj interface{}) {
	c.publishEvent(obj)
}

// publishEvent enqueues a rate-limited resource-changed event keyed by the
// object's "namespace/name" key.
//
// DeletionHandlingMetaNamespaceKeyFunc is used instead of MetaNamespaceKeyFunc
// so that deletion tombstones (cache.DeletedFinalStateUnknown), which carry no
// object metadata of their own, still produce a key. Otherwise a missed delete
// would be dropped here and never trigger a reconcile.
func (c *queuedEventHandler) publishEvent(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		level.Error(c.log).Log("msg", "failed to get key for object", "err", err)
		return
	}

	c.queue.AddRateLimited(event{
		typ:       eventTypeResourceChanged,
		objectKey: key,
	})
}

// eventLoop processes events from c.queue until the queue is shut down.
//
// On success the component is reported healthy. On failure the event is
// re-queued with rate limiting until it has been requeued maxRetries times;
// after that it is dropped and the component is reported unhealthy.
func (c *Component) eventLoop(ctx context.Context) {
	const maxRetries = 5

	for {
		item, shutdown := c.queue.Get()
		if shutdown {
			level.Info(c.log).Log("msg", "shutting down event loop")
			return
		}

		evt := item.(event)
		c.metrics.eventsTotal.WithLabelValues(string(evt.typ)).Inc()

		err := c.processEvent(ctx, evt)
		if err == nil {
			c.reportHealthy()
			c.queue.Forget(evt)
			continue
		}

		retries := c.queue.NumRequeues(evt)
		if retries < maxRetries {
			c.metrics.eventsRetried.WithLabelValues(string(evt.typ)).Inc()
			c.queue.AddRateLimited(evt)
			level.Error(c.log).Log(
				"msg", "failed to process event, will retry",
				"retries", fmt.Sprintf("%d/%d", retries, maxRetries),
				"err", err,
			)
			// Deliberately no Forget here: NumRequeues must keep counting
			// this event's failures across retries.
			continue
		}

		c.metrics.eventsFailed.WithLabelValues(string(evt.typ)).Inc()
		level.Error(c.log).Log(
			"msg", "failed to process event, max retries exceeded",
			"retries", fmt.Sprintf("%d/%d", retries, maxRetries),
			"err", err,
		)
		c.reportUnhealthy(err)
		// Give up on this event and reset its retry history.
		c.queue.Forget(evt)
	}
}

// processEvent handles one event and then reconciles Kubernetes state against
// the cached Mimir state. A sync-mimir event first refreshes that cache from
// the ruler. The event is always marked Done on the queue before returning.
func (c *Component) processEvent(ctx context.Context, e event) error {
	defer c.queue.Done(e)

	switch e.typ {
	case eventTypeResourceChanged:
		level.Info(c.log).Log("msg", "processing event", "type", e.typ, "key", e.objectKey)
	case eventTypeSyncMimir:
		level.Debug(c.log).Log("msg", "syncing current state from ruler")
		if err := c.syncMimir(ctx); err != nil {
			return err
		}
	default:
		return fmt.Errorf("unknown event type: %s", e.typ)
	}

	return c.reconcileState(ctx)
}

// syncMimir replaces c.currentState with the rule groups currently stored in
// the Mimir ruler, keeping only the namespaces accepted by
// isManagedMimirNamespace. On error c.currentState is left untouched.
func (c *Component) syncMimir(ctx context.Context) error {
	rulesByNamespace, err := c.mimirClient.ListRules(ctx, "")
	if err != nil {
		level.Error(c.log).Log("msg", "failed to list rules from mimir", "err", err)
		return err
	}

	// Deleting entries while ranging over a map is permitted in Go.
	for ns := range rulesByNamespace {
		if !isManagedMimirNamespace(c.args.MimirNameSpacePrefix, ns) {
			delete(rulesByNamespace, ns)
		}
	}

	c.currentState = rulesByNamespace
	return nil
}

// reconcileState loads the desired rule groups from Kubernetes, diffs them
// against c.currentState, and applies the differences per Mimir namespace.
//
// All Mimir calls made here share a single 5 second timeout. A failure in one
// namespace does not stop the others; every error is collected and returned
// combined (nil if all namespaces succeeded).
func (c *Component) reconcileState(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	desiredState, err := c.loadStateFromK8s()
	if err != nil {
		return err
	}

	diffs := diffRuleState(desiredState, c.currentState)

	var result error
	for ns, diff := range diffs {
		if err := c.applyChanges(ctx, ns, diff); err != nil {
			result = multierror.Append(result, err)
		}
	}

	return result
}

// loadStateFromK8s builds the desired Mimir state from the PrometheusRule
// objects matching c.ruleSelector in namespaces matching c.namespaceSelector.
// Each PrometheusRule maps to its own Mimir namespace (see
// mimirNamespaceForRuleCRD). A single PrometheusRule that fails to convert
// aborts the whole load.
func (c *Component) loadStateFromK8s() (ruleGroupsByNamespace, error) {
	matchedNamespaces, err := c.namespaceLister.List(c.namespaceSelector)
	if err != nil {
		return nil, fmt.Errorf("failed to list namespaces: %w", err)
	}

	desiredState := make(ruleGroupsByNamespace)
	for _, ns := range matchedNamespaces {
		crdState, err := c.ruleLister.PrometheusRules(ns.Name).List(c.ruleSelector)
		if err != nil {
			return nil, fmt.Errorf("failed to list rules in namespace %q: %w", ns.Name, err)
		}

		for _, pr := range crdState {
			groups, err := convertCRDRuleGroupToRuleGroup(pr.Spec)
			if err != nil {
				return nil, fmt.Errorf("failed to convert rule group %s/%s: %w", pr.Namespace, pr.Name, err)
			}

			desiredState[mimirNamespaceForRuleCRD(c.args.MimirNameSpacePrefix, pr)] = groups
		}
	}

	return desiredState, nil
}

// convertCRDRuleGroupToRuleGroup converts a PrometheusRule spec into Prometheus
// rule groups. It marshals the spec to YAML and validates it with
// rulefmt.Parse. All validation errors are returned combined.
func convertCRDRuleGroupToRuleGroup(crd promv1.PrometheusRuleSpec) ([]rulefmt.RuleGroup, error) {
	buf, err := yaml.Marshal(crd)
	if err != nil {
		return nil, err
	}

	groups, errs := rulefmt.Parse(buf)
	if len(errs) > 0 {
		return nil, multierror.Append(nil, errs...)
	}

	return groups.Groups, nil
}

// applyChanges applies the rule group diffs for a single Mimir namespace and
// stops at the first failed Mimir call. Updates are written with
// CreateRuleGroup, the same call used for additions. Unknown diff kinds are
// logged and skipped. Once every diff has been applied, c.currentState is
// refreshed from Mimir.
//
// NOTE(review): on a mid-loop failure c.currentState is not refreshed, so a
// retry diffs against possibly stale state until the next successful sync.
func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []ruleGroupDiff) error {
	if len(diffs) == 0 {
		return nil
	}

	for _, diff := range diffs {
		switch diff.Kind {
		case ruleGroupDiffKindAdd:
			err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
			if err != nil {
				return err
			}
			level.Info(c.log).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name)
		case ruleGroupDiffKindRemove:
			err := c.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name)
			if err != nil {
				return err
			}
			level.Info(c.log).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name)
		case ruleGroupDiffKindUpdate:
			err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
			if err != nil {
				return err
			}
			level.Info(c.log).Log("msg", "updated rule group", "namespace", namespace, "group", diff.Desired.Name)
		default:
			level.Error(c.log).Log("msg", "unknown rule group diff kind", "kind", diff.Kind)
		}
	}

	// resync mimir state after applying changes
	return c.syncMimir(ctx)
}

// mimirNamespaceForRuleCRD returns the Mimir namespace that the rule CRD should
// be stored in, formatted as "<prefix>/<k8s namespace>/<name>/<uid>". This
// function, along with isManagedMimirNamespace, is used to determine if a rule
// CRD is managed by the agent.
func mimirNamespaceForRuleCRD(prefix string, pr *promv1.PrometheusRule) string {
	return fmt.Sprintf("%s/%s/%s/%s", prefix, pr.Namespace, pr.Name, pr.UID)
}

// isManagedMimirNamespace returns true if the namespace is managed by the agent,
// i.e. it has the shape "<prefix>/<anything>/<anything>/<UUID>" produced by
// mimirNamespaceForRuleCRD. The prefix is matched literally. Unmanaged
// namespaces are left as is by the operator.
func isManagedMimirNamespace(prefix, namespace string) bool {
	prefixPart := regexp.QuoteMeta(prefix)
	namespacePart := `.+`
	namePart := `.+`
	uuidPart := `[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}`
	// MustCompile cannot panic here: the only variable part is QuoteMeta-escaped.
	managedNamespaceRegex := regexp.MustCompile(
		fmt.Sprintf("^%s/%s/%s/%s$", prefixPart, namespacePart, namePart, uuidPart),
	)
	return managedNamespaceRegex.MatchString(namespace)
}