// GitHub Repository: aos/grafana-agent
// Path: blob/main/component/prometheus/scrape/scrape.go

package scrape

import (
	"context"
	"fmt"
	"net/url"
	"sync"
	"time"

	"github.com/alecthomas/units"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	component_config "github.com/grafana/agent/component/common/config"
	"github.com/grafana/agent/component/discovery"
	"github.com/grafana/agent/component/prometheus"
	"github.com/grafana/agent/pkg/build"
	client_prometheus "github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
)

func init() {
	scrape.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)

	component.Register(component.Registration{
		Name: "prometheus.scrape",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			return New(opts, args.(Arguments))
		},
	})
}

// Arguments holds values which are used to configure the prometheus.scrape
// component.
type Arguments struct {
	Targets   []discovery.Target   `river:"targets,attr"`
	ForwardTo []storage.Appendable `river:"forward_to,attr"`

	// The job name to override the job label with.
	JobName string `river:"job_name,attr,optional"`
	// Indicator whether the scraped metrics should remain unmodified.
	HonorLabels bool `river:"honor_labels,attr,optional"`
	// Indicator whether the scraped timestamps should be respected.
	HonorTimestamps bool `river:"honor_timestamps,attr,optional"`
	// A set of query parameters with which the target is scraped.
	Params url.Values `river:"params,attr,optional"`
	// How frequently to scrape the targets of this scrape config.
	ScrapeInterval time.Duration `river:"scrape_interval,attr,optional"`
	// The timeout for scraping targets of this config.
	ScrapeTimeout time.Duration `river:"scrape_timeout,attr,optional"`
	// The HTTP resource path on which to fetch metrics from targets.
	MetricsPath string `river:"metrics_path,attr,optional"`
	// The URL scheme with which to fetch metrics from targets.
	Scheme string `river:"scheme,attr,optional"`
	// An uncompressed response body larger than this many bytes will cause the
	// scrape to fail. 0 means no limit.
	BodySizeLimit units.Base2Bytes `river:"body_size_limit,attr,optional"`
	// More than this many samples post metric-relabeling will cause the scrape
	// to fail.
	SampleLimit uint `river:"sample_limit,attr,optional"`
	// More than this many targets after the target relabeling will cause the
	// scrapes to fail.
	TargetLimit uint `river:"target_limit,attr,optional"`
	// More than this many labels post metric-relabeling will cause the scrape
	// to fail.
	LabelLimit uint `river:"label_limit,attr,optional"`
	// More than this label name length post metric-relabeling will cause the
	// scrape to fail.
	LabelNameLengthLimit uint `river:"label_name_length_limit,attr,optional"`
	// More than this label value length post metric-relabeling will cause the
	// scrape to fail.
	LabelValueLengthLimit uint `river:"label_value_length_limit,attr,optional"`

	HTTPClientConfig component_config.HTTPClientConfig `river:",squash"`

	// Scrape Options
	ExtraMetrics bool `river:"extra_metrics,attr,optional"`

	Clustering Clustering `river:"clustering,block,optional"`
}
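
// For illustration, a minimal River block that sets these arguments might look
// like the following. The target address and the forward_to receiver are
// hypothetical values, not taken from this file:
//
//	prometheus.scrape "example" {
//		targets    = [{"__address__" = "127.0.0.1:9090"}]
//		forward_to = [prometheus.remote_write.default.receiver]
//
//		scrape_interval = "30s"
//		scrape_timeout  = "10s"
//	}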

// Clustering holds values that configure clustering-specific behavior.
type Clustering struct {
	// TODO(@tpaschalis) Move this block to a shared place for all components using clustering.
	Enabled bool `river:"enabled,attr"`
}

// DefaultArguments defines the default settings for a scrape job.
var DefaultArguments = Arguments{
	MetricsPath:      "/metrics",
	Scheme:           "http",
	HonorLabels:      false,
	HonorTimestamps:  true,
	HTTPClientConfig: component_config.DefaultHTTPClientConfig,
	ScrapeInterval:   1 * time.Minute,  // From config.DefaultGlobalConfig
	ScrapeTimeout:    10 * time.Second, // From config.DefaultGlobalConfig
}

// UnmarshalRiver implements river.Unmarshaler.
func (arg *Arguments) UnmarshalRiver(f func(interface{}) error) error {
	*arg = DefaultArguments

	type args Arguments
	err := f((*args)(arg))
	if err != nil {
		return err
	}

	// We must explicitly Validate because HTTPClientConfig is squashed and it
	// won't run otherwise.
	return arg.HTTPClientConfig.Validate()
}
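
// Because UnmarshalRiver copies DefaultArguments before decoding, any
// attribute omitted from the River block keeps its default value. For
// example, a block that sets only the required targets and forward_to still
// scrapes "/metrics" over HTTP once a minute with a 10-second timeout.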

// Component implements the prometheus.scrape component.
type Component struct {
	opts component.Options

	reloadTargets chan struct{}

	mut          sync.RWMutex
	args         Arguments
	scraper      *scrape.Manager
	appendable   *prometheus.Fanout
	targetsGauge client_prometheus.Gauge
}

var (
	_ component.Component = (*Component)(nil)
)

// New creates a new prometheus.scrape component.
func New(o component.Options, args Arguments) (*Component, error) {
	flowAppendable := prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer)
	scrapeOptions := &scrape.Options{
		ExtraMetrics: args.ExtraMetrics,
		HTTPClientOptions: []config_util.HTTPClientOption{
			config_util.WithDialContextFunc(o.DialFunc),
		},
	}
	scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable)

	targetsGauge := client_prometheus.NewGauge(client_prometheus.GaugeOpts{
		Name: "agent_prometheus_scrape_targets_gauge",
		Help: "Number of targets this component is configured to scrape"})
	err := o.Registerer.Register(targetsGauge)
	if err != nil {
		return nil, err
	}

	c := &Component{
		opts:          o,
		reloadTargets: make(chan struct{}, 1),
		scraper:       scraper,
		appendable:    flowAppendable,
		targetsGauge:  targetsGauge,
	}

	// Call to Update() to set the receivers and targets once at the start.
	if err := c.Update(args); err != nil {
		return nil, err
	}

	return c, nil
}

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
	defer c.scraper.Stop()

	targetSetsChan := make(chan map[string][]*targetgroup.Group)

	go func() {
		err := c.scraper.Run(targetSetsChan)
		level.Info(c.opts.Logger).Log("msg", "scrape manager stopped")
		if err != nil {
			level.Error(c.opts.Logger).Log("msg", "scrape manager failed", "err", err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-c.reloadTargets:
			c.mut.RLock()
			var (
				tgs     = c.args.Targets
				jobName = c.opts.ID
				cl      = c.args.Clustering.Enabled
			)
			if c.args.JobName != "" {
				jobName = c.args.JobName
			}
			c.mut.RUnlock()

			// NOTE(@tpaschalis) First approach, manually building the
			// 'clustered' targets implementation every time.
			ct := discovery.NewDistributedTargets(cl, c.opts.Clusterer.Node, tgs)
			promTargets := c.componentTargetsToProm(jobName, ct.Get())

			select {
			case targetSetsChan <- promTargets:
				level.Debug(c.opts.Logger).Log("msg", "passed new targets to scrape manager")
			case <-ctx.Done():
			}
		}
	}
}

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
	newArgs := args.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()
	c.args = newArgs

	c.appendable.UpdateChildren(newArgs.ForwardTo)

	sc := getPromScrapeConfigs(c.opts.ID, newArgs)
	err := c.scraper.ApplyConfig(&config.Config{
		ScrapeConfigs: []*config.ScrapeConfig{sc},
	})
	if err != nil {
		return fmt.Errorf("error applying scrape configs: %w", err)
	}
	level.Debug(c.opts.Logger).Log("msg", "scrape config was updated")

	select {
	case c.reloadTargets <- struct{}{}:
	default:
	}

	c.targetsGauge.Set(float64(len(c.args.Targets)))
	return nil
}

// getPromScrapeConfigs is a helper function that bridges the in-house
// configuration with the Prometheus scrape_config.
// As explained in the Config struct, the following fields are purposely left
// out, as they are implemented by other components:
//   - RelabelConfigs
//   - MetricRelabelConfigs
//   - ServiceDiscoveryConfigs
func getPromScrapeConfigs(jobName string, c Arguments) *config.ScrapeConfig {
	dec := config.DefaultScrapeConfig
	if c.JobName != "" {
		dec.JobName = c.JobName
	} else {
		dec.JobName = jobName
	}
	dec.HonorLabels = c.HonorLabels
	dec.HonorTimestamps = c.HonorTimestamps
	dec.Params = c.Params
	dec.ScrapeInterval = model.Duration(c.ScrapeInterval)
	dec.ScrapeTimeout = model.Duration(c.ScrapeTimeout)
	dec.MetricsPath = c.MetricsPath
	dec.Scheme = c.Scheme
	dec.BodySizeLimit = c.BodySizeLimit
	dec.SampleLimit = c.SampleLimit
	dec.TargetLimit = c.TargetLimit
	dec.LabelLimit = c.LabelLimit
	dec.LabelNameLengthLimit = c.LabelNameLengthLimit
	dec.LabelValueLengthLimit = c.LabelValueLengthLimit

	// HTTP scrape client settings
	dec.HTTPClientConfig = *c.HTTPClientConfig.Convert()
	return &dec
}
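
// For example (illustrative values only), decoding a block with
// job_name = "api" and scrape_interval = "15s" produces a ScrapeConfig whose
// JobName is "api" and whose ScrapeInterval is 15s, with every other field
// copied from the (defaulted) Arguments above.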

// ScraperStatus reports the status of the scraper's jobs.
type ScraperStatus struct {
	TargetStatus []TargetStatus `river:"target,block,optional"`
}

// TargetStatus reports on the status of the latest scrape for a target.
type TargetStatus struct {
	JobName            string            `river:"job,attr"`
	URL                string            `river:"url,attr"`
	Health             string            `river:"health,attr"`
	Labels             map[string]string `river:"labels,attr"`
	LastError          string            `river:"last_error,attr,optional"`
	LastScrape         time.Time         `river:"last_scrape,attr"`
	LastScrapeDuration time.Duration     `river:"last_scrape_duration,attr,optional"`
}

// BuildTargetStatuses transforms the targets from a scrape manager into our
// internal status type for debug info.
func BuildTargetStatuses(targets map[string][]*scrape.Target) []TargetStatus {
	var res []TargetStatus

	for job, stt := range targets {
		for _, st := range stt {
			// Skip nil targets before dereferencing them below.
			if st == nil {
				continue
			}

			var lastError string
			if st.LastError() != nil {
				lastError = st.LastError().Error()
			}

			res = append(res, TargetStatus{
				JobName:            job,
				URL:                st.URL().String(),
				Health:             string(st.Health()),
				Labels:             st.Labels().Map(),
				LastError:          lastError,
				LastScrape:         st.LastScrape(),
				LastScrapeDuration: st.LastScrapeDuration(),
			})
		}
	}
	return res
}

// DebugInfo implements component.DebugComponent.
func (c *Component) DebugInfo() interface{} {
	return ScraperStatus{
		TargetStatus: BuildTargetStatuses(c.scraper.TargetsActive()),
	}
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.args.Clustering.Enabled
}

func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
	promGroup := &targetgroup.Group{Source: jobName}
	for _, tg := range tgs {
		promGroup.Targets = append(promGroup.Targets, convertLabelSet(tg))
	}

	return map[string][]*targetgroup.Group{jobName: {promGroup}}
}

func convertLabelSet(tg discovery.Target) model.LabelSet {
	lset := make(model.LabelSet, len(tg))
	for k, v := range tg {
		lset[model.LabelName(k)] = model.LabelValue(v)
	}
	return lset
}
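
// As an illustration (hypothetical values), a discovery.Target of
// {"__address__": "10.0.0.1:9090", "env": "prod"} converts to the
// model.LabelSet {__address__="10.0.0.1:9090", env="prod"}, and
// componentTargetsToProm places all converted targets into a single
// targetgroup.Group keyed by the job name.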