GitHub Repository: aos/grafana-agent
Path: blob/main/component/phlare/scrape/scrape.go
package scrape

import (
	"context"
	"fmt"
	"net/url"
	"sync"
	"time"

	"github.com/go-kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"

	"github.com/grafana/agent/component"
	component_config "github.com/grafana/agent/component/common/config"
	"github.com/grafana/agent/component/discovery"
	"github.com/grafana/agent/component/phlare"
	"github.com/grafana/agent/component/prometheus/scrape"
)

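// Names of the built-in pprof profile types this component can scrape.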
const (
	pprofMemory     string = "memory"
	pprofBlock      string = "block"
	pprofGoroutine  string = "goroutine"
	pprofMutex      string = "mutex"
	pprofProcessCPU string = "process_cpu"
	pprofFgprof     string = "fgprof"
)

func init() {
	component.Register(component.Registration{
		Name: "phlare.scrape",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			return New(opts, args.(Arguments))
		},
	})
}

// Arguments holds values which are used to configure the phlare.scrape
// component.
type Arguments struct {
	Targets   []discovery.Target  `river:"targets,attr"`
	ForwardTo []phlare.Appendable `river:"forward_to,attr"`

	// The job name to override the job label with.
	JobName string `river:"job_name,attr,optional"`
	// A set of query parameters with which the target is scraped.
	Params url.Values `river:"params,attr,optional"`
	// How frequently to scrape the targets of this scrape config.
	ScrapeInterval time.Duration `river:"scrape_interval,attr,optional"`
	// The timeout for scraping targets of this config.
	ScrapeTimeout time.Duration `river:"scrape_timeout,attr,optional"`
	// The URL scheme with which to fetch metrics from targets.
	Scheme string `river:"scheme,attr,optional"`

	// todo(ctovena): add support for limits.
	// // An uncompressed response body larger than this many bytes will cause the
	// // scrape to fail. 0 means no limit.
	// BodySizeLimit units.Base2Bytes `river:"body_size_limit,attr,optional"`
	// // More than this many targets after the target relabeling will cause the
	// // scrapes to fail.
	// TargetLimit uint `river:"target_limit,attr,optional"`
	// // More than this many labels post metric-relabeling will cause the scrape
	// // to fail.
	// LabelLimit uint `river:"label_limit,attr,optional"`
	// // More than this label name length post metric-relabeling will cause the
	// // scrape to fail.
	// LabelNameLengthLimit uint `river:"label_name_length_limit,attr,optional"`
	// // More than this label value length post metric-relabeling will cause the
	// // scrape to fail.
	// LabelValueLengthLimit uint `river:"label_value_length_limit,attr,optional"`

	HTTPClientConfig component_config.HTTPClientConfig `river:",squash"`

	ProfilingConfig ProfilingConfig `river:"profiling_config,block,optional"`

	Clustering scrape.Clustering `river:"clustering,block,optional"`
}

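// For illustration only, a minimal River block wiring this component up might
// look like the following sketch, assembled from the river tags above. The
// block label, the address, and the phlare.write.default.receiver export are
// hypothetical and assume a phlare.write component named "default" exists:
//
//	phlare.scrape "default" {
//		targets    = [{"__address__" = "localhost:8080"}]
//		forward_to = [phlare.write.default.receiver]
//	}
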
// ProfilingConfig configures the set of profiles to scrape from each target.
type ProfilingConfig struct {
	Memory     ProfilingTarget         `river:"profile.memory,block,optional"`
	Block      ProfilingTarget         `river:"profile.block,block,optional"`
	Goroutine  ProfilingTarget         `river:"profile.goroutine,block,optional"`
	Mutex      ProfilingTarget         `river:"profile.mutex,block,optional"`
	ProcessCPU ProfilingTarget         `river:"profile.process_cpu,block,optional"`
	FGProf     ProfilingTarget         `river:"profile.fgprof,block,optional"`
	Custom     []CustomProfilingTarget `river:"profile.custom,block,optional"`

	PprofPrefix string `river:"path_prefix,attr,optional"`
}

// AllTargets returns the set of all standard and custom profiling targets,
// regardless of whether they're enabled. The key in the map indicates the name
// of the target.
func (cfg ProfilingConfig) AllTargets() map[string]ProfilingTarget {
	targets := map[string]ProfilingTarget{
		pprofMemory:     cfg.Memory,
		pprofBlock:      cfg.Block,
		pprofGoroutine:  cfg.Goroutine,
		pprofMutex:      cfg.Mutex,
		pprofProcessCPU: cfg.ProcessCPU,
		pprofFgprof:     cfg.FGProf,
	}

	for _, custom := range cfg.Custom {
		targets[custom.Name] = ProfilingTarget{
			Enabled: custom.Enabled,
			Path:    custom.Path,
			Delta:   custom.Delta,
		}
	}

	return targets
}

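// As an illustrative (not authoritative) usage sketch, a caller could
// enumerate just the enabled profiles like so:
//
//	for name, t := range cfg.AllTargets() {
//		if t.Enabled {
//			fmt.Println(name, t.Path) // e.g. "memory /debug/pprof/allocs"
//		}
//	}
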
// DefaultProfilingConfig enables all standard pprof profiles; fgprof is
// opt-in.
var DefaultProfilingConfig = ProfilingConfig{
	Memory: ProfilingTarget{
		Enabled: true,
		Path:    "/debug/pprof/allocs",
	},
	Block: ProfilingTarget{
		Enabled: true,
		Path:    "/debug/pprof/block",
	},
	Goroutine: ProfilingTarget{
		Enabled: true,
		Path:    "/debug/pprof/goroutine",
	},
	Mutex: ProfilingTarget{
		Enabled: true,
		Path:    "/debug/pprof/mutex",
	},
	ProcessCPU: ProfilingTarget{
		Enabled: true,
		Path:    "/debug/pprof/profile",
		Delta:   true,
	},
	FGProf: ProfilingTarget{
		Enabled: false,
		Path:    "/debug/fgprof",
		Delta:   true,
	},
}

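// Note: fgprof is disabled by default. The /debug/fgprof path is only served
// if the target application registers the third-party fgprof handler
// (github.com/felixge/fgprof); that is an assumption about the target, not
// something this scraper can verify.
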
// UnmarshalRiver implements river.Unmarshaler and applies defaults before
// unmarshaling.
func (cfg *ProfilingConfig) UnmarshalRiver(f func(interface{}) error) error {
	*cfg = DefaultProfilingConfig

	// The args alias drops the UnmarshalRiver method so decoding into it
	// doesn't recurse back into this function.
	type args ProfilingConfig
	if err := f((*args)(cfg)); err != nil {
		return err
	}

	return nil
}

// ProfilingTarget describes a single pprof endpoint: whether it is scraped,
// the HTTP path it is served on, and whether it is a delta profile.
type ProfilingTarget struct {
	Enabled bool   `river:"enabled,attr,optional"`
	Path    string `river:"path,attr,optional"`
	Delta   bool   `river:"delta,attr,optional"`
}

// CustomProfilingTarget is a user-defined profiling target; its block label
// provides the target's name.
type CustomProfilingTarget struct {
	Enabled bool   `river:"enabled,attr"`
	Path    string `river:"path,attr"`
	Delta   bool   `river:"delta,attr,optional"`
	Name    string `river:",label"`
}

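// Derived from the river tags above, a custom profile could be declared in
// River like this (hypothetical name and path):
//
//	profiling_config {
//		profile.custom "my_profile" {
//			enabled = true
//			path    = "/debug/pprof/my_profile"
//			delta   = false
//		}
//	}
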
// DefaultArguments holds the default settings for the phlare.scrape component.
var DefaultArguments = NewDefaultArguments()

// NewDefaultArguments creates the default settings for a scrape job.
func NewDefaultArguments() Arguments {
	return Arguments{
		Scheme:           "http",
		HTTPClientConfig: component_config.DefaultHTTPClientConfig,
		ScrapeInterval:   15 * time.Second,
		// The timeout is deliberately longer than the interval: delta
		// profiles such as process_cpu block for close to a full interval
		// while being collected, and validation below requires
		// scrape_timeout > scrape_interval.
		ScrapeTimeout:   15*time.Second + (3 * time.Second),
		ProfilingConfig: DefaultProfilingConfig,
	}
}

// UnmarshalRiver implements river.Unmarshaler.
func (arg *Arguments) UnmarshalRiver(f func(interface{}) error) error {
	*arg = NewDefaultArguments()

	type args Arguments
	if err := f((*args)(arg)); err != nil {
		return err
	}

	if arg.ScrapeTimeout <= 0 {
		return fmt.Errorf("scrape_timeout must be greater than 0")
	}
	if arg.ScrapeTimeout <= arg.ScrapeInterval {
		return fmt.Errorf("scrape_timeout must be greater than scrape_interval")
	}

	if cfg := arg.ProfilingConfig.ProcessCPU; cfg.Enabled && arg.ScrapeTimeout < time.Second*2 {
		return fmt.Errorf("%v scrape_timeout must be at least 2 seconds", pprofProcessCPU)
	}

	// HTTPClientConfig is squashed, so its own validation hook won't run;
	// call Validate explicitly.
	return arg.HTTPClientConfig.Validate()
}

// Component implements the phlare.scrape component.
type Component struct {
	opts component.Options

	reloadTargets chan struct{}

	mut        sync.RWMutex
	args       Arguments
	scraper    *Manager
	appendable *phlare.Fanout
}

var _ component.Component = (*Component)(nil)

// New creates a new phlare.scrape component.
func New(o component.Options, args Arguments) (*Component, error) {
	flowAppendable := phlare.NewFanout(args.ForwardTo, o.ID, o.Registerer)
	scraper := NewManager(flowAppendable, o.Logger)
	c := &Component{
		opts:          o,
		reloadTargets: make(chan struct{}, 1),
		scraper:       scraper,
		appendable:    flowAppendable,
	}

	// Call Update once at start-up to set the receivers and targets.
	if err := c.Update(args); err != nil {
		return nil, err
	}

	return c, nil
}

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
	defer c.scraper.Stop()

	targetSetsChan := make(chan map[string][]*targetgroup.Group)

	go func() {
		c.scraper.Run(targetSetsChan)
		level.Info(c.opts.Logger).Log("msg", "scrape manager stopped")
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-c.reloadTargets:
			c.mut.RLock()
			var (
				tgs        = c.args.Targets
				jobName    = c.opts.ID
				clustering = c.args.Clustering.Enabled
			)
			if c.args.JobName != "" {
				jobName = c.args.JobName
			}
			c.mut.RUnlock()

			// NOTE(@tpaschalis) First approach, manually building the
			// 'clustered' targets implementation every time.
			ct := discovery.NewDistributedTargets(clustering, c.opts.Clusterer.Node, tgs)
			promTargets := c.componentTargetsToProm(jobName, ct.Get())

			select {
			case targetSetsChan <- promTargets:
				level.Debug(c.opts.Logger).Log("msg", "passed new targets to scrape manager")
			case <-ctx.Done():
				return nil
			}
		}
	}
}

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
	newArgs := args.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()
	c.args = newArgs

	c.appendable.UpdateChildren(newArgs.ForwardTo)

	err := c.scraper.ApplyConfig(newArgs)
	if err != nil {
		return fmt.Errorf("error applying scrape configs: %w", err)
	}
	level.Debug(c.opts.Logger).Log("msg", "scrape config was updated")

	// Queue a targets reload without blocking; if a reload is already
	// pending, this update will be picked up by it.
	select {
	case c.reloadTargets <- struct{}{}:
	default:
	}

	return nil
}

// componentTargetsToProm converts this component's targets into a Prometheus
// target-group map keyed by job name.
func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
	promGroup := &targetgroup.Group{Source: jobName}
	for _, tg := range tgs {
		promGroup.Targets = append(promGroup.Targets, convertLabelSet(tg))
	}

	return map[string][]*targetgroup.Group{jobName: {promGroup}}
}

// convertLabelSet converts a discovery.Target into a Prometheus
// model.LabelSet.
func convertLabelSet(tg discovery.Target) model.LabelSet {
	lset := make(model.LabelSet, len(tg))
	for k, v := range tg {
		lset[model.LabelName(k)] = model.LabelValue(v)
	}
	return lset
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.args.Clustering.Enabled
}

// DebugInfo implements component.DebugComponent.
func (c *Component) DebugInfo() interface{} {
	var res []scrape.TargetStatus

	for job, stt := range c.scraper.TargetsActive() {
		for _, st := range stt {
			// Skip nil targets before touching them; the original checked
			// for nil only after calling LastError.
			if st == nil {
				continue
			}
			var lastError string
			if st.LastError() != nil {
				lastError = st.LastError().Error()
			}
			res = append(res, scrape.TargetStatus{
				JobName:            job,
				URL:                st.URL().String(),
				Health:             string(st.Health()),
				Labels:             st.discoveredLabels.Map(),
				LastError:          lastError,
				LastScrape:         st.LastScrape(),
				LastScrapeDuration: st.LastScrapeDuration(),
			})
		}
	}

	return scrape.ScraperStatus{TargetStatus: res}
}