Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/autoscrape/autoscrape.go
5367 views
1
// Package autoscrape implements a scraper for integrations.
2
package autoscrape
3
4
import (
5
"context"
6
"sync"
7
8
"github.com/go-kit/log"
9
"github.com/go-kit/log/level"
10
"github.com/grafana/agent/pkg/metrics"
11
"github.com/grafana/agent/pkg/metrics/instance"
12
"github.com/grafana/agent/pkg/server"
13
"github.com/oklog/run"
14
config_util "github.com/prometheus/common/config"
15
"github.com/prometheus/common/model"
16
prom_config "github.com/prometheus/prometheus/config"
17
"github.com/prometheus/prometheus/discovery"
18
"github.com/prometheus/prometheus/model/relabel"
19
"github.com/prometheus/prometheus/scrape"
20
"github.com/prometheus/prometheus/storage"
21
)
22
23
// DefaultGlobal holds default values for Global. Autoscraping is enabled by
// default and sends metrics to the metrics instance named "default".
var DefaultGlobal = Global{
	Enable:          true,
	MetricsInstance: "default",
}
28
29
// Global holds default settings for metrics integrations that support
// autoscraping. Integrations may override their settings.
type Global struct {
	// Enable controls whether self-scraping should be enabled.
	Enable bool `yaml:"enable,omitempty"`
	// MetricsInstance is the metrics instance name to send metrics to.
	MetricsInstance string `yaml:"metrics_instance,omitempty"`
	// ScrapeInterval is the self-scraping frequency.
	ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
	// ScrapeTimeout is the self-scraping timeout.
	ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
}
37
38
// UnmarshalYAML implements yaml.Unmarshaler.
39
func (g *Global) UnmarshalYAML(f func(interface{}) error) error {
40
*g = DefaultGlobal
41
type global Global
42
return f((*global)(g))
43
}
44
45
// Config configures autoscrape for an individual integration. Fields set here
// override the defaults from Global.
type Config struct {
	// Enable controls whether self-scraping should be enabled. A nil value
	// falls back to the Global default.
	Enable *bool `yaml:"enable,omitempty"`
	// MetricsInstance is the metrics instance name to send metrics to.
	MetricsInstance string `yaml:"metrics_instance,omitempty"`
	// ScrapeInterval is the self-scraping frequency.
	ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
	// ScrapeTimeout is the self-scraping timeout.
	ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`

	// RelabelConfigs relabels the autoscrape job.
	RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
	// MetricRelabelConfigs relabels individual autoscraped metrics.
	MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"`
}
55
56
// InstanceStore is used to find instances to send metrics to. It is a subset
// of the pkg/metrics/instance.Manager interface.
type InstanceStore interface {
	// GetInstance retrieves a ManagedInstance by name.
	GetInstance(name string) (instance.ManagedInstance, error)
}
62
63
// ScrapeConfig binds a Prometheus scrape config with an instance to send
// scraped metrics to.
type ScrapeConfig struct {
	// Instance is the name of the metrics instance scraped samples go to.
	Instance string
	// Config is the Prometheus scrape config describing what to scrape.
	Config prom_config.ScrapeConfig
}
69
70
// Scraper is a metrics autoscraper. It manages one service-discovery/scrape
// manager pair per target metrics instance.
type Scraper struct {
	// ctx/cancel bound the lifetime of all per-instance scrapers; cancel is
	// invoked by Stop.
	ctx    context.Context
	cancel context.CancelFunc

	log log.Logger
	is  InstanceStore // Used to look up instances to append scraped metrics to.

	// Prometheus doesn't pass contextual information at scrape time that could
	// be used to change the behavior of generating an appender. This means that
	// it's not yet possible for us to just run a single SD + scrape manager for
	// all of our integrations, and we instead need to launch a pair of each for
	// every instance we're writing to.

	iscrapersMut sync.RWMutex                // Guards iscrapers.
	iscrapers    map[string]*instanceScraper // Keyed by target instance name.
	dialerFunc   server.DialContextFunc      // Optional custom dialer; nil means default dialing.
}
88
89
// NewScraper creates a new autoscraper. Scraper will run until Stop is called.
90
// Instances to send scraped metrics to will be looked up via im. Scraping will
91
// use the provided dialerFunc to make connections if non-nil.
92
func NewScraper(l log.Logger, is InstanceStore, dialerFunc server.DialContextFunc) *Scraper {
93
l = log.With(l, "component", "autoscraper")
94
95
ctx, cancel := context.WithCancel(context.Background())
96
97
s := &Scraper{
98
ctx: ctx,
99
cancel: cancel,
100
101
log: l,
102
is: is,
103
iscrapers: map[string]*instanceScraper{},
104
dialerFunc: dialerFunc,
105
}
106
return s
107
}
108
109
// ApplyConfig will apply the given jobs. An error will be returned for any
110
// jobs that failed to be applied.
111
func (s *Scraper) ApplyConfig(jobs []*ScrapeConfig) error {
112
s.iscrapersMut.Lock()
113
defer s.iscrapersMut.Unlock()
114
115
var firstError error
116
saveError := func(e error) {
117
if firstError == nil {
118
firstError = e
119
}
120
}
121
122
// Shard our jobs by target instance.
123
shardedJobs := map[string][]*prom_config.ScrapeConfig{}
124
for _, j := range jobs {
125
_, err := s.is.GetInstance(j.Instance)
126
if err != nil {
127
level.Error(s.log).Log("msg", "cannot autoscrape integration", "name", j.Config.JobName, "err", err)
128
saveError(err)
129
continue
130
}
131
132
shardedJobs[j.Instance] = append(shardedJobs[j.Instance], &j.Config)
133
}
134
135
// Then pass the jobs to instanceScraper, creating them if we need to.
136
for instance, jobs := range shardedJobs {
137
is, ok := s.iscrapers[instance]
138
if !ok {
139
is = newInstanceScraper(s.ctx, s.log, s.is, instance, config_util.DialContextFunc(s.dialerFunc))
140
s.iscrapers[instance] = is
141
}
142
if err := is.ApplyConfig(jobs); err != nil {
143
// Not logging here; is.ApplyConfig already logged the errors.
144
saveError(err)
145
}
146
}
147
148
// Garbage collect: If there's a key in s.scrapers that wasn't in
149
// shardedJobs, stop that unused scraper.
150
for instance, is := range s.iscrapers {
151
_, current := shardedJobs[instance]
152
if !current {
153
is.Stop()
154
delete(s.iscrapers, instance)
155
}
156
}
157
158
return firstError
159
}
160
161
// TargetsActive returns the set of active scrape targets for all target
162
// instances.
163
func (s *Scraper) TargetsActive() map[string]metrics.TargetSet {
164
s.iscrapersMut.RLock()
165
defer s.iscrapersMut.RUnlock()
166
167
allTargets := make(map[string]metrics.TargetSet, len(s.iscrapers))
168
for instance, is := range s.iscrapers {
169
allTargets[instance] = is.sm.TargetsActive()
170
}
171
return allTargets
172
}
173
174
// Stop stops the Scraper.
175
func (s *Scraper) Stop() {
176
s.iscrapersMut.Lock()
177
defer s.iscrapersMut.Unlock()
178
179
for instance, is := range s.iscrapers {
180
is.Stop()
181
delete(s.iscrapers, instance)
182
}
183
184
s.cancel()
185
}
186
187
// instanceScraper is a Scraper which always sends to the same instance. It
// pairs one discovery manager with one scrape manager.
type instanceScraper struct {
	log log.Logger

	sd     *discovery.Manager // Service discovery feeding sm.
	sm     *scrape.Manager    // Scrapes targets discovered by sd.
	cancel context.CancelFunc // Stops both managers.
	exited chan struct{}      // Closed once run has returned.
}
196
197
// newInstanceScraper runs a new instanceScraper. Must be stopped by calling
// Stop. Scraped metrics are appended to the instance named instanceName,
// looked up through s at append time.
func newInstanceScraper(
	ctx context.Context,
	l log.Logger,
	s InstanceStore,
	instanceName string,
	dialerFunc config_util.DialContextFunc,
) *instanceScraper {

	ctx, cancel := context.WithCancel(ctx)
	l = log.With(l, "target_instance", instanceName)

	sdOpts := []func(*discovery.Manager){
		discovery.Name("autoscraper/" + instanceName),
		discovery.HTTPClientOptions(
			// If dialerFunc is nil, the discovery manager will use Go's
			// default dialer.
			config_util.WithDialContextFunc(dialerFunc),
		),
	}
	sd := discovery.NewManager(ctx, l, sdOpts...)
	sm := scrape.NewManager(&scrape.Options{
		HTTPClientOptions: []config_util.HTTPClientOption{
			// If dialerFunc is nil, scrape.NewManager will use Go's default dialer.
			config_util.WithDialContextFunc(dialerFunc),
		},
	}, l, &agentAppender{
		inst: instanceName,
		is:   s,
	})

	is := &instanceScraper{
		log: l,

		sd:     sd,
		sm:     sm,
		cancel: cancel,
		exited: make(chan struct{}),
	}

	// run exits (and closes is.exited) once both managers have stopped.
	go is.run()
	return is
}
240
241
// agentAppender implements scrape.Appendable by resolving the target metrics
// instance by name at append time.
type agentAppender struct {
	inst string        // Name of the instance to append to.
	is   InstanceStore // Used to resolve inst to a ManagedInstance.
}
245
246
// Appender returns an appender for aa's target instance. If the instance
// can't be found, a failedAppender is returned instead so the lookup failure
// surfaces at append time.
func (aa *agentAppender) Appender(ctx context.Context) storage.Appender {
	if mi, err := aa.is.GetInstance(aa.inst); err == nil {
		return mi.Appender(ctx)
	}
	return &failedAppender{instanceName: aa.inst}
}
253
254
// run drives the discovery and scrape managers until either exits, then
// signals completion by closing is.exited.
func (is *instanceScraper) run() {
	defer close(is.exited)

	var group run.Group

	// Actor: service discovery. It stops whenever our parent context is
	// canceled or when is.cancel is called by the interrupt function.
	group.Add(
		func() error {
			if err := is.sd.Run(); err != nil {
				level.Error(is.log).Log("msg", "autoscrape service discovery exited with error", "err", err)
				return err
			}
			return nil
		},
		func(_ error) { is.cancel() },
	)

	// Actor: scrape manager, fed by the discovery manager's sync channel.
	group.Add(
		func() error {
			if err := is.sm.Run(is.sd.SyncCh()); err != nil {
				level.Error(is.log).Log("msg", "autoscrape scrape manager exited with error", "err", err)
				return err
			}
			return nil
		},
		func(_ error) { is.sm.Stop() },
	)

	_ = group.Run()
}
282
283
func (is *instanceScraper) ApplyConfig(jobs []*prom_config.ScrapeConfig) error {
284
var firstError error
285
saveError := func(e error) {
286
if firstError == nil && e != nil {
287
firstError = e
288
}
289
}
290
291
var (
292
scrapeConfigs = make([]*prom_config.ScrapeConfig, 0, len(jobs))
293
sdConfigs = make(map[string]discovery.Configs, len(jobs))
294
)
295
for _, job := range jobs {
296
sdConfigs[job.JobName] = job.ServiceDiscoveryConfigs
297
scrapeConfigs = append(scrapeConfigs, job)
298
}
299
if err := is.sd.ApplyConfig(sdConfigs); err != nil {
300
level.Error(is.log).Log("msg", "error when applying SD to autoscraper", "err", err)
301
saveError(err)
302
}
303
if err := is.sm.ApplyConfig(&prom_config.Config{ScrapeConfigs: scrapeConfigs}); err != nil {
304
level.Error(is.log).Log("msg", "error when applying jobs to scraper", "err", err)
305
saveError(err)
306
}
307
308
return firstError
309
}
310
311
// Stop shuts down the instanceScraper and blocks until its run goroutine has
// fully exited.
func (is *instanceScraper) Stop() {
	is.cancel()
	<-is.exited
}
315
316