Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/crow/crow.go
4094 views
1
// Package crow implements a correctness checker tool similar to Loki Canary.
2
// Inspired by Cortex test-exporter.
3
package crow
4
5
import (
6
"context"
7
"errors"
8
"flag"
9
"fmt"
10
"math"
11
"math/rand"
12
"net/http"
13
"strings"
14
"sync"
15
"time"
16
17
"github.com/go-kit/log"
18
"github.com/go-kit/log/level"
19
"github.com/opentracing-contrib/go-stdlib/nethttp"
20
"github.com/prometheus/client_golang/api"
21
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
22
"github.com/prometheus/client_golang/prometheus"
23
"github.com/prometheus/client_golang/prometheus/promhttp"
24
commonCfg "github.com/prometheus/common/config"
25
"github.com/prometheus/common/model"
26
"github.com/weaveworks/common/user"
27
)
28
29
// Config for the Crow metrics checker.
30
type Config struct {
31
PrometheusAddr string // Base URL of Prometheus server
32
NumSamples int // Number of samples to generate
33
UserID string // User ID to use for auth when querying.
34
PasswordFile string // Password File for auth when querying.
35
ExtraSelectors string // Extra selectors for queries, i.e., cluster="prod"
36
OrgID string // Org ID to inject in X-Org-ScopeID header when querying.
37
38
// Querying Params
39
40
QueryTimeout time.Duration // Timeout for querying
41
QueryDuration time.Duration // Time before and after sample to search
42
QueryStep time.Duration // Step between samples in search
43
44
// Validation Params
45
46
MaxValidations int // Maximum amount of times to search for a sample
47
MaxTimestampDelta time.Duration // Maximum timestamp delta to use for validating.
48
ValueEpsilon float64 // Maximum epsilon to use for validating.
49
50
// Logger to use. If nil, logs will be discarded.
51
Log log.Logger
52
}
53
54
// RegisterFlags registers flags for the config to the given FlagSet.
55
func (c *Config) RegisterFlags(f *flag.FlagSet) {
56
c.RegisterFlagsWithPrefix(f, "")
57
}
58
59
// RegisterFlagsWithPrefix registers flags for the config to the given FlagSet and
60
// prefixing each flag with the given prefix. prefix, if non-empty, should end
61
// in `.`.
62
func (c *Config) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
63
f.StringVar(&c.PrometheusAddr, prefix+"prometheus-addr", DefaultConfig.PrometheusAddr, "Root URL of the Prometheus API to query against")
64
f.IntVar(&c.NumSamples, prefix+"generate-samples", DefaultConfig.NumSamples, "Number of samples to generate when being scraped")
65
f.StringVar(&c.UserID, prefix+"user-id", DefaultConfig.UserID, "UserID to use with basic auth.")
66
f.StringVar(&c.PasswordFile, prefix+"password-file", DefaultConfig.PasswordFile, "Password file to use with basic auth.")
67
f.StringVar(&c.ExtraSelectors, prefix+"extra-selectors", DefaultConfig.ExtraSelectors, "Extra selectors to include in queries, useful for identifying different instances of this job.")
68
f.StringVar(&c.OrgID, prefix+"org-id", DefaultConfig.OrgID, "Org ID to inject in X-Org-ScopeID header when querying. Useful for querying multi-tenated Cortex directly.")
69
70
f.DurationVar(&c.QueryTimeout, prefix+"query-timeout", DefaultConfig.QueryTimeout, "timeout for querying")
71
f.DurationVar(&c.QueryDuration, prefix+"query-duration", DefaultConfig.QueryDuration, "time before and after sample to search")
72
f.DurationVar(&c.QueryStep, prefix+"query-step", DefaultConfig.QueryStep, "step between samples when searching")
73
74
f.IntVar(&c.MaxValidations, prefix+"max-validations", DefaultConfig.MaxValidations, "Maximum number of times to try validating a sample")
75
f.DurationVar(&c.MaxTimestampDelta, prefix+"max-timestamp-delta", DefaultConfig.MaxTimestampDelta, "maximum difference from the stored timestamp from the validating sample to allow")
76
f.Float64Var(&c.ValueEpsilon, prefix+"sample-epsilon", DefaultConfig.ValueEpsilon, "maximum difference from the stored value from the validating sample to allow")
77
}
78
79
// DefaultConfig holds defaults for Crow settings.
80
var DefaultConfig = Config{
81
MaxValidations: 5,
82
NumSamples: 10,
83
84
QueryTimeout: 150 * time.Millisecond,
85
QueryDuration: 2 * time.Second,
86
QueryStep: 100 * time.Millisecond,
87
88
// MaxTimestampDelta is set to 750ms to allow some buffer for a slow network
89
// before the scrape goes through.
90
MaxTimestampDelta: 750 * time.Millisecond,
91
ValueEpsilon: 0.0001,
92
}
93
94
// Crow is a correctness checker that validates scraped metrics reach a
95
// Prometheus-compatible server with the same values and roughly the same
96
// timestamp.
97
//
98
// Crow exposes two sets of metrics:
99
//
100
// 1. Test metrics, where each scrape generates a validation job.
101
// 2. State metrics, exposing state of the Crow checker itself.
102
//
103
// These two metrics should be exposed via different endpoints, and only state
104
// metrics are safe to be manually collected from.
105
//
106
// Collecting from the set of test metrics generates a validation job, where
107
// Crow will query the Prometheus API to ensure the metrics that were scraped
108
// were written with (approximately) the same timestamp as the scrape time
109
// and with (approximately) the same floating point values exposed in the
110
// scrape.
111
//
112
// If a set of test metrics were not found and retries have been exhausted,
113
// or if the metrics were found but the values did not match, the error
114
// counter will increase.
115
type Crow struct {
116
cfg Config
117
m *metrics
118
119
promClient promapi.API
120
121
wg sync.WaitGroup
122
quit chan struct{}
123
124
pendingMtx sync.Mutex
125
pending []*sample
126
sampleCh chan []*sample
127
}
128
129
// New creates a new Crow.
130
func New(cfg Config) (*Crow, error) {
131
c, err := newCrow(cfg)
132
if err != nil {
133
return nil, err
134
}
135
136
c.wg.Add(1)
137
go c.runLoop()
138
return c, nil
139
}
140
141
func newCrow(cfg Config) (*Crow, error) {
142
if cfg.Log == nil {
143
cfg.Log = log.NewNopLogger()
144
}
145
146
if cfg.PrometheusAddr == "" {
147
return nil, fmt.Errorf("Crow must be configured with a URL to use for querying Prometheus")
148
}
149
150
apiCfg := api.Config{
151
Address: cfg.PrometheusAddr,
152
RoundTripper: api.DefaultRoundTripper,
153
}
154
if cfg.UserID != "" && cfg.PasswordFile != "" {
155
apiCfg.RoundTripper = commonCfg.NewBasicAuthRoundTripper(cfg.UserID, "", cfg.PasswordFile, api.DefaultRoundTripper)
156
}
157
if cfg.OrgID != "" {
158
apiCfg.RoundTripper = &nethttp.Transport{
159
RoundTripper: promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
160
_ = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cfg.OrgID), req)
161
return apiCfg.RoundTripper.RoundTrip(req)
162
}),
163
}
164
}
165
166
cli, err := api.NewClient(apiCfg)
167
if err != nil {
168
return nil, fmt.Errorf("failed to create prometheus client: %w", err)
169
}
170
171
c := &Crow{
172
cfg: cfg,
173
m: newMetrics(),
174
promClient: promapi.NewAPI(cli),
175
176
quit: make(chan struct{}),
177
178
sampleCh: make(chan []*sample),
179
}
180
return c, nil
181
}
182
183
func (c *Crow) runLoop() {
184
defer c.wg.Done()
185
186
ticker := time.NewTicker(250 * time.Millisecond)
187
defer ticker.Stop()
188
189
for {
190
select {
191
case <-c.quit:
192
return
193
case samples := <-c.sampleCh:
194
c.m.totalScrapes.Inc()
195
c.m.totalSamples.Add(float64(len(samples)))
196
197
c.appendSamples(samples)
198
case <-ticker.C:
199
c.checkPending()
200
}
201
}
202
}
203
204
// appendSamples queues samples to be checked.
205
func (c *Crow) appendSamples(samples []*sample) {
206
c.pendingMtx.Lock()
207
defer c.pendingMtx.Unlock()
208
c.pending = append(c.pending, samples...)
209
c.m.pendingSets.Set(float64(len(c.pending)))
210
}
211
212
// checkPending iterates over all pending samples. Samples that are ready
213
// are immediately validated. Samples are requeued if they're not ready or
214
// not found during validation.
215
func (c *Crow) checkPending() {
216
c.pendingMtx.Lock()
217
defer c.pendingMtx.Unlock()
218
219
now := time.Now().UTC()
220
221
requeued := []*sample{}
222
for _, s := range c.pending {
223
if !s.Ready(now) {
224
requeued = append(requeued, s)
225
continue
226
}
227
228
err := c.validate(s)
229
if err == nil {
230
c.m.totalResults.WithLabelValues("success").Inc()
231
continue
232
}
233
234
s.ValidationAttempt++
235
if s.ValidationAttempt < c.cfg.MaxValidations {
236
requeued = append(requeued, s)
237
continue
238
}
239
240
var vf errValidationFailed
241
if errors.As(err, &vf) {
242
switch {
243
case vf.mismatch:
244
c.m.totalResults.WithLabelValues("mismatch").Inc()
245
case vf.missing:
246
c.m.totalResults.WithLabelValues("missing").Inc()
247
default:
248
c.m.totalResults.WithLabelValues("unknown").Inc()
249
}
250
}
251
}
252
c.pending = requeued
253
c.m.pendingSets.Set(float64(len(c.pending)))
254
}
255
256
// errValidationFailed is the error returned when a sample could not be
// validated. When both flags are set, Error describes the sample as missing.
type errValidationFailed struct {
	missing  bool // no stored point close enough to the scrape time (or the query failed)
	mismatch bool // a stored point was close enough in time but its value differed
}

// Error implements the error interface. Missing takes precedence over
// mismatch.
func (e errValidationFailed) Error() string {
	if e.missing {
		return "validation failed: sample missing"
	}
	if e.mismatch {
		return "validation failed: sample does not match"
	}
	return "validation failed"
}
271
272
// validate validates a sample. If the sample should be requeued (i.e.,
273
// couldn't be found), returns true.
274
func (c *Crow) validate(b *sample) error {
275
ctx, cancel := context.WithTimeout(context.Background(), c.cfg.QueryTimeout)
276
defer cancel()
277
278
labels := make([]string, 0, len(b.Labels))
279
for k, v := range b.Labels {
280
labels = append(labels, fmt.Sprintf(`%s="%s"`, k, v))
281
}
282
if c.cfg.ExtraSelectors != "" {
283
labels = append(labels, c.cfg.ExtraSelectors)
284
}
285
286
query := fmt.Sprintf("%s{%s}", validationSampleName, strings.Join(labels, ","))
287
level.Debug(c.cfg.Log).Log("msg", "querying for sample", "query", query)
288
289
val, _, err := c.promClient.QueryRange(ctx, query, promapi.Range{
290
Start: b.ScrapeTime.UTC().Add(-c.cfg.QueryDuration),
291
End: b.ScrapeTime.UTC().Add(+c.cfg.QueryDuration),
292
Step: c.cfg.QueryStep,
293
})
294
295
if err != nil {
296
level.Error(c.cfg.Log).Log("msg", "failed to query for sample", "query", query, "err", err)
297
} else if m, ok := val.(model.Matrix); ok {
298
return c.validateInMatrix(query, b, m)
299
}
300
301
return errValidationFailed{missing: true}
302
}
303
304
func (c *Crow) validateInMatrix(query string, b *sample, m model.Matrix) error {
305
var found, matches bool
306
307
for _, ss := range m {
308
for _, sp := range ss.Values {
309
ts := time.Unix(0, sp.Timestamp.UnixNano())
310
dist := b.ScrapeTime.Sub(ts)
311
if dist < 0 {
312
dist = -dist
313
}
314
315
if dist <= c.cfg.MaxTimestampDelta {
316
found = true
317
matches = math.Abs(float64(sp.Value)-b.Value) <= c.cfg.ValueEpsilon
318
}
319
320
level.Debug(c.cfg.Log).Log(
321
"msg", "compared query to stored sample",
322
"query", query,
323
"sample", ss.Metric,
324
"ts", sp.Timestamp, "expect_ts", b.ScrapeTime,
325
"value", sp.Value, "expect_value", b.Value,
326
)
327
328
if found && matches {
329
break
330
}
331
}
332
}
333
334
if !found || !matches {
335
return errValidationFailed{
336
missing: !found,
337
mismatch: found && !matches,
338
}
339
}
340
return nil
341
}
342
343
// TestMetrics exposes a collector of test metrics. Each collection will
344
// schedule a validation job.
345
func (c *Crow) TestMetrics() prometheus.Collector {
346
return &sampleGenerator{
347
numSamples: c.cfg.NumSamples,
348
sendCh: c.sampleCh,
349
350
r: rand.New(rand.NewSource(time.Now().Unix())),
351
}
352
}
353
354
// StateMetrics exposes metrics of Crow itself. These metrics are not validated
355
// for presence in the remote system.
356
func (c *Crow) StateMetrics() prometheus.Collector { return c.m }
357
358
// Stop stops crow. Panics if Stop is called more than once.
359
func (c *Crow) Stop() {
360
close(c.quit)
361
c.wg.Wait()
362
}
363
364