Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/mimir/rules/kubernetes/events_test.go
4096 views
1
package rules
2
3
import (
4
"context"
5
"os"
6
"sync"
7
"testing"
8
"time"
9
10
"github.com/go-kit/log"
11
mimirClient "github.com/grafana/agent/pkg/mimir/client"
12
v1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
13
promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1"
14
"github.com/prometheus/prometheus/model/rulefmt"
15
"github.com/stretchr/testify/require"
16
corev1 "k8s.io/api/core/v1"
17
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
"k8s.io/apimachinery/pkg/labels"
19
"k8s.io/apimachinery/pkg/types"
20
"k8s.io/apimachinery/pkg/util/intstr"
21
coreListers "k8s.io/client-go/listers/core/v1"
22
"k8s.io/client-go/tools/cache"
23
"k8s.io/client-go/util/workqueue"
24
)
25
26
type fakeMimirClient struct {
27
rulesMut sync.RWMutex
28
rules map[string][]rulefmt.RuleGroup
29
}
30
31
var _ mimirClient.Interface = &fakeMimirClient{}
32
33
func newFakeMimirClient() *fakeMimirClient {
34
return &fakeMimirClient{
35
rules: make(map[string][]rulefmt.RuleGroup),
36
}
37
}
38
39
func (m *fakeMimirClient) CreateRuleGroup(ctx context.Context, namespace string, rule rulefmt.RuleGroup) error {
40
m.rulesMut.Lock()
41
defer m.rulesMut.Unlock()
42
m.deleteLocked(namespace, rule.Name)
43
m.rules[namespace] = append(m.rules[namespace], rule)
44
return nil
45
}
46
47
func (m *fakeMimirClient) DeleteRuleGroup(ctx context.Context, namespace, group string) error {
48
m.rulesMut.Lock()
49
defer m.rulesMut.Unlock()
50
m.deleteLocked(namespace, group)
51
return nil
52
}
53
54
func (m *fakeMimirClient) deleteLocked(namespace, group string) {
55
for ns, v := range m.rules {
56
if namespace != "" && namespace != ns {
57
continue
58
}
59
for i, g := range v {
60
if g.Name == group {
61
m.rules[ns] = append(m.rules[ns][:i], m.rules[ns][i+1:]...)
62
63
if len(m.rules[ns]) == 0 {
64
delete(m.rules, ns)
65
}
66
67
return
68
}
69
}
70
}
71
}
72
73
func (m *fakeMimirClient) ListRules(ctx context.Context, namespace string) (map[string][]rulefmt.RuleGroup, error) {
74
m.rulesMut.RLock()
75
defer m.rulesMut.RUnlock()
76
output := make(map[string][]rulefmt.RuleGroup)
77
for ns, v := range m.rules {
78
if namespace != "" && namespace != ns {
79
continue
80
}
81
output[ns] = v
82
}
83
return output, nil
84
}
85
86
func TestEventLoop(t *testing.T) {
87
nsIndexer := cache.NewIndexer(
88
cache.DeletionHandlingMetaNamespaceKeyFunc,
89
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
90
)
91
nsLister := coreListers.NewNamespaceLister(nsIndexer)
92
93
ruleIndexer := cache.NewIndexer(
94
cache.DeletionHandlingMetaNamespaceKeyFunc,
95
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
96
)
97
ruleLister := promListers.NewPrometheusRuleLister(ruleIndexer)
98
99
ns := &corev1.Namespace{
100
ObjectMeta: metav1.ObjectMeta{
101
Name: "namespace",
102
UID: types.UID("33f8860c-bd06-4c0d-a0b1-a114d6b9937b"),
103
},
104
}
105
106
rule := &v1.PrometheusRule{
107
ObjectMeta: metav1.ObjectMeta{
108
Name: "name",
109
Namespace: "namespace",
110
UID: types.UID("64aab764-c95e-4ee9-a932-cd63ba57e6cf"),
111
},
112
Spec: v1.PrometheusRuleSpec{
113
Groups: []v1.RuleGroup{
114
{
115
Name: "group",
116
Rules: []v1.Rule{
117
{
118
Alert: "alert",
119
Expr: intstr.FromString("expr"),
120
},
121
},
122
},
123
},
124
},
125
}
126
127
component := Component{
128
log: log.NewLogfmtLogger(os.Stdout),
129
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
130
namespaceLister: nsLister,
131
namespaceSelector: labels.Everything(),
132
ruleLister: ruleLister,
133
ruleSelector: labels.Everything(),
134
mimirClient: newFakeMimirClient(),
135
args: Arguments{MimirNameSpacePrefix: "agent"},
136
metrics: newMetrics(),
137
}
138
eventHandler := newQueuedEventHandler(component.log, component.queue)
139
140
ctx, cancel := context.WithCancel(context.Background())
141
defer cancel()
142
143
go component.eventLoop(ctx)
144
145
// Add a namespace and rule to kubernetes
146
nsIndexer.Add(ns)
147
ruleIndexer.Add(rule)
148
eventHandler.OnAdd(rule)
149
150
// Wait for the rule to be added to mimir
151
require.Eventually(t, func() bool {
152
rules, err := component.mimirClient.ListRules(ctx, "")
153
require.NoError(t, err)
154
return len(rules) == 1
155
}, time.Second, 10*time.Millisecond)
156
component.queue.AddRateLimited(event{typ: eventTypeSyncMimir})
157
158
// Update the rule in kubernetes
159
rule.Spec.Groups[0].Rules = append(rule.Spec.Groups[0].Rules, v1.Rule{
160
Alert: "alert2",
161
Expr: intstr.FromString("expr2"),
162
})
163
ruleIndexer.Update(rule)
164
eventHandler.OnUpdate(rule, rule)
165
166
// Wait for the rule to be updated in mimir
167
require.Eventually(t, func() bool {
168
allRules, err := component.mimirClient.ListRules(ctx, "")
169
require.NoError(t, err)
170
rules := allRules[mimirNamespaceForRuleCRD("agent", rule)][0].Rules
171
return len(rules) == 2
172
}, time.Second, 10*time.Millisecond)
173
component.queue.AddRateLimited(event{typ: eventTypeSyncMimir})
174
175
// Remove the rule from kubernetes
176
ruleIndexer.Delete(rule)
177
eventHandler.OnDelete(rule)
178
179
// Wait for the rule to be removed from mimir
180
require.Eventually(t, func() bool {
181
rules, err := component.mimirClient.ListRules(ctx, "")
182
require.NoError(t, err)
183
return len(rules) == 0
184
}, time.Second, 10*time.Millisecond)
185
}
186
187