Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/mimir/rules/kubernetes/rules.go
4096 views
1
package rules
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
"time"
8
9
"github.com/go-kit/log"
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/component"
12
mimirClient "github.com/grafana/agent/pkg/mimir/client"
13
promListers "github.com/prometheus-operator/prometheus-operator/pkg/client/listers/monitoring/v1"
14
"github.com/prometheus/client_golang/prometheus"
15
"github.com/weaveworks/common/instrument"
16
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
"k8s.io/apimachinery/pkg/labels"
18
"k8s.io/client-go/informers"
19
"k8s.io/client-go/kubernetes"
20
coreListers "k8s.io/client-go/listers/core/v1"
21
"k8s.io/client-go/tools/cache"
22
"k8s.io/client-go/util/workqueue"
23
_ "k8s.io/component-base/metrics/prometheus/workqueue"
24
controller "sigs.k8s.io/controller-runtime"
25
26
promExternalVersions "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions"
27
promVersioned "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
28
)
29
30
func init() {
31
component.Register(component.Registration{
32
Name: "mimir.rules.kubernetes",
33
Args: Arguments{},
34
Exports: nil,
35
Build: func(o component.Options, c component.Arguments) (component.Component, error) {
36
return NewComponent(o, c.(Arguments))
37
},
38
})
39
}
40
41
type Component struct {
42
log log.Logger
43
opts component.Options
44
args Arguments
45
46
mimirClient mimirClient.Interface
47
k8sClient kubernetes.Interface
48
promClient promVersioned.Interface
49
ruleLister promListers.PrometheusRuleLister
50
ruleInformer cache.SharedIndexInformer
51
52
namespaceLister coreListers.NamespaceLister
53
namespaceInformer cache.SharedIndexInformer
54
informerStopChan chan struct{}
55
ticker *time.Ticker
56
57
queue workqueue.RateLimitingInterface
58
configUpdates chan ConfigUpdate
59
60
namespaceSelector labels.Selector
61
ruleSelector labels.Selector
62
63
currentState ruleGroupsByNamespace
64
65
metrics *metrics
66
healthMut sync.RWMutex
67
health component.Health
68
}
69
70
type metrics struct {
71
configUpdatesTotal prometheus.Counter
72
73
eventsTotal *prometheus.CounterVec
74
eventsFailed *prometheus.CounterVec
75
eventsRetried *prometheus.CounterVec
76
77
mimirClientTiming *prometheus.HistogramVec
78
}
79
80
func (m *metrics) Register(r prometheus.Registerer) error {
81
r.MustRegister(
82
m.configUpdatesTotal,
83
m.eventsTotal,
84
m.eventsFailed,
85
m.eventsRetried,
86
m.mimirClientTiming,
87
)
88
return nil
89
}
90
91
func newMetrics() *metrics {
92
return &metrics{
93
configUpdatesTotal: prometheus.NewCounter(prometheus.CounterOpts{
94
Subsystem: "mimir_rules",
95
Name: "config_updates_total",
96
Help: "Total number of times the configuration has been updated.",
97
}),
98
eventsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
99
Subsystem: "mimir_rules",
100
Name: "events_total",
101
Help: "Total number of events processed, partitioned by event type.",
102
}, []string{"type"}),
103
eventsFailed: prometheus.NewCounterVec(prometheus.CounterOpts{
104
Subsystem: "mimir_rules",
105
Name: "events_failed_total",
106
Help: "Total number of events that failed to be processed, even after retries, partitioned by event type.",
107
}, []string{"type"}),
108
eventsRetried: prometheus.NewCounterVec(prometheus.CounterOpts{
109
Subsystem: "mimir_rules",
110
Name: "events_retried_total",
111
Help: "Total number of retries across all events, partitioned by event type.",
112
}, []string{"type"}),
113
mimirClientTiming: prometheus.NewHistogramVec(prometheus.HistogramOpts{
114
Subsystem: "mimir_rules",
115
Name: "mimir_client_request_duration_seconds",
116
Help: "Duration of requests to the Mimir API.",
117
Buckets: instrument.DefBuckets,
118
}, instrument.HistogramCollectorBuckets),
119
}
120
}
121
122
type ConfigUpdate struct {
123
args Arguments
124
err chan error
125
}
126
127
var _ component.Component = (*Component)(nil)
128
var _ component.DebugComponent = (*Component)(nil)
129
var _ component.HealthComponent = (*Component)(nil)
130
131
func NewComponent(o component.Options, args Arguments) (*Component, error) {
132
metrics := newMetrics()
133
err := metrics.Register(o.Registerer)
134
if err != nil {
135
return nil, fmt.Errorf("registering metrics failed: %w", err)
136
}
137
138
c := &Component{
139
log: o.Logger,
140
opts: o,
141
args: args,
142
configUpdates: make(chan ConfigUpdate),
143
ticker: time.NewTicker(args.SyncInterval),
144
metrics: metrics,
145
}
146
147
err = c.init()
148
if err != nil {
149
return nil, fmt.Errorf("initializing component failed: %w", err)
150
}
151
152
return c, nil
153
}
154
155
func (c *Component) Run(ctx context.Context) error {
156
err := c.startup(ctx)
157
if err != nil {
158
level.Error(c.log).Log("msg", "starting up component failed", "err", err)
159
c.reportUnhealthy(err)
160
}
161
162
for {
163
select {
164
case update := <-c.configUpdates:
165
c.metrics.configUpdatesTotal.Inc()
166
c.shutdown()
167
168
c.args = update.args
169
err := c.init()
170
if err != nil {
171
level.Error(c.log).Log("msg", "updating configuration failed", "err", err)
172
c.reportUnhealthy(err)
173
update.err <- err
174
continue
175
}
176
177
err = c.startup(ctx)
178
if err != nil {
179
level.Error(c.log).Log("msg", "updating configuration failed", "err", err)
180
c.reportUnhealthy(err)
181
update.err <- err
182
continue
183
}
184
185
update.err <- nil
186
case <-ctx.Done():
187
c.shutdown()
188
return nil
189
case <-c.ticker.C:
190
c.queue.Add(event{
191
typ: eventTypeSyncMimir,
192
})
193
}
194
}
195
}
196
197
// startup launches the informers and starts the event loop.
198
func (c *Component) startup(ctx context.Context) error {
199
c.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "mimir.rules.kubernetes")
200
c.informerStopChan = make(chan struct{})
201
202
if err := c.startNamespaceInformer(); err != nil {
203
return err
204
}
205
if err := c.startRuleInformer(); err != nil {
206
return err
207
}
208
err := c.syncMimir(ctx)
209
if err != nil {
210
return err
211
}
212
go c.eventLoop(ctx)
213
return nil
214
}
215
216
// shutdown stops the informers by closing informerStopChan and shuts the work
// queue down, waiting for in-flight items to be marked done.
//
// It is safe to call more than once between startups. Run calls it on every
// configuration update and on context cancellation, and after a failed update
// the component stays shut down, so the next call would otherwise close an
// already closed channel and panic. It must only be called from the Run
// goroutine.
func (c *Component) shutdown() {
	if c.informerStopChan != nil {
		select {
		case <-c.informerStopChan:
			// Already closed by an earlier shutdown.
		default:
			close(c.informerStopChan)
		}
	}

	if c.queue != nil {
		c.queue.ShutDownWithDrain()
	}
}
220
221
func (c *Component) Update(newConfig component.Arguments) error {
222
errChan := make(chan error)
223
c.configUpdates <- ConfigUpdate{
224
args: newConfig.(Arguments),
225
err: errChan,
226
}
227
return <-errChan
228
}
229
230
func (c *Component) init() error {
231
level.Info(c.log).Log("msg", "initializing with new configuration")
232
233
// TODO: allow overriding some stuff in RestConfig and k8s client options?
234
restConfig, err := controller.GetConfig()
235
if err != nil {
236
return fmt.Errorf("failed to get k8s config: %w", err)
237
}
238
239
c.k8sClient, err = kubernetes.NewForConfig(restConfig)
240
if err != nil {
241
return fmt.Errorf("failed to create k8s client: %w", err)
242
}
243
244
c.promClient, err = promVersioned.NewForConfig(restConfig)
245
if err != nil {
246
return fmt.Errorf("failed to create prometheus operator client: %w", err)
247
}
248
249
httpClient := c.args.HTTPClientConfig.Convert()
250
251
c.mimirClient, err = mimirClient.New(c.log, mimirClient.Config{
252
ID: c.args.TenantID,
253
Address: c.args.Address,
254
UseLegacyRoutes: c.args.UseLegacyRoutes,
255
HTTPClientConfig: *httpClient,
256
}, c.metrics.mimirClientTiming)
257
if err != nil {
258
return err
259
}
260
261
c.ticker.Reset(c.args.SyncInterval)
262
263
c.namespaceSelector, err = convertSelectorToListOptions(c.args.RuleNamespaceSelector)
264
if err != nil {
265
return err
266
}
267
268
c.ruleSelector, err = convertSelectorToListOptions(c.args.RuleSelector)
269
if err != nil {
270
return err
271
}
272
273
return nil
274
}
275
276
func convertSelectorToListOptions(selector LabelSelector) (labels.Selector, error) {
277
matchExpressions := []metav1.LabelSelectorRequirement{}
278
279
for _, me := range selector.MatchExpressions {
280
matchExpressions = append(matchExpressions, metav1.LabelSelectorRequirement{
281
Key: me.Key,
282
Operator: metav1.LabelSelectorOperator(me.Operator),
283
Values: me.Values,
284
})
285
}
286
287
return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
288
MatchLabels: selector.MatchLabels,
289
MatchExpressions: matchExpressions,
290
})
291
}
292
293
func (c *Component) startNamespaceInformer() error {
294
factory := informers.NewSharedInformerFactoryWithOptions(
295
c.k8sClient,
296
24*time.Hour,
297
informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
298
lo.LabelSelector = c.namespaceSelector.String()
299
}),
300
)
301
302
namespaces := factory.Core().V1().Namespaces()
303
c.namespaceLister = namespaces.Lister()
304
c.namespaceInformer = namespaces.Informer()
305
_, err := c.namespaceInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue))
306
if err != nil {
307
return err
308
}
309
310
factory.Start(c.informerStopChan)
311
factory.WaitForCacheSync(c.informerStopChan)
312
return nil
313
}
314
315
func (c *Component) startRuleInformer() error {
316
factory := promExternalVersions.NewSharedInformerFactoryWithOptions(
317
c.promClient,
318
24*time.Hour,
319
promExternalVersions.WithTweakListOptions(func(lo *metav1.ListOptions) {
320
lo.LabelSelector = c.ruleSelector.String()
321
}),
322
)
323
324
promRules := factory.Monitoring().V1().PrometheusRules()
325
c.ruleLister = promRules.Lister()
326
c.ruleInformer = promRules.Informer()
327
_, err := c.ruleInformer.AddEventHandler(newQueuedEventHandler(c.log, c.queue))
328
if err != nil {
329
return err
330
}
331
332
factory.Start(c.informerStopChan)
333
factory.WaitForCacheSync(c.informerStopChan)
334
return nil
335
}
336
337