GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/operator/common/crdmanager.go
package common

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/prometheus"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/scrape"
	toolscache "k8s.io/client-go/tools/cache"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/grafana/agent/component/prometheus/operator"
	"github.com/grafana/agent/component/prometheus/operator/configgen"
	compscrape "github.com/grafana/agent/component/prometheus/scrape"
	promopv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
)

// Generous timeout period for configuring all informers.
const informerSyncTimeout = 10 * time.Second

// crdManager holds all of the state required to run a CRD-based component.
// On update, the entire crdManager should be recreated and restarted.
type crdManager struct {
	mut              sync.Mutex
	discoveryConfigs map[string]discovery.Configs
	scrapeConfigs    map[string]*config.ScrapeConfig
	debugInfo        map[string]*operator.DiscoveredResource
	discoveryManager *discovery.Manager
	scrapeManager    *scrape.Manager

	opts      component.Options
	logger    log.Logger
	args      *operator.Arguments
	configGen configgen.ConfigGenerator

	kind string
}

const (
	KindPodMonitor     string = "podMonitor"
	KindServiceMonitor string = "serviceMonitor"
)
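
// newCrdManager constructs a crdManager for the given monitor kind. It panics
// if kind is not one of the known monitor kinds.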
func newCrdManager(opts component.Options, logger log.Logger, args *operator.Arguments, kind string) *crdManager {
	switch kind {
	case KindPodMonitor, KindServiceMonitor:
	default:
		panic(fmt.Sprintf("Unknown kind for crdManager: %s", kind))
	}
	return &crdManager{
		opts:             opts,
		logger:           logger,
		args:             args,
		discoveryConfigs: map[string]discovery.Configs{},
		scrapeConfigs:    map[string]*config.ScrapeConfig{},
		debugInfo:        map[string]*operator.DiscoveredResource{},
		kind:             kind,
	}
}
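
// Run starts the service discovery and scrape managers, then blocks until ctx
// is canceled, forwarding target groups from the discovery manager's sync
// channel into the scrape manager as they arrive.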
func (c *crdManager) Run(ctx context.Context) error {
	c.configGen = configgen.ConfigGenerator{
		Client: &c.args.Client,
	}

	// Start the Prometheus service discovery manager.
	c.discoveryManager = discovery.NewManager(ctx, c.logger, discovery.Name(c.opts.ID))
	go func() {
		err := c.discoveryManager.Run()
		if err != nil {
			level.Error(c.logger).Log("msg", "discovery manager stopped", "err", err)
		}
	}()

	if err := c.runInformers(ctx); err != nil {
		return err
	}
	level.Info(c.logger).Log("msg", "informers started")

	// Start the Prometheus scrape manager.
	flowAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer)
	opts := &scrape.Options{}
	c.scrapeManager = scrape.NewManager(opts, c.logger, flowAppendable)
	defer c.scrapeManager.Stop()
	targetSetsChan := make(chan map[string][]*targetgroup.Group)
	go func() {
		err := c.scrapeManager.Run(targetSetsChan)
		level.Info(c.logger).Log("msg", "scrape manager stopped")
		if err != nil {
			level.Error(c.logger).Log("msg", "scrape manager failed", "err", err)
		}
	}()

	// Run the target discovery loop, updating the scrape manager with new targets.
	for {
		select {
		case <-ctx.Done():
			return nil
		case m := <-c.discoveryManager.SyncCh():
			targetSetsChan <- m
		}
	}
}
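
// Illustrative sketch (not part of the original file): a hypothetical owner of
// this manager recreates it whenever arguments change, as the crdManager type
// comment above requires:
//
//	m := newCrdManager(opts, logger, args, KindServiceMonitor)
//	ctx, cancel := context.WithCancel(context.Background())
//	go func() {
//		if err := m.Run(ctx); err != nil {
//			level.Error(logger).Log("msg", "crdManager exited with error", "err", err)
//		}
//	}()
//	// On updated *operator.Arguments: cancel(), wait for Run to return,
//	// then build and run a fresh crdManager.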

// DebugInfo returns debug information for the crdManager.
func (c *crdManager) DebugInfo() interface{} {
	c.mut.Lock()
	defer c.mut.Unlock()

	var info operator.DebugInfo
	for _, pm := range c.debugInfo {
		info.DiscoveredCRDs = append(info.DiscoveredCRDs, pm)
	}
	info.Targets = compscrape.BuildTargetStatuses(c.scrapeManager.TargetsActive())
	return info
}

// runInformers starts all the informers that are required to discover CRDs.
func (c *crdManager) runInformers(ctx context.Context) error {
	restConfig, err := c.args.Client.BuildRESTConfig(c.logger)
	if err != nil {
		return fmt.Errorf("creating rest config: %w", err)
	}

	scheme := runtime.NewScheme()
	for _, add := range []func(*runtime.Scheme) error{
		promopv1.AddToScheme,
	} {
		if err := add(scheme); err != nil {
			return fmt.Errorf("unable to register scheme: %w", err)
		}
	}

	ls, err := c.args.LabelSelector.BuildSelector()
	if err != nil {
		return fmt.Errorf("building label selector: %w", err)
	}
	for _, ns := range c.args.Namespaces {
		opts := cache.Options{
			Scheme:    scheme,
			Namespace: ns,
		}

		if ls != labels.Nothing() {
			opts.DefaultSelector.Label = ls
		}
		informers, err := cache.New(restConfig, opts)
		if err != nil {
			return err
		}

		go func() {
			err := informers.Start(ctx)
			// If the context was canceled, we don't want to log an error.
			if err != nil && ctx.Err() == nil {
				level.Error(c.logger).Log("msg", "failed to start informers", "err", err)
			}
		}()
		if !informers.WaitForCacheSync(ctx) {
			return fmt.Errorf("informer caches failed to sync")
		}
		if err := c.configureInformers(ctx, informers); err != nil {
			return fmt.Errorf("failed to configure informers: %w", err)
		}
	}

	return nil
}
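
// Note: runInformers builds one controller-runtime cache per configured
// namespace and blocks until that cache has synced and its informers are
// configured before moving on to the next namespace.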

// configureInformers configures the informers for the crdManager to watch for
// CRD changes.
func (c *crdManager) configureInformers(ctx context.Context, informers cache.Informers) error {
	var prototype client.Object
	switch c.kind {
	case KindPodMonitor:
		prototype = &promopv1.PodMonitor{}
	case KindServiceMonitor:
		prototype = &promopv1.ServiceMonitor{}
	default:
		return fmt.Errorf("unknown kind to configure informers: %s", c.kind)
	}

	informerCtx, cancel := context.WithTimeout(ctx, informerSyncTimeout)
	defer cancel()

	informer, err := informers.GetInformer(informerCtx, prototype)
	if err != nil {
		// Inspect the context error so an unrelated GetInformer failure isn't
		// misreported as a timeout.
		if errors.Is(informerCtx.Err(), context.DeadlineExceeded) {
			return fmt.Errorf("timeout exceeded while configuring informers. Check that the connection"+
				" to the Kubernetes API is stable and that the Agent has appropriate RBAC permissions for %v", prototype)
		}

		return err
	}
	switch c.kind {
	case KindPodMonitor:
		_, err = informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
			AddFunc:    c.onAddPodMonitor,
			UpdateFunc: c.onUpdatePodMonitor,
			DeleteFunc: c.onDeletePodMonitor,
		})
	case KindServiceMonitor:
		_, err = informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
			AddFunc:    c.onAddServiceMonitor,
			UpdateFunc: c.onUpdateServiceMonitor,
			DeleteFunc: c.onDeleteServiceMonitor,
		})
	default:
		return fmt.Errorf("unknown kind to configure informers: %s", c.kind)
	}

	return err
}
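
// The event handlers registered above all follow the same pattern: regenerate
// the discovery and scrape configs for the affected resource, then call apply
// to push the complete config set to the Prometheus managers.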

// apply applies the current state of the manager to the Prometheus discovery
// manager and scrape manager.
func (c *crdManager) apply() error {
	c.mut.Lock()
	defer c.mut.Unlock()
	err := c.discoveryManager.ApplyConfig(c.discoveryConfigs)
	if err != nil {
		level.Error(c.logger).Log("msg", "error applying discovery configs", "err", err)
		return err
	}
	scs := []*config.ScrapeConfig{}
	for _, sc := range c.scrapeConfigs {
		scs = append(scs, sc)
	}
	err = c.scrapeManager.ApplyConfig(&config.Config{
		ScrapeConfigs: scs,
	})
	if err != nil {
		level.Error(c.logger).Log("msg", "error applying scrape configs", "err", err)
		return err
	}
	level.Debug(c.logger).Log("msg", "scrape config was updated")
	return nil
}

func (c *crdManager) addDebugInfo(ns string, name string, err error) {
	c.mut.Lock()
	defer c.mut.Unlock()
	debug := &operator.DiscoveredResource{}
	debug.Namespace = ns
	debug.Name = name
	debug.LastReconcile = time.Now()
	if err != nil {
		debug.ReconcileError = err.Error()
	} else {
		debug.ReconcileError = ""
	}
	prefix := fmt.Sprintf("%s/%s/%s", c.kind, ns, name)
	c.debugInfo[prefix] = debug
}
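
// Debug info entries are keyed by "<kind>/<namespace>/<name>". Generated
// scrape job names are expected to start with the same prefix, which is what
// clearConfigs relies on when deleting configs by prefix match.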

func (c *crdManager) addPodMonitor(pm *promopv1.PodMonitor) {
	var err error
	for i, ep := range pm.Spec.PodMetricsEndpoints {
		var pmc *config.ScrapeConfig
		pmc, err = c.configGen.GeneratePodMonitorConfig(pm, ep, i)
		if err != nil {
			// TODO(jcreixell): Generate Kubernetes event to inform of this error when running `kubectl get <podmonitor>`.
			level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error generating scrapeconfig from podmonitor")
			break
		}
		c.mut.Lock()
		c.discoveryConfigs[pmc.JobName] = pmc.ServiceDiscoveryConfigs
		c.scrapeConfigs[pmc.JobName] = pmc
		c.mut.Unlock()
	}
	if err != nil {
		c.addDebugInfo(pm.Namespace, pm.Name, err)
		return
	}
	if err = c.apply(); err != nil {
		level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error applying scrape configs from "+c.kind)
	}
	c.addDebugInfo(pm.Namespace, pm.Name, err)
}
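
// Note: if config generation fails for any endpoint above, processing stops at
// that endpoint and apply is skipped; the error is surfaced via DebugInfo.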

func (c *crdManager) onAddPodMonitor(obj interface{}) {
	pm := obj.(*promopv1.PodMonitor)
	level.Info(c.logger).Log("msg", "found pod monitor", "name", pm.Name)
	c.addPodMonitor(pm)
}

func (c *crdManager) onUpdatePodMonitor(oldObj, newObj interface{}) {
	pm := oldObj.(*promopv1.PodMonitor)
	c.clearConfigs(KindPodMonitor, pm.Namespace, pm.Name)
	c.addPodMonitor(newObj.(*promopv1.PodMonitor))
}

func (c *crdManager) onDeletePodMonitor(obj interface{}) {
	pm := obj.(*promopv1.PodMonitor)
	c.clearConfigs(KindPodMonitor, pm.Namespace, pm.Name)
	if err := c.apply(); err != nil {
		level.Error(c.logger).Log("name", pm.Name, "err", err, "msg", "error applying scrape configs after deleting "+c.kind)
	}
}

func (c *crdManager) addServiceMonitor(sm *promopv1.ServiceMonitor) {
	var err error
	for i, ep := range sm.Spec.Endpoints {
		var smc *config.ScrapeConfig
		smc, err = c.configGen.GenerateServiceMonitorConfig(sm, ep, i)
		if err != nil {
			// TODO(jcreixell): Generate Kubernetes event to inform of this error when running `kubectl get <servicemonitor>`.
			level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error generating scrapeconfig from servicemonitor")
			break
		}
		c.mut.Lock()
		c.discoveryConfigs[smc.JobName] = smc.ServiceDiscoveryConfigs
		c.scrapeConfigs[smc.JobName] = smc
		c.mut.Unlock()
	}
	if err != nil {
		c.addDebugInfo(sm.Namespace, sm.Name, err)
		return
	}
	if err = c.apply(); err != nil {
		level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error applying scrape configs from "+c.kind)
	}
	c.addDebugInfo(sm.Namespace, sm.Name, err)
}

func (c *crdManager) onAddServiceMonitor(obj interface{}) {
	sm := obj.(*promopv1.ServiceMonitor)
	level.Info(c.logger).Log("msg", "found service monitor", "name", sm.Name)
	c.addServiceMonitor(sm)
}

func (c *crdManager) onUpdateServiceMonitor(oldObj, newObj interface{}) {
	sm := oldObj.(*promopv1.ServiceMonitor)
	c.clearConfigs(KindServiceMonitor, sm.Namespace, sm.Name)
	c.addServiceMonitor(newObj.(*promopv1.ServiceMonitor))
}

func (c *crdManager) onDeleteServiceMonitor(obj interface{}) {
	sm := obj.(*promopv1.ServiceMonitor)
	c.clearConfigs(KindServiceMonitor, sm.Namespace, sm.Name)
	if err := c.apply(); err != nil {
		level.Error(c.logger).Log("name", sm.Name, "err", err, "msg", "error applying scrape configs after deleting "+c.kind)
	}
}

func (c *crdManager) clearConfigs(kind, ns, name string) {
	c.mut.Lock()
	defer c.mut.Unlock()
	prefix := fmt.Sprintf("%s/%s/%s", kind, ns, name)
	for k := range c.discoveryConfigs {
		if strings.HasPrefix(k, prefix) {
			delete(c.discoveryConfigs, k)
			delete(c.scrapeConfigs, k)
		}
	}
	delete(c.debugInfo, prefix)
}