Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/mimir/rules/kubernetes/events.go
4096 views
1
package rules
2
3
import (
4
"context"
5
"fmt"
6
"regexp"
7
"time"
8
9
"github.com/go-kit/log"
10
"github.com/go-kit/log/level"
11
"github.com/hashicorp/go-multierror"
12
promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
13
"github.com/prometheus/prometheus/model/rulefmt"
14
"k8s.io/client-go/tools/cache"
15
"k8s.io/client-go/util/workqueue"
16
"sigs.k8s.io/yaml" // Used for CRD compatibility instead of gopkg.in/yaml.v2
17
)
18
19
// This type must be hashable, so it is kept simple. The indexer will maintain a
20
// cache of current state, so this is mostly used for logging.
21
type event struct {
22
typ eventType
23
objectKey string
24
}
25
26
// eventType names the kind of work carried by an event.
type eventType string

// Event kinds handled by the component's event loop.
const (
	eventTypeResourceChanged = eventType("resource-changed")
	eventTypeSyncMimir       = eventType("sync-mimir")
)
32
33
type queuedEventHandler struct {
34
log log.Logger
35
queue workqueue.RateLimitingInterface
36
}
37
38
func newQueuedEventHandler(log log.Logger, queue workqueue.RateLimitingInterface) *queuedEventHandler {
39
return &queuedEventHandler{
40
log: log,
41
queue: queue,
42
}
43
}
44
45
// OnAdd implements the cache.ResourceEventHandler interface.
46
func (c *queuedEventHandler) OnAdd(obj interface{}) {
47
c.publishEvent(obj)
48
}
49
50
// OnUpdate implements the cache.ResourceEventHandler interface.
51
func (c *queuedEventHandler) OnUpdate(oldObj, newObj interface{}) {
52
c.publishEvent(newObj)
53
}
54
55
// OnDelete implements the cache.ResourceEventHandler interface.
56
func (c *queuedEventHandler) OnDelete(obj interface{}) {
57
c.publishEvent(obj)
58
}
59
60
func (c *queuedEventHandler) publishEvent(obj interface{}) {
61
key, err := cache.MetaNamespaceKeyFunc(obj)
62
if err != nil {
63
level.Error(c.log).Log("msg", "failed to get key for object", "err", err)
64
return
65
}
66
67
c.queue.AddRateLimited(event{
68
typ: eventTypeResourceChanged,
69
objectKey: key,
70
})
71
}
72
73
func (c *Component) eventLoop(ctx context.Context) {
74
for {
75
eventInterface, shutdown := c.queue.Get()
76
if shutdown {
77
level.Info(c.log).Log("msg", "shutting down event loop")
78
return
79
}
80
81
evt := eventInterface.(event)
82
c.metrics.eventsTotal.WithLabelValues(string(evt.typ)).Inc()
83
err := c.processEvent(ctx, evt)
84
85
if err != nil {
86
retries := c.queue.NumRequeues(evt)
87
if retries < 5 {
88
c.metrics.eventsRetried.WithLabelValues(string(evt.typ)).Inc()
89
c.queue.AddRateLimited(evt)
90
level.Error(c.log).Log(
91
"msg", "failed to process event, will retry",
92
"retries", fmt.Sprintf("%d/5", retries),
93
"err", err,
94
)
95
continue
96
} else {
97
c.metrics.eventsFailed.WithLabelValues(string(evt.typ)).Inc()
98
level.Error(c.log).Log(
99
"msg", "failed to process event, max retries exceeded",
100
"retries", fmt.Sprintf("%d/5", retries),
101
"err", err,
102
)
103
c.reportUnhealthy(err)
104
}
105
} else {
106
c.reportHealthy()
107
}
108
109
c.queue.Forget(evt)
110
}
111
}
112
113
func (c *Component) processEvent(ctx context.Context, e event) error {
114
defer c.queue.Done(e)
115
116
switch e.typ {
117
case eventTypeResourceChanged:
118
level.Info(c.log).Log("msg", "processing event", "type", e.typ, "key", e.objectKey)
119
case eventTypeSyncMimir:
120
level.Debug(c.log).Log("msg", "syncing current state from ruler")
121
err := c.syncMimir(ctx)
122
if err != nil {
123
return err
124
}
125
default:
126
return fmt.Errorf("unknown event type: %s", e.typ)
127
}
128
129
return c.reconcileState(ctx)
130
}
131
132
// syncMimir lists the rules through the mimir client, drops every namespace
// that isManagedMimirNamespace rejects for the configured prefix, and stores
// the remainder as c.currentState. A listing failure is logged and returned
// without touching c.currentState.
func (c *Component) syncMimir(ctx context.Context) error {
	remote, err := c.mimirClient.ListRules(ctx, "")
	if err != nil {
		level.Error(c.log).Log("msg", "failed to list rules from mimir", "err", err)
		return err
	}

	for name := range remote {
		if isManagedMimirNamespace(c.args.MimirNameSpacePrefix, name) {
			continue
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(remote, name)
	}

	c.currentState = remote
	return nil
}
149
150
// reconcileState loads the desired state from Kubernetes, diffs it against
// c.currentState and applies the per-namespace differences under a 5 second
// timeout. A failure in one namespace does not stop the others; all failures
// are collected and returned together.
func (c *Component) reconcileState(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	desiredState, err := c.loadStateFromK8s()
	if err != nil {
		return err
	}

	var result error
	for ns, nsDiffs := range diffRuleState(desiredState, c.currentState) {
		if applyErr := c.applyChanges(ctx, ns, nsDiffs); applyErr != nil {
			result = multierror.Append(result, applyErr)
		}
	}

	return result
}
171
172
// loadStateFromK8s builds the desired rule state from the listers. For every
// namespace matching c.namespaceSelector it lists the PrometheusRules matching
// c.ruleSelector, converts each one to rule groups and stores them under the
// mimir namespace computed by mimirNamespaceForRuleCRD. The first listing or
// conversion failure aborts the whole load.
func (c *Component) loadStateFromK8s() (ruleGroupsByNamespace, error) {
	namespaces, err := c.namespaceLister.List(c.namespaceSelector)
	if err != nil {
		return nil, fmt.Errorf("failed to list namespaces: %w", err)
	}

	desired := make(ruleGroupsByNamespace)
	for _, k8sNs := range namespaces {
		rules, listErr := c.ruleLister.PrometheusRules(k8sNs.Name).List(c.ruleSelector)
		if listErr != nil {
			return nil, fmt.Errorf("failed to list rules: %w", listErr)
		}

		for _, rule := range rules {
			target := mimirNamespaceForRuleCRD(c.args.MimirNameSpacePrefix, rule)

			groups, convErr := convertCRDRuleGroupToRuleGroup(rule.Spec)
			if convErr != nil {
				return nil, fmt.Errorf("failed to convert rule group: %w", convErr)
			}

			desired[target] = groups
		}
	}

	return desired, nil
}
199
200
// convertCRDRuleGroupToRuleGroup converts a PrometheusRule spec into rulefmt
// rule groups by marshalling the spec to YAML and parsing the result with
// rulefmt.Parse. All parse errors are combined into a single returned error.
func convertCRDRuleGroupToRuleGroup(crd promv1.PrometheusRuleSpec) ([]rulefmt.RuleGroup, error) {
	raw, err := yaml.Marshal(crd)
	if err != nil {
		return nil, err
	}

	parsed, parseErrs := rulefmt.Parse(raw)
	if len(parseErrs) != 0 {
		return nil, multierror.Append(nil, parseErrs...)
	}

	return parsed.Groups, nil
}
213
214
func (c *Component) applyChanges(ctx context.Context, namespace string, diffs []ruleGroupDiff) error {
215
if len(diffs) == 0 {
216
return nil
217
}
218
219
for _, diff := range diffs {
220
switch diff.Kind {
221
case ruleGroupDiffKindAdd:
222
err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
223
if err != nil {
224
return err
225
}
226
level.Info(c.log).Log("msg", "added rule group", "namespace", namespace, "group", diff.Desired.Name)
227
case ruleGroupDiffKindRemove:
228
err := c.mimirClient.DeleteRuleGroup(ctx, namespace, diff.Actual.Name)
229
if err != nil {
230
return err
231
}
232
level.Info(c.log).Log("msg", "removed rule group", "namespace", namespace, "group", diff.Actual.Name)
233
case ruleGroupDiffKindUpdate:
234
err := c.mimirClient.CreateRuleGroup(ctx, namespace, diff.Desired)
235
if err != nil {
236
return err
237
}
238
level.Info(c.log).Log("msg", "updated rule group", "namespace", namespace, "group", diff.Desired.Name)
239
default:
240
level.Error(c.log).Log("msg", "unknown rule group diff kind", "kind", diff.Kind)
241
}
242
}
243
244
// resync mimir state after applying changes
245
return c.syncMimir(ctx)
246
}
247
248
// mimirNamespaceForRuleCRD returns the namespace that the rule CRD should be
249
// stored in mimir. This function, along with isManagedNamespace, is used to
250
// determine if a rule CRD is managed by the agent.
251
func mimirNamespaceForRuleCRD(prefix string, pr *promv1.PrometheusRule) string {
252
return fmt.Sprintf("%s/%s/%s/%s", prefix, pr.Namespace, pr.Name, pr.UID)
253
}
254
255
// managedNamespaceSuffix matches the "<namespace>/<name>/<uid>" tail of a
// managed mimir namespace, i.e. everything after "<prefix>/". It is compiled
// once at package scope because it does not depend on the prefix.
var managedNamespaceSuffix = regexp.MustCompile(
	`^.+/.+/[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}$`,
)

// isManagedMimirNamespace returns true if the namespace is managed by the
// agent, meaning it has the form "<prefix>/<namespace>/<name>/<uid>" where
// prefix is matched literally and uid is a UUID. Unmanaged namespaces are
// left as is by the operator.
//
// The prefix is compared as a plain string rather than being spliced into a
// freshly compiled regular expression. That avoids recompiling a pattern on
// every call and avoids a MustCompile panic on a runtime-supplied prefix.
func isManagedMimirNamespace(prefix, namespace string) bool {
	// The namespace must start with prefix followed by a "/" separator and
	// have at least one more character after it.
	if len(namespace) <= len(prefix) {
		return false
	}
	if namespace[:len(prefix)] != prefix || namespace[len(prefix)] != '/' {
		return false
	}

	return managedNamespaceSuffix.MatchString(namespace[len(prefix)+1:])
}
267
268