Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/controller.go
5282 views
1
package integrations
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"net/http"
8
"net/url"
9
"path"
10
"sort"
11
"strings"
12
"sync"
13
14
"github.com/go-kit/log"
15
"github.com/go-kit/log/level"
16
"github.com/gorilla/mux"
17
"github.com/grafana/agent/pkg/integrations/v2/autoscrape"
18
"github.com/prometheus/prometheus/discovery"
19
http_sd "github.com/prometheus/prometheus/discovery/http"
20
"go.uber.org/atomic"
21
)
22
23
// controllerConfig holds a set of integration configs. It is the unit of
// configuration passed to newController and UpdateController.
type controllerConfig []Config
25
26
// controller manages a set of integrations.
type controller struct {
	logger log.Logger

	// mut is held by UpdateController and forEachIntegration while they read
	// or replace cfg, globals and integrations.
	mut          sync.Mutex
	cfg          controllerConfig         // Config most recently accepted by UpdateController
	globals      Globals                  // Globals most recently accepted by UpdateController
	integrations []*controlledIntegration // Running integrations

	// UpdateController sends the new set of integrations on this channel and
	// run receives it to reload the worker pool.
	runIntegrations chan []*controlledIntegration // Schedule integrations to run
}
37
38
// newController creates a new Controller. Controller is intended to be
39
// embedded inside of integrations that may want to multiplex other
40
// integrations.
41
func newController(l log.Logger, cfg controllerConfig, globals Globals) (*controller, error) {
42
c := &controller{
43
logger: l,
44
runIntegrations: make(chan []*controlledIntegration, 1),
45
}
46
if err := c.UpdateController(cfg, globals); err != nil {
47
return nil, err
48
}
49
return c, nil
50
}
51
52
// run starts the controller and blocks until ctx is canceled.
53
func (c *controller) run(ctx context.Context) {
54
pool := newWorkerPool(ctx, c.logger)
55
defer pool.Close()
56
57
for {
58
select {
59
case <-ctx.Done():
60
level.Debug(c.logger).Log("msg", "controller exiting")
61
return
62
case newIntegrations := <-c.runIntegrations:
63
pool.Reload(newIntegrations)
64
}
65
}
66
}
67
68
// controlledIntegration is a running Integration. A running integration is
// identified uniquely by its id.
type controlledIntegration struct {
	id integrationID
	i  Integration
	c  Config // Config that generated i. Used to detect whether the config changed.
	// running is reported by Running. It is not written anywhere in this
	// file. NOTE(review): presumably the worker pool toggles it; confirm.
	running atomic.Bool
}
76
77
// Running reports the current value of the integration's running flag. The
// flag is read atomically.
func (ci *controlledIntegration) Running() bool {
	isRunning := ci.running.Load()
	return isRunning
}
80
81
// integrationID uniquely identifies an integration by the pair of its Name
// and its Identifier. Values are comparable and are used as map keys.
type integrationID struct {
	Name       string
	Identifier string
}
84
85
// String renders the ID as "<Name>/<Identifier>".
func (id integrationID) String() string {
	name, identifier := id.Name, id.Identifier
	return fmt.Sprintf("%s/%s", name, identifier)
}
88
89
// UpdateController updates the Controller with new Controller and
90
// IntegrationOptions.
91
//
92
// UpdateController updates running integrations. Extensions can be
93
// recalculated by calling relevant methods like Handler or Targets.
94
func (c *controller) UpdateController(cfg controllerConfig, globals Globals) error {
95
c.mut.Lock()
96
defer c.mut.Unlock()
97
98
// Ensure that no singleton integration is defined twice
99
var (
100
duplicatedSingletons []string
101
singletonSet = make(map[string]struct{})
102
)
103
for _, cfg := range cfg {
104
t, _ := RegisteredType(cfg)
105
if t != TypeSingleton {
106
continue
107
}
108
109
if _, exists := singletonSet[cfg.Name()]; exists {
110
duplicatedSingletons = append(duplicatedSingletons, cfg.Name())
111
continue
112
}
113
singletonSet[cfg.Name()] = struct{}{}
114
}
115
if len(duplicatedSingletons) == 1 {
116
return fmt.Errorf("integration %q may only be defined once", duplicatedSingletons[0])
117
} else if len(duplicatedSingletons) > 1 {
118
list := strings.Join(duplicatedSingletons, ", ")
119
return fmt.Errorf("the following integrations may only be defined once each: %s", list)
120
}
121
122
integrationIDMap := map[integrationID]struct{}{}
123
124
integrations := make([]*controlledIntegration, 0, len(cfg))
125
126
NextConfig:
127
for _, ic := range cfg {
128
name := ic.Name()
129
130
identifier, err := ic.Identifier(globals)
131
if err != nil {
132
return fmt.Errorf("could not build identifier for integration %q: %w", name, err)
133
}
134
135
if err := ic.ApplyDefaults(globals); err != nil {
136
return fmt.Errorf("failed to apply defaults for %s/%s: %w", name, identifier, err)
137
}
138
139
id := integrationID{Name: name, Identifier: identifier}
140
if _, exist := integrationIDMap[id]; exist {
141
return fmt.Errorf("multiple instance names %q in integration %q", identifier, name)
142
}
143
integrationIDMap[id] = struct{}{}
144
145
// Now that we know the ID for an integration, we can check to see if it's
146
// running and can be dynamically updated.
147
for _, ci := range c.integrations {
148
if ci.id != id {
149
continue
150
}
151
152
// If the configs haven't changed, then we don't need to do anything.
153
if CompareConfigs(ci.c, ic) {
154
integrations = append(integrations, ci)
155
continue NextConfig
156
}
157
158
if ui, ok := ci.i.(UpdateIntegration); ok {
159
if err := ui.ApplyConfig(ic, globals); errors.Is(err, ErrInvalidUpdate) {
160
level.Warn(c.logger).Log("msg", "failed to dynamically update integration; will recreate", "integration", name, "instance", identifier, "err', err")
161
break
162
} else if err != nil {
163
return fmt.Errorf("failed to update %s integration %q: %w", name, identifier, err)
164
} else {
165
// Update succeeded; re-use the running one and go to the next
166
// integration to process.
167
integrations = append(integrations, ci)
168
continue NextConfig
169
}
170
}
171
172
// We found the integration to update: we can stop this loop now.
173
break
174
}
175
176
logger := log.With(c.logger, "integration", name, "instance", identifier)
177
integration, err := ic.NewIntegration(logger, globals)
178
if err != nil {
179
return fmt.Errorf("failed to construct %s integration %q: %w", name, identifier, err)
180
}
181
182
// Create a new controlled integration.
183
integrations = append(integrations, &controlledIntegration{
184
id: id,
185
i: integration,
186
c: ic,
187
})
188
}
189
190
// Schedule integrations to run
191
c.runIntegrations <- integrations
192
193
c.cfg = cfg
194
c.globals = globals
195
c.integrations = integrations
196
return nil
197
}
198
199
// Handler returns an HTTP handler for the controller and its integrations.
200
// Handler will pass through requests to other running integrations. Handler
201
// always returns an http.Handler regardless of error.
202
//
203
// Handler is expensive to compute and should only be done after reloading the
204
// config.
205
func (c *controller) Handler(prefix string) (http.Handler, error) {
206
var firstErr error
207
saveFirstErr := func(err error) {
208
if firstErr == nil {
209
firstErr = err
210
}
211
}
212
213
r := mux.NewRouter()
214
215
err := c.forEachIntegration(prefix, func(ci *controlledIntegration, iprefix string) {
216
id := ci.id
217
218
i, ok := ci.i.(HTTPIntegration)
219
if !ok {
220
return
221
}
222
223
handler, err := i.Handler(iprefix + "/")
224
if err != nil {
225
saveFirstErr(fmt.Errorf("could not generate HTTP handler for %s integration %q: %w", id.Name, id.Identifier, err))
226
return
227
} else if handler == nil {
228
return
229
}
230
231
// Anything that matches the integrationPrefix should be passed to the handler.
232
// The reason these two are separated is if you have two instance names and one is a prefix of another
233
// ie localhost and localhost2, localhost2 will never get called because localhost will always get precedence
234
// add / fixes this, but to keep old behavior we need to ensure /localhost and localhost2 also work, hence
235
// the second handlefunc below this one. https://github.com/grafana/agent/issues/1718
236
hfunc := func(rw http.ResponseWriter, r *http.Request) {
237
if !ci.Running() {
238
http.Error(rw, fmt.Sprintf("%s integration intance %q not running", id.Name, id.Identifier), http.StatusServiceUnavailable)
239
return
240
}
241
handler.ServeHTTP(rw, r)
242
}
243
r.PathPrefix(iprefix + "/").HandlerFunc(hfunc)
244
// Handle calling the iprefix itself
245
r.HandleFunc(iprefix, hfunc)
246
})
247
if err != nil {
248
level.Warn(c.logger).Log("msg", "error when iterating over integrations to build HTTP handlers", "err", err)
249
}
250
251
// TODO(rfratto): navigation page for exact prefix match
252
253
return r, firstErr
254
}
255
256
// forEachIntegration calculates the prefix for each integration and calls f.
257
// prefix will not end in /.
258
func (c *controller) forEachIntegration(basePrefix string, f func(ci *controlledIntegration, iprefix string)) error {
259
c.mut.Lock()
260
defer c.mut.Unlock()
261
262
// Pre-populate a mapping of integration name -> identifier. If there are
263
// two instances of the same integration, we want to ensure unique routing.
264
//
265
// This special logic is done for backwards compatibility with the original
266
// design of integrations.
267
identifiersMap := map[string][]string{}
268
for _, i := range c.integrations {
269
identifiersMap[i.id.Name] = append(identifiersMap[i.id.Name], i.id.Identifier)
270
}
271
272
usedPrefixes := map[string]struct{}{}
273
274
for _, ci := range c.integrations {
275
id := ci.id
276
multipleInstances := len(identifiersMap[id.Name]) > 1
277
278
var integrationPrefix string
279
if multipleInstances {
280
// i.e., /integrations/mysqld_exporter/server-a
281
integrationPrefix = path.Join(basePrefix, id.Name, id.Identifier)
282
} else {
283
// i.e., /integrations/node_exporter
284
integrationPrefix = path.Join(basePrefix, id.Name)
285
}
286
287
f(ci, integrationPrefix)
288
289
if _, exist := usedPrefixes[integrationPrefix]; exist {
290
return fmt.Errorf("BUG: duplicate integration prefix %q", integrationPrefix)
291
}
292
usedPrefixes[integrationPrefix] = struct{}{}
293
}
294
return nil
295
}
296
297
// Targets returns the current set of targets across all integrations. Use opts
298
// to customize which targets are returned.
299
func (c *controller) Targets(ep Endpoint, opts TargetOptions) []*targetGroup {
300
// Grab the integrations as fast as possible. We don't want to spend too much
301
// time holding the mutex.
302
type prefixedMetricsIntegration struct {
303
id integrationID
304
i MetricsIntegration
305
ep Endpoint
306
}
307
var mm []prefixedMetricsIntegration
308
309
err := c.forEachIntegration(ep.Prefix, func(ci *controlledIntegration, iprefix string) {
310
// Best effort liveness check. They might stop running when we request
311
// their targets, which is fine, but we should save as much work as we
312
// can.
313
if !ci.Running() {
314
return
315
}
316
if mi, ok := ci.i.(MetricsIntegration); ok {
317
ep := Endpoint{Host: ep.Host, Prefix: iprefix}
318
mm = append(mm, prefixedMetricsIntegration{id: ci.id, i: mi, ep: ep})
319
}
320
})
321
if err != nil {
322
level.Warn(c.logger).Log("msg", "error when iterating over integrations to get targets", "err", err)
323
}
324
325
var tgs []*targetGroup
326
for _, mi := range mm {
327
// If we're looking for a subset of integrations, filter out anything that doesn't match.
328
if len(opts.Integrations) > 0 && !stringSliceContains(opts.Integrations, mi.id.Name) {
329
continue
330
}
331
// If we're looking for a specific instance, filter out anything that doesn't match.
332
if opts.Instance != "" && mi.id.Identifier != opts.Instance {
333
continue
334
}
335
336
for _, tgt := range mi.i.Targets(mi.ep) {
337
tgs = append(tgs, (*targetGroup)(tgt))
338
}
339
}
340
sort.Slice(tgs, func(i, j int) bool {
341
return tgs[i].Source < tgs[j].Source
342
})
343
return tgs
344
}
345
346
// stringSliceContains reports whether s is an element of ss. A nil or empty
// ss contains nothing.
func stringSliceContains(ss []string, s string) bool {
	for i := 0; i < len(ss); i++ {
		if ss[i] != s {
			continue
		}
		return true
	}
	return false
}
354
355
// TargetOptions controls which targets should be returned by the subsystem.
// The zero value selects every instance of every integration.
type TargetOptions struct {
	// Integrations is the set of integration names to return. An empty slice
	// will default to returning all integrations.
	Integrations []string
	// Instance matches a specific instance from all integrations. An empty
	// string will match any instance.
	Instance string
}
364
365
// TargetOptionsFromParams creates TargetOptions from parsed URL query parameters.
366
func TargetOptionsFromParams(u url.Values) (TargetOptions, error) {
367
var to TargetOptions
368
369
rawIntegrations := u.Get("integrations")
370
if rawIntegrations != "" {
371
rawIntegrations, err := url.QueryUnescape(rawIntegrations)
372
if err != nil {
373
return to, fmt.Errorf("invalid value for integrations: %w", err)
374
}
375
to.Integrations = strings.Split(rawIntegrations, ",")
376
}
377
378
rawInstance := u.Get("instance")
379
if rawInstance != "" {
380
rawInstance, err := url.QueryUnescape(rawInstance)
381
if err != nil {
382
return to, fmt.Errorf("invalid value for instance: %w", err)
383
}
384
to.Instance = rawInstance
385
}
386
387
return to, nil
388
}
389
390
// ToParams will convert to into URL query parameters.
391
func (to TargetOptions) ToParams() url.Values {
392
p := make(url.Values)
393
if len(to.Integrations) != 0 {
394
p.Set("integrations", url.QueryEscape(strings.Join(to.Integrations, ",")))
395
}
396
if to.Instance != "" {
397
p.Set("instance", url.QueryEscape(to.Instance))
398
}
399
return p
400
}
401
402
// ScrapeConfigs returns a set of scrape configs to use for self-scraping.
403
// sdConfig should contain the full URL where the integrations SD API is
404
// exposed. ScrapeConfigs will inject unique query parameters per integration
405
// to limit what will be discovered.
406
func (c *controller) ScrapeConfigs(prefix string, sdConfig *http_sd.SDConfig) []*autoscrape.ScrapeConfig {
407
// Grab the integrations as fast as possible. We don't want to spend too much
408
// time holding the mutex.
409
type prefixedMetricsIntegration struct {
410
id integrationID
411
i MetricsIntegration
412
prefix string
413
}
414
var mm []prefixedMetricsIntegration
415
416
err := c.forEachIntegration(prefix, func(ci *controlledIntegration, iprefix string) {
417
if mi, ok := ci.i.(MetricsIntegration); ok {
418
mm = append(mm, prefixedMetricsIntegration{id: ci.id, i: mi, prefix: iprefix})
419
}
420
})
421
if err != nil {
422
level.Warn(c.logger).Log("msg", "error when iterating over integrations to get scrape configs", "err", err)
423
}
424
425
var cfgs []*autoscrape.ScrapeConfig
426
for _, mi := range mm {
427
// sdConfig will be pointing to the targets API. By default, this returns absolutely everything.
428
// We want to use the query parameters to inform the API to only return
429
// specific targets.
430
opts := TargetOptions{
431
Integrations: []string{mi.id.Name},
432
Instance: mi.id.Identifier,
433
}
434
435
integrationSDConfig := *sdConfig
436
integrationSDConfig.URL = sdConfig.URL + "?" + opts.ToParams().Encode()
437
sds := discovery.Configs{&integrationSDConfig}
438
cfgs = append(cfgs, mi.i.ScrapeConfigs(sds)...)
439
}
440
sort.Slice(cfgs, func(i, j int) bool {
441
return cfgs[i].Config.JobName < cfgs[j].Config.JobName
442
})
443
return cfgs
444
}
445
446