Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/phlare/scrape/target.go
4096 views
1
// Copyright 2022 The Parca Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13
14
package scrape
15
16
import (
17
"errors"
18
"fmt"
19
"hash/fnv"
20
"net"
21
"net/url"
22
"strconv"
23
"strings"
24
"sync"
25
"time"
26
27
"github.com/prometheus/common/model"
28
"github.com/prometheus/prometheus/config"
29
"github.com/prometheus/prometheus/discovery/targetgroup"
30
"github.com/prometheus/prometheus/model/labels"
31
"github.com/prometheus/prometheus/model/relabel"
32
)
33
34
// TargetHealth describes the health state of a target.
35
type TargetHealth string
36
37
// The possible health states of a target based on the last performed scrape.
38
const (
39
HealthUnknown TargetHealth = "unknown"
40
HealthGood TargetHealth = "up"
41
HealthBad TargetHealth = "down"
42
)
43
44
// Target refers to a singular HTTP or HTTPS endpoint.
45
type Target struct {
46
// Labels before any processing.
47
discoveredLabels labels.Labels
48
// Any labels that are added to this target and its metrics.
49
labels labels.Labels
50
// Additional URL parameters that are part of the target URL.
51
params url.Values
52
53
mtx sync.RWMutex
54
lastError error
55
lastScrape time.Time
56
lastScrapeDuration time.Duration
57
health TargetHealth
58
}
59
60
// NewTarget creates a reasonably configured target for querying.
61
func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target {
62
return &Target{
63
labels: labels,
64
discoveredLabels: discoveredLabels,
65
params: params,
66
health: HealthUnknown,
67
}
68
}
69
70
func (t *Target) String() string {
71
return t.URL().String()
72
}
73
74
// hash returns an identifying hash for the target.
75
func (t *Target) hash() uint64 {
76
h := fnv.New64a()
77
_, _ = h.Write([]byte(fmt.Sprintf("%016d", t.Labels().Hash())))
78
_, _ = h.Write([]byte(t.URL().String()))
79
return h.Sum64()
80
}
81
82
// offset returns the time until the next scrape cycle for the target.
83
func (t *Target) offset(interval time.Duration) time.Duration {
84
now := time.Now().UnixNano()
85
86
var (
87
base = now % int64(interval)
88
offset = t.hash() % uint64(interval)
89
next = base + int64(offset)
90
)
91
92
if next > int64(interval) {
93
next -= int64(interval)
94
}
95
return time.Duration(next)
96
}
97
98
// Params returns a copy of the set of all public params of the target.
99
func (t *Target) Params() url.Values {
100
q := make(url.Values, len(t.params))
101
for k, values := range t.params {
102
q[k] = make([]string, len(values))
103
copy(q[k], values)
104
}
105
return q
106
}
107
108
// Labels returns a copy of the set of all public labels of the target.
109
func (t *Target) Labels() labels.Labels {
110
lset := make(labels.Labels, 0, len(t.labels))
111
for _, l := range t.labels {
112
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
113
lset = append(lset, l)
114
}
115
}
116
return lset
117
}
118
119
// DiscoveredLabels returns a copy of the target's labels before any processing.
120
func (t *Target) DiscoveredLabels() labels.Labels {
121
t.mtx.RLock()
122
defer t.mtx.RUnlock()
123
lset := make(labels.Labels, len(t.discoveredLabels))
124
copy(lset, t.discoveredLabels)
125
return lset
126
}
127
128
// Clone returns a clone of the target.
129
func (t *Target) Clone() *Target {
130
return NewTarget(
131
t.Labels(),
132
t.DiscoveredLabels(),
133
t.Params(),
134
)
135
}
136
137
// SetDiscoveredLabels sets new DiscoveredLabels.
138
func (t *Target) SetDiscoveredLabels(l labels.Labels) {
139
t.mtx.Lock()
140
defer t.mtx.Unlock()
141
t.discoveredLabels = l
142
}
143
144
// URL returns a copy of the target's URL.
145
func (t *Target) URL() *url.URL {
146
params := url.Values{}
147
148
for k, v := range t.params {
149
params[k] = make([]string, len(v))
150
copy(params[k], v)
151
}
152
for _, l := range t.labels {
153
if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) {
154
continue
155
}
156
ks := l.Name[len(model.ParamLabelPrefix):]
157
158
if len(params[ks]) > 0 {
159
params[ks][0] = l.Value
160
} else {
161
params[ks] = []string{l.Value}
162
}
163
}
164
165
return &url.URL{
166
Scheme: t.labels.Get(model.SchemeLabel),
167
Host: t.labels.Get(model.AddressLabel),
168
Path: t.labels.Get(ProfilePath),
169
RawQuery: params.Encode(),
170
}
171
}
172
173
// LastError returns the error encountered during the last scrape.
174
func (t *Target) LastError() error {
175
t.mtx.RLock()
176
defer t.mtx.RUnlock()
177
178
return t.lastError
179
}
180
181
// LastScrape returns the time of the last scrape.
182
func (t *Target) LastScrape() time.Time {
183
t.mtx.RLock()
184
defer t.mtx.RUnlock()
185
186
return t.lastScrape
187
}
188
189
// LastScrapeDuration returns how long the last scrape of the target took.
190
func (t *Target) LastScrapeDuration() time.Duration {
191
t.mtx.RLock()
192
defer t.mtx.RUnlock()
193
194
return t.lastScrapeDuration
195
}
196
197
// Health returns the last known health state of the target.
198
func (t *Target) Health() TargetHealth {
199
t.mtx.RLock()
200
defer t.mtx.RUnlock()
201
202
return t.health
203
}
204
205
// LabelsByProfiles returns the labels for a given ProfilingConfig.
206
func LabelsByProfiles(lset labels.Labels, c *ProfilingConfig) []labels.Labels {
207
res := []labels.Labels{}
208
add := func(profileType string, cfgs ...ProfilingTarget) {
209
for _, p := range cfgs {
210
if p.Enabled {
211
l := lset.Copy()
212
l = append(l, labels.Label{Name: ProfilePath, Value: p.Path}, labels.Label{Name: ProfileName, Value: profileType})
213
res = append(res, l)
214
}
215
}
216
}
217
218
for profilingType, profilingConfig := range c.AllTargets() {
219
add(profilingType, profilingConfig)
220
}
221
222
return res
223
}
224
225
// Targets is a sortable list of targets.
226
type Targets []*Target
227
228
func (ts Targets) Len() int { return len(ts) }
229
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
230
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
231
232
const (
233
ProfilePath = "__profile_path__"
234
ProfileName = "__name__"
235
ProfileTraceType = "trace"
236
)
237
238
// populateLabels builds a label set from the given label set and scrape configuration.
239
// It returns a label set before relabeling was applied as the second return value.
240
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
241
func populateLabels(lset labels.Labels, cfg Arguments) (res, orig labels.Labels, err error) {
242
// Copy labels into the labelset for the target if they are not set already.
243
scrapeLabels := []labels.Label{
244
{Name: model.JobLabel, Value: cfg.JobName},
245
{Name: model.SchemeLabel, Value: cfg.Scheme},
246
}
247
lb := labels.NewBuilder(lset)
248
249
for _, l := range scrapeLabels {
250
if lv := lset.Get(l.Name); lv == "" {
251
lb.Set(l.Name, l.Value)
252
}
253
}
254
// Encode scrape query parameters as labels.
255
for k, v := range cfg.Params {
256
if len(v) > 0 {
257
lb.Set(model.ParamLabelPrefix+k, v[0])
258
}
259
}
260
261
preRelabelLabels := lb.Labels(nil)
262
// todo(ctovena): add relabeling after pprof discovery.
263
// lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)
264
lset, keep := relabel.Process(preRelabelLabels)
265
266
// Check if the target was dropped.
267
if !keep {
268
return nil, preRelabelLabels, nil
269
}
270
if v := lset.Get(model.AddressLabel); v == "" {
271
return nil, nil, errors.New("no address")
272
}
273
274
if v := lset.Get(model.AddressLabel); v == "" {
275
return nil, nil, fmt.Errorf("no address")
276
}
277
278
lb = labels.NewBuilder(lset)
279
280
// addPort checks whether we should add a default port to the address.
281
// If the address is not valid, we don't append a port either.
282
addPort := func(s string) bool {
283
// If we can split, a port exists and we don't have to add one.
284
if _, _, err := net.SplitHostPort(s); err == nil {
285
return false
286
}
287
// If adding a port makes it valid, the previous error
288
// was not due to an invalid address and we can append a port.
289
_, _, err := net.SplitHostPort(s + ":1234")
290
return err == nil
291
}
292
addr := lset.Get(model.AddressLabel)
293
// If it's an address with no trailing port, infer it based on the used scheme.
294
if addPort(addr) {
295
// Addresses reaching this point are already wrapped in [] if necessary.
296
switch lset.Get(model.SchemeLabel) {
297
case "http", "":
298
addr = addr + ":80"
299
case "https":
300
addr = addr + ":443"
301
default:
302
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
303
}
304
lb.Set(model.AddressLabel, addr)
305
}
306
307
if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
308
return nil, nil, err
309
}
310
311
// Meta labels are deleted after relabelling. Other internal labels propagate to
312
// the target which decides whether they will be part of their label set.
313
for _, l := range lset {
314
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
315
lb.Del(l.Name)
316
}
317
}
318
319
// Default the instance label to the target address.
320
if v := lset.Get(model.InstanceLabel); v == "" {
321
lb.Set(model.InstanceLabel, addr)
322
}
323
324
res = lb.Labels(nil)
325
for _, l := range res {
326
// Check label values are valid, drop the target if not.
327
if !model.LabelValue(l.Value).IsValid() {
328
return nil, nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
329
}
330
}
331
332
return res, lset, nil
333
}
334
335
// targetsFromGroup builds targets based on the given TargetGroup and config.
336
func targetsFromGroup(group *targetgroup.Group, cfg Arguments) ([]*Target, []*Target, error) {
337
var (
338
targets = make([]*Target, 0, len(group.Targets))
339
droppedTargets = make([]*Target, 0, len(group.Targets))
340
)
341
342
for i, tlset := range group.Targets {
343
lbls := make([]labels.Label, 0, len(tlset)+len(group.Labels))
344
345
for ln, lv := range tlset {
346
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
347
}
348
for ln, lv := range group.Labels {
349
if _, ok := tlset[ln]; !ok {
350
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
351
}
352
}
353
354
lset := labels.New(lbls...)
355
lsets := LabelsByProfiles(lset, &cfg.ProfilingConfig)
356
357
for _, lset := range lsets {
358
var profType string
359
for _, label := range lset {
360
if label.Name == ProfileName {
361
profType = label.Value
362
}
363
}
364
lbls, origLabels, err := populateLabels(lset, cfg)
365
if err != nil {
366
return nil, nil, fmt.Errorf("instance %d in group %s: %s", i, group, err)
367
}
368
// This is a dropped target, according to the current return behaviour of populateLabels
369
if lbls == nil && origLabels != nil {
370
// ensure we get the full url path for dropped targets
371
params := cfg.Params
372
if params == nil {
373
params = url.Values{}
374
}
375
lbls = append(lbls, labels.Label{Name: model.AddressLabel, Value: lset.Get(model.AddressLabel)})
376
lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: cfg.Scheme})
377
lbls = append(lbls, labels.Label{Name: ProfilePath, Value: lset.Get(ProfilePath)})
378
// Encode scrape query parameters as labels.
379
for k, v := range cfg.Params {
380
if len(v) > 0 {
381
lbls = append(lbls, labels.Label{Name: model.ParamLabelPrefix + k, Value: v[0]})
382
}
383
}
384
droppedTargets = append(droppedTargets, NewTarget(lbls, origLabels, params))
385
continue
386
}
387
if lbls != nil || origLabels != nil {
388
params := cfg.Params
389
if params == nil {
390
params = url.Values{}
391
}
392
393
if pcfg, found := cfg.ProfilingConfig.AllTargets()[profType]; found && pcfg.Delta {
394
params.Add("seconds", strconv.Itoa(int((cfg.ScrapeInterval)/time.Second)-1))
395
}
396
targets = append(targets, NewTarget(lbls, origLabels, params))
397
}
398
}
399
}
400
401
return targets, droppedTargets, nil
402
}
403
404