Path: blob/main/component/mimir/rules/kubernetes/events_test.go
4096 views
package rules12import (3"context"4"os"5"sync"6"testing"7"time"89"github.com/go-kit/log"10mimirClient "github.com/grafana/agent/pkg/mimir/client"11v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"12promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1"13"github.com/prometheus/prometheus/model/rulefmt"14"github.com/stretchr/testify/require"15corev1 "k8s.io/api/core/v1"16metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"17"k8s.io/apimachinery/pkg/labels"18"k8s.io/apimachinery/pkg/types"19"k8s.io/apimachinery/pkg/util/intstr"20coreListers "k8s.io/client-go/listers/core/v1"21"k8s.io/client-go/tools/cache"22"k8s.io/client-go/util/workqueue"23)2425type fakeMimirClient struct {26rulesMut sync.RWMutex27rules map[string][]rulefmt.RuleGroup28}2930var _ mimirClient.Interface = &fakeMimirClient{}3132func newFakeMimirClient() *fakeMimirClient {33return &fakeMimirClient{34rules: make(map[string][]rulefmt.RuleGroup),35}36}3738func (m *fakeMimirClient) CreateRuleGroup(ctx context.Context, namespace string, rule rulefmt.RuleGroup) error {39m.rulesMut.Lock()40defer m.rulesMut.Unlock()41m.deleteLocked(namespace, rule.Name)42m.rules[namespace] = append(m.rules[namespace], rule)43return nil44}4546func (m *fakeMimirClient) DeleteRuleGroup(ctx context.Context, namespace, group string) error {47m.rulesMut.Lock()48defer m.rulesMut.Unlock()49m.deleteLocked(namespace, group)50return nil51}5253func (m *fakeMimirClient) deleteLocked(namespace, group string) {54for ns, v := range m.rules {55if namespace != "" && namespace != ns {56continue57}58for i, g := range v {59if g.Name == group {60m.rules[ns] = append(m.rules[ns][:i], m.rules[ns][i+1:]...)6162if len(m.rules[ns]) == 0 {63delete(m.rules, ns)64}6566return67}68}69}70}7172func (m *fakeMimirClient) ListRules(ctx context.Context, namespace string) (map[string][]rulefmt.RuleGroup, error) {73m.rulesMut.RLock()74defer m.rulesMut.RUnlock()75output := make(map[string][]rulefmt.RuleGroup)76for ns, v := range m.rules {77if namespace != "" && namespace != ns {78continue79}80output[ns] = v81}82return output, nil83}8485func TestEventLoop(t *testing.T) {86nsIndexer := cache.NewIndexer(87cache.DeletionHandlingMetaNamespaceKeyFunc,88cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},89)90nsLister := coreListers.NewNamespaceLister(nsIndexer)9192ruleIndexer := cache.NewIndexer(93cache.DeletionHandlingMetaNamespaceKeyFunc,94cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},95)96ruleLister := promListers.NewPrometheusRuleLister(ruleIndexer)9798ns := &corev1.Namespace{99ObjectMeta: metav1.ObjectMeta{100Name: "namespace",101UID: types.UID("33f8860c-bd06-4c0d-a0b1-a114d6b9937b"),102},103}104105rule := &v1.PrometheusRule{106ObjectMeta: metav1.ObjectMeta{107Name: "name",108Namespace: "namespace",109UID: types.UID("64aab764-c95e-4ee9-a932-cd63ba57e6cf"),110},111Spec: v1.PrometheusRuleSpec{112Groups: []v1.RuleGroup{113{114Name: "group",115Rules: []v1.Rule{116{117Alert: "alert",118Expr: intstr.FromString("expr"),119},120},121},122},123},124}125126component := Component{127log: log.NewLogfmtLogger(os.Stdout),128queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),129namespaceLister: nsLister,130namespaceSelector: labels.Everything(),131ruleLister: ruleLister,132ruleSelector: labels.Everything(),133mimirClient: newFakeMimirClient(),134args: Arguments{MimirNameSpacePrefix: "agent"},135metrics: newMetrics(),136}137eventHandler := newQueuedEventHandler(component.log, component.queue)138139ctx, cancel := context.WithCancel(context.Background())140defer cancel()141142go component.eventLoop(ctx)143144// Add a namespace and rule to kubernetes145nsIndexer.Add(ns)146ruleIndexer.Add(rule)147eventHandler.OnAdd(rule)148149// Wait for the rule to be added to mimir150require.Eventually(t, func() bool {151rules, err := component.mimirClient.ListRules(ctx, "")152require.NoError(t, err)153return len(rules) == 1154}, time.Second, 10*time.Millisecond)155component.queue.AddRateLimited(event{typ: eventTypeSyncMimir})156157// Update the rule in kubernetes158rule.Spec.Groups[0].Rules = append(rule.Spec.Groups[0].Rules, v1.Rule{159Alert: "alert2",160Expr: intstr.FromString("expr2"),161})162ruleIndexer.Update(rule)163eventHandler.OnUpdate(rule, rule)164165// Wait for the rule to be updated in mimir166require.Eventually(t, func() bool {167allRules, err := component.mimirClient.ListRules(ctx, "")168require.NoError(t, err)169rules := allRules[mimirNamespaceForRuleCRD("agent", rule)][0].Rules170return len(rules) == 2171}, time.Second, 10*time.Millisecond)172component.queue.AddRateLimited(event{typ: eventTypeSyncMimir})173174// Remove the rule from kubernetes175ruleIndexer.Delete(rule)176eventHandler.OnDelete(rule)177178// Wait for the rule to be removed from mimir179require.Eventually(t, func() bool {180rules, err := component.mimirClient.ListRules(ctx, "")181require.NoError(t, err)182return len(rules) == 0183}, time.Second, 10*time.Millisecond)184}185186187