Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/manager.go
5287 views
1
package integrations
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"path"
8
"strings"
9
"sync"
10
"time"
11
12
config_util "github.com/prometheus/common/config"
13
14
"github.com/go-kit/log"
15
"github.com/go-kit/log/level"
16
"github.com/gorilla/mux"
17
"github.com/grafana/agent/pkg/metrics"
18
"github.com/grafana/agent/pkg/metrics/instance"
19
"github.com/grafana/agent/pkg/metrics/instance/configstore"
20
"github.com/grafana/agent/pkg/server"
21
"github.com/grafana/agent/pkg/util"
22
"github.com/prometheus/client_golang/prometheus"
23
"github.com/prometheus/client_golang/prometheus/promauto"
24
"github.com/prometheus/common/model"
25
promConfig "github.com/prometheus/prometheus/config"
26
"github.com/prometheus/prometheus/discovery"
27
"github.com/prometheus/prometheus/model/relabel"
28
)
29
30
var (
31
integrationAbnormalExits = promauto.NewCounterVec(prometheus.CounterOpts{
32
Name: "agent_metrics_integration_abnormal_exits_total",
33
Help: "Total number of times an agent integration exited unexpectedly, causing it to be restarted.",
34
}, []string{"integration_name"})
35
)
36
37
var CurrentManagerConfig ManagerConfig = DefaultManagerConfig()
38
39
// DefaultManagerConfig holds the default settings for integrations.
40
func DefaultManagerConfig() ManagerConfig {
41
return ManagerConfig{
42
ScrapeIntegrations: true,
43
IntegrationRestartBackoff: 5 * time.Second,
44
45
// Deprecated fields which keep their previous defaults:
46
UseHostnameLabel: true,
47
ReplaceInstanceLabel: true,
48
}
49
}
50
51
// ManagerConfig holds the configuration for all integrations.
52
type ManagerConfig struct {
53
// When true, scrapes metrics from integrations.
54
ScrapeIntegrations bool `yaml:"scrape_integrations,omitempty"`
55
56
// The integration configs is merged with the manager config struct so we
57
// don't want to export it here; we'll manually unmarshal it in UnmarshalYAML.
58
Integrations Configs `yaml:"-"`
59
60
// Extra labels to add for all integration samples
61
Labels model.LabelSet `yaml:"labels,omitempty"`
62
63
// Prometheus RW configs to use for all integrations.
64
PrometheusRemoteWrite []*promConfig.RemoteWriteConfig `yaml:"prometheus_remote_write,omitempty"`
65
66
IntegrationRestartBackoff time.Duration `yaml:"integration_restart_backoff,omitempty"`
67
68
// ListenPort tells the integration Manager which port the Agent is
69
// listening on for generating Prometheus instance configs.
70
ListenPort int `yaml:"-"`
71
72
// ListenHost tells the integration Manager which port the Agent is
73
// listening on for generating Prometheus instance configs
74
ListenHost string `yaml:"-"`
75
76
TLSConfig config_util.TLSConfig `yaml:"http_tls_config,omitempty"`
77
78
// This is set to true if the Server TLSConfig Cert and Key path are set
79
ServerUsingTLS bool `yaml:"-"`
80
81
// We use this config to check if we need to reload integrations or not
82
// The Integrations Configs don't have prometheus defaults applied which
83
// can cause us skip reload when scrape configs change
84
PrometheusGlobalConfig promConfig.GlobalConfig `yaml:"-"`
85
86
//
87
// Deprecated and ignored fields.
88
//
89
90
ReplaceInstanceLabel bool `yaml:"replace_instance_label,omitempty"` // DEPRECATED, unused
91
UseHostnameLabel bool `yaml:"use_hostname_label,omitempty"` // DEPRECATED, unused
92
}
93
94
// MarshalYAML implements yaml.Marshaler for ManagerConfig.
95
func (c ManagerConfig) MarshalYAML() (interface{}, error) {
96
return MarshalYAML(c)
97
}
98
99
// UnmarshalYAML implements yaml.Unmarshaler for ManagerConfig.
100
func (c *ManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
101
*c = DefaultManagerConfig()
102
return UnmarshalYAML(c, unmarshal)
103
}
104
105
// DefaultRelabelConfigs returns the set of relabel configs that should be
106
// prepended to all RelabelConfigs for an integration.
107
func (c *ManagerConfig) DefaultRelabelConfigs(instanceKey string) []*relabel.Config {
108
return []*relabel.Config{{
109
SourceLabels: model.LabelNames{model.AddressLabel},
110
Action: relabel.Replace,
111
Separator: ";",
112
Regex: relabel.MustNewRegexp("(.*)"),
113
Replacement: instanceKey,
114
TargetLabel: model.InstanceLabel,
115
}}
116
}
117
118
// ApplyDefaults applies default settings to the ManagerConfig and validates
119
// that it can be used.
120
//
121
// If any integrations are enabled and are configured to be scraped, the
122
// Prometheus configuration must have a WAL directory configured.
123
func (c *ManagerConfig) ApplyDefaults(sflags *server.Flags, mcfg *metrics.Config) error {
124
host, port, err := sflags.HTTP.ListenHostPort()
125
if err != nil {
126
return fmt.Errorf("reading HTTP host:port: %w", err)
127
}
128
129
c.ListenHost = host
130
c.ListenPort = port
131
c.ServerUsingTLS = sflags.HTTP.UseTLS
132
133
if len(c.PrometheusRemoteWrite) == 0 {
134
c.PrometheusRemoteWrite = mcfg.Global.RemoteWrite
135
}
136
c.PrometheusGlobalConfig = mcfg.Global.Prometheus
137
138
for _, ic := range c.Integrations {
139
if !ic.Common.Enabled {
140
continue
141
}
142
143
scrapeIntegration := c.ScrapeIntegrations
144
if common := ic.Common; common.ScrapeIntegration != nil {
145
scrapeIntegration = *common.ScrapeIntegration
146
}
147
148
// WAL must be configured if an integration is going to be scraped.
149
if scrapeIntegration && mcfg.WALDir == "" {
150
return fmt.Errorf("no wal_directory configured")
151
}
152
}
153
154
return nil
155
}
156
157
// Manager manages a set of integrations and runs them.
158
type Manager struct {
159
logger log.Logger
160
161
cfgMut sync.RWMutex
162
cfg ManagerConfig
163
164
hostname string
165
166
ctx context.Context
167
cancel context.CancelFunc
168
wg sync.WaitGroup
169
170
im instance.Manager
171
validator configstore.Validator
172
173
integrationsMut sync.RWMutex
174
integrations map[string]*integrationProcess
175
176
handlerMut sync.Mutex
177
handlerCache map[string]handlerCacheEntry
178
}
179
180
// NewManager creates a new integrations manager. NewManager must be given an
181
// InstanceManager which is responsible for accepting instance configs to
182
// scrape and send metrics from running integrations.
183
func NewManager(cfg ManagerConfig, logger log.Logger, im instance.Manager, validate configstore.Validator) (*Manager, error) {
184
ctx, cancel := context.WithCancel(context.Background())
185
186
m := &Manager{
187
logger: logger,
188
189
ctx: ctx,
190
cancel: cancel,
191
192
im: im,
193
validator: validate,
194
195
integrations: make(map[string]*integrationProcess, len(cfg.Integrations)),
196
197
handlerCache: make(map[string]handlerCacheEntry),
198
}
199
200
var err error
201
m.hostname, err = instance.Hostname()
202
if err != nil {
203
return nil, err
204
}
205
206
if err := m.ApplyConfig(cfg); err != nil {
207
return nil, fmt.Errorf("failed applying config: %w", err)
208
}
209
return m, nil
210
}
211
212
// ApplyConfig updates the configuration of the integrations subsystem.
213
func (m *Manager) ApplyConfig(cfg ManagerConfig) error {
214
var failed bool
215
216
m.cfgMut.Lock()
217
defer m.cfgMut.Unlock()
218
219
m.integrationsMut.Lock()
220
defer m.integrationsMut.Unlock()
221
222
// The global prometheus config settings don't get applied to integrations until later. This
223
// causes us to skip reload when those settings change.
224
if util.CompareYAML(m.cfg, cfg) && util.CompareYAML(m.cfg.PrometheusGlobalConfig, cfg.PrometheusGlobalConfig) {
225
level.Debug(m.logger).Log("msg", "Integrations config is unchanged skipping apply")
226
return nil
227
}
228
level.Debug(m.logger).Log("msg", "Applying integrations config changes")
229
230
select {
231
case <-m.ctx.Done():
232
return fmt.Errorf("Manager already stopped")
233
default:
234
// No-op
235
}
236
237
// Iterate over our integrations. New or changed integrations will be
238
// started, with their existing counterparts being shut down.
239
for _, ic := range cfg.Integrations {
240
if !ic.Common.Enabled {
241
continue
242
}
243
// Key is used to identify the instance of this integration within the
244
// instance manager and within our set of running integrations.
245
key := integrationKey(ic.Name())
246
247
// Look for an existing integration with the same key. If it exists and
248
// is unchanged, we have nothing to do. Otherwise, we're going to recreate
249
// it with the new settings, so we'll need to stop it.
250
if p, exist := m.integrations[key]; exist {
251
if util.CompareYAMLWithHook(p.cfg, ic, noScrubbedSecretsHook) {
252
continue
253
}
254
p.stop()
255
delete(m.integrations, key)
256
}
257
258
l := log.With(m.logger, "integration", ic.Name())
259
i, err := ic.NewIntegration(l)
260
if err != nil {
261
level.Error(m.logger).Log("msg", "failed to initialize integration. it will not run or be scraped", "integration", ic.Name(), "err", err)
262
failed = true
263
264
// If this integration was running before, its instance won't be cleaned
265
// up since it's now removed from the map. We need to clean it up here.
266
_ = m.im.DeleteConfig(key)
267
continue
268
}
269
270
// Find what instance label should be used to represent this integration.
271
var instanceKey string
272
if kp := ic.Common.InstanceKey; kp != nil {
273
// Common config takes precedence.
274
instanceKey = strings.TrimSpace(*kp)
275
} else {
276
instanceKey, err = ic.InstanceKey(fmt.Sprintf("%s:%d", m.hostname, cfg.ListenPort))
277
if err != nil {
278
level.Error(m.logger).Log("msg", "failed to get instance key for integration. it will not run or be scraped", "integration", ic.Name(), "err", err)
279
failed = true
280
281
// If this integration was running before, its instance won't be cleaned
282
// up since it's now removed from the map. We need to clean it up here.
283
_ = m.im.DeleteConfig(key)
284
continue
285
}
286
}
287
288
// Create, start, and register the new integration.
289
ctx, cancel := context.WithCancel(m.ctx)
290
p := &integrationProcess{
291
log: m.logger,
292
cfg: ic,
293
i: i,
294
instanceKey: instanceKey,
295
296
ctx: ctx,
297
stop: cancel,
298
299
wg: &m.wg,
300
wait: m.instanceBackoff,
301
}
302
go p.Run()
303
m.integrations[key] = p
304
}
305
306
// Delete instances and processed that have been removed in between calls to
307
// ApplyConfig.
308
for key, process := range m.integrations {
309
foundConfig := false
310
for _, ic := range cfg.Integrations {
311
if integrationKey(ic.Name()) == key {
312
// If this is disabled then we should delete from integrations
313
if !ic.Common.Enabled {
314
break
315
}
316
foundConfig = true
317
break
318
}
319
}
320
if foundConfig {
321
continue
322
}
323
324
_ = m.im.DeleteConfig(key)
325
process.stop()
326
delete(m.integrations, key)
327
}
328
329
// Re-apply configs to our instance manager for all running integrations.
330
// Generated scrape configs may change in between calls to ApplyConfig even
331
// if the configs for the integration didn't.
332
for key, p := range m.integrations {
333
shouldCollect := cfg.ScrapeIntegrations
334
if common := p.cfg.Common; common.ScrapeIntegration != nil {
335
shouldCollect = *common.ScrapeIntegration
336
}
337
338
switch shouldCollect {
339
case true:
340
instanceConfig := m.instanceConfigForIntegration(p, cfg)
341
if err := m.validator(&instanceConfig); err != nil {
342
level.Error(p.log).Log("msg", "failed to validate generated scrape config for integration. integration will not be scraped", "err", err, "integration", p.cfg.Name())
343
failed = true
344
break
345
}
346
347
if err := m.im.ApplyConfig(instanceConfig); err != nil {
348
level.Error(p.log).Log("msg", "failed to apply integration. integration will not be scraped", "err", err, "integration", p.cfg.Name())
349
failed = true
350
}
351
case false:
352
// If a previous instance of the config was being scraped, we need to
353
// delete it here. Calling DeleteConfig when nothing is running is a safe
354
// operation.
355
_ = m.im.DeleteConfig(key)
356
}
357
}
358
359
m.cfg = cfg
360
361
if failed {
362
return fmt.Errorf("not all integrations were correctly updated")
363
}
364
return nil
365
}
366
367
func noScrubbedSecretsHook(in interface{}) (ok bool, out interface{}, err error) {
368
switch v := in.(type) {
369
case config_util.Secret:
370
return true, string(v), nil
371
case *config_util.URL:
372
return true, v.String(), nil
373
default:
374
return false, nil, nil
375
}
376
}
377
378
// integrationProcess is a running integration.
379
type integrationProcess struct {
380
log log.Logger
381
ctx context.Context
382
stop context.CancelFunc
383
cfg UnmarshaledConfig
384
instanceKey string // Value for the `instance` label
385
i Integration
386
387
wg *sync.WaitGroup
388
wait func(cfg Config, err error)
389
}
390
391
// Run runs the integration until the process is canceled.
392
func (p *integrationProcess) Run() {
393
defer func() {
394
if r := recover(); r != nil {
395
err := fmt.Errorf("%v", r)
396
level.Error(p.log).Log("msg", "integration has panicked. THIS IS A BUG!", "err", err, "integration", p.cfg.Name())
397
}
398
}()
399
400
p.wg.Add(1)
401
defer p.wg.Done()
402
403
for {
404
err := p.i.Run(p.ctx)
405
if err != nil && err != context.Canceled {
406
p.wait(p.cfg, err)
407
} else {
408
level.Info(p.log).Log("msg", "stopped integration", "integration", p.cfg.Name())
409
break
410
}
411
}
412
}
413
414
// instanceBackoff records an abnormal exit of the integration described by
// cfg, logs err, and then blocks for the configured restart backoff.
//
// The backoff duration is copied while holding the config read lock and the
// lock is released before sleeping. Holding it across the sleep would block
// ApplyConfig, which needs the write lock, for the whole backoff period.
func (m *Manager) instanceBackoff(cfg Config, err error) {
	m.cfgMut.RLock()
	backoff := m.cfg.IntegrationRestartBackoff
	m.cfgMut.RUnlock()

	integrationAbnormalExits.WithLabelValues(cfg.Name()).Inc()
	level.Error(m.logger).Log("msg", "integration stopped abnormally, restarting after backoff", "err", err, "integration", cfg.Name(), "backoff", backoff)
	time.Sleep(backoff)
}
422
423
func (m *Manager) instanceConfigForIntegration(p *integrationProcess, cfg ManagerConfig) instance.Config {
424
common := p.cfg.Common
425
relabelConfigs := append(cfg.DefaultRelabelConfigs(p.instanceKey), common.RelabelConfigs...)
426
427
schema := "http"
428
// Check for HTTPS support
429
var httpClientConfig config_util.HTTPClientConfig
430
if cfg.ServerUsingTLS {
431
schema = "https"
432
httpClientConfig.TLSConfig = cfg.TLSConfig
433
}
434
435
var scrapeConfigs []*promConfig.ScrapeConfig
436
437
for _, isc := range p.i.ScrapeConfigs() {
438
sc := &promConfig.ScrapeConfig{
439
JobName: fmt.Sprintf("integrations/%s", isc.JobName),
440
MetricsPath: path.Join("/integrations", p.cfg.Name(), isc.MetricsPath),
441
Params: isc.QueryParams,
442
Scheme: schema,
443
HonorLabels: false,
444
HonorTimestamps: true,
445
ScrapeInterval: model.Duration(common.ScrapeInterval),
446
ScrapeTimeout: model.Duration(common.ScrapeTimeout),
447
ServiceDiscoveryConfigs: m.scrapeServiceDiscovery(cfg),
448
RelabelConfigs: relabelConfigs,
449
MetricRelabelConfigs: common.MetricRelabelConfigs,
450
HTTPClientConfig: httpClientConfig,
451
}
452
453
scrapeConfigs = append(scrapeConfigs, sc)
454
}
455
456
instanceCfg := instance.DefaultConfig
457
instanceCfg.Name = integrationKey(p.cfg.Name())
458
instanceCfg.ScrapeConfigs = scrapeConfigs
459
instanceCfg.RemoteWrite = cfg.PrometheusRemoteWrite
460
if common.WALTruncateFrequency > 0 {
461
instanceCfg.WALTruncateFrequency = common.WALTruncateFrequency
462
}
463
return instanceCfg
464
}
465
466
// integrationKey returns the key for an integration Config, used for its
// instance name and name in the process cache. The key is the integration
// name prefixed with "integration/".
//
// Plain concatenation is used instead of fmt.Sprintf: the function runs on
// every metrics request and inside the reconciliation loops of ApplyConfig,
// and no formatting is needed to join two strings.
func integrationKey(name string) string {
	return "integration/" + name
}
471
472
func (m *Manager) scrapeServiceDiscovery(cfg ManagerConfig) discovery.Configs {
473
// A blank host somehow works, but it then requires a sever name to be set under tls.
474
newHost := cfg.ListenHost
475
if newHost == "" {
476
newHost = "127.0.0.1"
477
}
478
localAddr := fmt.Sprintf("%s:%d", newHost, cfg.ListenPort)
479
labels := model.LabelSet{}
480
labels[model.LabelName("agent_hostname")] = model.LabelValue(m.hostname)
481
for k, v := range cfg.Labels {
482
labels[k] = v
483
}
484
485
return discovery.Configs{
486
discovery.StaticConfig{{
487
Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(localAddr)}},
488
Labels: labels,
489
}},
490
}
491
}
492
493
// WireAPI hooks up /metrics routes per-integration.
494
func (m *Manager) WireAPI(r *mux.Router) {
495
r.HandleFunc("/integrations/{name}/metrics", func(rw http.ResponseWriter, r *http.Request) {
496
m.integrationsMut.RLock()
497
defer m.integrationsMut.RUnlock()
498
499
key := integrationKey(mux.Vars(r)["name"])
500
handler := m.loadHandler(key)
501
handler.ServeHTTP(rw, r)
502
})
503
}
504
505
// loadHandler will perform a dynamic lookup of an HTTP handler for an
506
// integration. loadHandler should be called with a read lock on the
507
// integrations mutex.
508
func (m *Manager) loadHandler(key string) http.Handler {
509
m.handlerMut.Lock()
510
defer m.handlerMut.Unlock()
511
512
// Search the integration by name to see if it's still running.
513
p, ok := m.integrations[key]
514
if !ok {
515
delete(m.handlerCache, key)
516
return http.NotFoundHandler()
517
}
518
519
// Now look in the cache for a handler for the running process.
520
cacheEntry, ok := m.handlerCache[key]
521
if ok && cacheEntry.process == p {
522
return cacheEntry.handler
523
}
524
525
// New integration process that hasn't been scraped before. Generate
526
// a handler for it and cache it.
527
handler, err := p.i.MetricsHandler()
528
if err != nil {
529
level.Error(m.logger).Log("msg", "could not create http handler for integration", "integration", p.cfg.Name(), "err", err)
530
return http.HandlerFunc(internalServiceError)
531
}
532
533
cacheEntry = handlerCacheEntry{handler: handler, process: p}
534
m.handlerCache[key] = cacheEntry
535
return cacheEntry.handler
536
}
537
538
// internalServiceError is an http.HandlerFunc that replies to every request
// with status 500 and a fixed plain-text body.
func internalServiceError(rw http.ResponseWriter, _ *http.Request) {
	const body = "500 Internal Server Error"
	http.Error(rw, body, http.StatusInternalServerError)
}
541
542
// Stop stops the manager and all of its integrations. Blocks until all running
543
// integrations exit.
544
func (m *Manager) Stop() {
545
m.cancel()
546
m.wg.Wait()
547
}
548
549
type handlerCacheEntry struct {
550
handler http.Handler
551
process *integrationProcess
552
}
553
554