Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/wal/wal_test.go
4096 views
1
package wal
2
3
import (
4
"context"
5
"math"
6
"sort"
7
"testing"
8
"time"
9
10
"github.com/go-kit/log"
11
"github.com/grafana/agent/pkg/util"
12
"github.com/prometheus/prometheus/model/exemplar"
13
"github.com/prometheus/prometheus/model/labels"
14
"github.com/prometheus/prometheus/model/value"
15
"github.com/prometheus/prometheus/storage"
16
"github.com/prometheus/prometheus/tsdb"
17
"github.com/prometheus/prometheus/tsdb/chunks"
18
"github.com/prometheus/prometheus/tsdb/record"
19
"github.com/stretchr/testify/require"
20
)
21
22
func TestStorage_InvalidSeries(t *testing.T) {
23
walDir := t.TempDir()
24
25
s, err := NewStorage(log.NewNopLogger(), nil, walDir)
26
require.NoError(t, err)
27
defer func() {
28
require.NoError(t, s.Close())
29
}()
30
31
app := s.Appender(context.Background())
32
33
// Samples
34
_, err = app.Append(0, labels.Labels{}, 0, 0)
35
require.Error(t, err, "should reject empty labels")
36
37
_, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, 0, 0)
38
require.Error(t, err, "should reject duplicate labels")
39
40
// Sanity check: valid series
41
sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
42
require.NoError(t, err, "should not reject valid series")
43
44
// Exemplars
45
_, err = app.AppendExemplar(0, nil, exemplar.Exemplar{})
46
require.Error(t, err, "should reject unknown series ref")
47
48
e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}}
49
_, err = app.AppendExemplar(sRef, nil, e)
50
require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels")
51
52
e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a_somewhat_long_trace_id", Value: "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa"}}}
53
_, err = app.AppendExemplar(sRef, nil, e)
54
require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length")
55
56
// Sanity check: valid exemplars
57
e = exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
58
_, err = app.AppendExemplar(sRef, nil, e)
59
require.NoError(t, err, "should not reject valid exemplars")
60
}
61
62
func TestStorage(t *testing.T) {
63
walDir := t.TempDir()
64
65
s, err := NewStorage(log.NewNopLogger(), nil, walDir)
66
require.NoError(t, err)
67
defer func() {
68
require.NoError(t, s.Close())
69
}()
70
71
app := s.Appender(context.Background())
72
73
// Write some samples
74
payload := buildSeries([]string{"foo", "bar", "baz"})
75
for _, metric := range payload {
76
metric.Write(t, app)
77
}
78
79
require.NoError(t, app.Commit())
80
81
collector := walDataCollector{}
82
replayer := walReplayer{w: &collector}
83
require.NoError(t, replayer.Replay(s.wal.Dir()))
84
85
names := []string{}
86
for _, series := range collector.series {
87
names = append(names, series.Labels.Get("__name__"))
88
}
89
require.Equal(t, payload.SeriesNames(), names)
90
91
expectedSamples := payload.ExpectedSamples()
92
actualSamples := collector.samples
93
sort.Sort(byRefSample(actualSamples))
94
require.Equal(t, expectedSamples, actualSamples)
95
96
expectedExemplars := payload.ExpectedExemplars()
97
actualExemplars := collector.exemplars
98
sort.Sort(byRefExemplar(actualExemplars))
99
require.Equal(t, expectedExemplars, actualExemplars)
100
}
101
102
func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
103
walDir := t.TempDir()
104
105
s, err := NewStorage(log.NewNopLogger(), nil, walDir)
106
require.NoError(t, err)
107
defer func() {
108
require.NoError(t, s.Close())
109
}()
110
111
app := s.Appender(context.Background())
112
113
sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
114
require.NoError(t, err, "should not reject valid series")
115
116
// If the Labels, Value or Timestamp are different than the last exemplar,
117
// then a new one should be appended; Otherwise, it should be skipped.
118
e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
119
_, _ = app.AppendExemplar(sRef, nil, e)
120
_, _ = app.AppendExemplar(sRef, nil, e)
121
122
e.Labels = labels.Labels{{Name: "b", Value: "2"}}
123
_, _ = app.AppendExemplar(sRef, nil, e)
124
_, _ = app.AppendExemplar(sRef, nil, e)
125
_, _ = app.AppendExemplar(sRef, nil, e)
126
127
e.Value = 42
128
_, _ = app.AppendExemplar(sRef, nil, e)
129
_, _ = app.AppendExemplar(sRef, nil, e)
130
131
e.Ts = 25
132
_, _ = app.AppendExemplar(sRef, nil, e)
133
_, _ = app.AppendExemplar(sRef, nil, e)
134
135
require.NoError(t, app.Commit())
136
collector := walDataCollector{}
137
replayer := walReplayer{w: &collector}
138
require.NoError(t, replayer.Replay(s.wal.Dir()))
139
140
// We had 9 calls to AppendExemplar but only 4 of those should have gotten through
141
require.Equal(t, 4, len(collector.exemplars))
142
}
143
144
func TestStorage_ExistingWAL(t *testing.T) {
145
walDir := t.TempDir()
146
147
s, err := NewStorage(log.NewNopLogger(), nil, walDir)
148
require.NoError(t, err)
149
150
app := s.Appender(context.Background())
151
payload := buildSeries([]string{"foo", "bar", "baz", "blerg"})
152
153
// Write half of the samples.
154
for _, metric := range payload[0 : len(payload)/2] {
155
metric.Write(t, app)
156
}
157
158
require.NoError(t, app.Commit())
159
require.NoError(t, s.Close())
160
161
// We need to wait a little bit for the previous store to finish
162
// flushing.
163
time.Sleep(time.Millisecond * 150)
164
165
// Create a new storage, write the other half of samples.
166
s, err = NewStorage(log.NewNopLogger(), nil, walDir)
167
require.NoError(t, err)
168
defer func() {
169
require.NoError(t, s.Close())
170
}()
171
172
// Verify that the storage picked up existing series when it
173
// replayed the WAL.
174
for series := range s.series.iterator().Channel() {
175
require.Greater(t, series.lastTs, int64(0), "series timestamp not updated")
176
}
177
178
app = s.Appender(context.Background())
179
180
for _, metric := range payload[len(payload)/2:] {
181
metric.Write(t, app)
182
}
183
184
require.NoError(t, app.Commit())
185
186
collector := walDataCollector{}
187
replayer := walReplayer{w: &collector}
188
require.NoError(t, replayer.Replay(s.wal.Dir()))
189
190
names := []string{}
191
for _, series := range collector.series {
192
names = append(names, series.Labels.Get("__name__"))
193
}
194
require.Equal(t, payload.SeriesNames(), names)
195
196
expectedSamples := payload.ExpectedSamples()
197
actualSamples := collector.samples
198
sort.Sort(byRefSample(actualSamples))
199
require.Equal(t, expectedSamples, actualSamples)
200
201
expectedExemplars := payload.ExpectedExemplars()
202
actualExemplars := collector.exemplars
203
sort.Sort(byRefExemplar(actualExemplars))
204
require.Equal(t, expectedExemplars, actualExemplars)
205
}
206
207
func TestStorage_ExistingWAL_RefID(t *testing.T) {
208
l := util.TestLogger(t)
209
210
walDir := t.TempDir()
211
212
s, err := NewStorage(l, nil, walDir)
213
require.NoError(t, err)
214
215
app := s.Appender(context.Background())
216
payload := buildSeries([]string{"foo", "bar", "baz", "blerg"})
217
218
// Write all the samples
219
for _, metric := range payload {
220
metric.Write(t, app)
221
}
222
require.NoError(t, app.Commit())
223
224
// Truncate the WAL to force creation of a new segment.
225
require.NoError(t, s.Truncate(0))
226
require.NoError(t, s.Close())
227
228
// Create a new storage and see what the ref ID is initialized to.
229
s, err = NewStorage(l, nil, walDir)
230
require.NoError(t, err)
231
defer require.NoError(t, s.Close())
232
233
require.Equal(t, uint64(len(payload)), s.nextRef.Load(), "cached ref ID should be equal to the number of series written")
234
}
235
236
func TestStorage_Truncate(t *testing.T) {
237
// Same as before but now do the following:
238
// after writing all the data, forcefully create 4 more segments,
239
// then do a truncate of a timestamp for _some_ of the data.
240
// then read data back in. Expect to only get the latter half of data.
241
walDir := t.TempDir()
242
243
s, err := NewStorage(log.NewNopLogger(), nil, walDir)
244
require.NoError(t, err)
245
defer func() {
246
require.NoError(t, s.Close())
247
}()
248
249
app := s.Appender(context.Background())
250
251
payload := buildSeries([]string{"foo", "bar", "baz", "blerg"})
252
253
for _, metric := range payload {
254
metric.Write(t, app)
255
}
256
257
require.NoError(t, app.Commit())
258
259
// Forcefully create a bunch of new segments so when we truncate
260
// there's enough segments to be considered for truncation.
261
for i := 0; i < 5; i++ {
262
_, err := s.wal.NextSegmentSync()
263
require.NoError(t, err)
264
}
265
266
// Truncate half of the samples, keeping only the second sample
267
// per series.
268
keepTs := payload[len(payload)-1].samples[0].ts + 1
269
err = s.Truncate(keepTs)
270
require.NoError(t, err)
271
272
payload = payload.Filter(func(s sample) bool {
273
return s.ts >= keepTs
274
}, func(e exemplar.Exemplar) bool {
275
return e.HasTs && e.Ts >= keepTs
276
})
277
expectedSamples := payload.ExpectedSamples()
278
expectedExemplars := payload.ExpectedExemplars()
279
280
// Read back the WAL, collect series and samples.
281
collector := walDataCollector{}
282
replayer := walReplayer{w: &collector}
283
require.NoError(t, replayer.Replay(s.wal.Dir()))
284
285
names := []string{}
286
for _, series := range collector.series {
287
names = append(names, series.Labels.Get("__name__"))
288
}
289
require.Equal(t, payload.SeriesNames(), names)
290
291
actualSamples := collector.samples
292
sort.Sort(byRefSample(actualSamples))
293
require.Equal(t, expectedSamples, actualSamples)
294
295
actualExemplars := collector.exemplars
296
sort.Sort(byRefExemplar(actualExemplars))
297
require.Equal(t, expectedExemplars, actualExemplars)
298
}
299
300
func TestStorage_WriteStalenessMarkers(t *testing.T) {
301
walDir := t.TempDir()
302
303
s, err := NewStorage(log.NewNopLogger(), nil, walDir)
304
require.NoError(t, err)
305
defer func() {
306
require.NoError(t, s.Close())
307
}()
308
309
app := s.Appender(context.Background())
310
311
// Write some samples
312
payload := seriesList{
313
{name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}},
314
{name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}},
315
{name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}},
316
}
317
for _, metric := range payload {
318
metric.Write(t, app)
319
}
320
321
require.NoError(t, app.Commit())
322
323
// Write staleness markers for every series
324
require.NoError(t, s.WriteStalenessMarkers(func() int64 {
325
// Pass math.MaxInt64 so it seems like everything was written already
326
return math.MaxInt64
327
}))
328
329
// Read back the WAL, collect series and samples.
330
collector := walDataCollector{}
331
replayer := walReplayer{w: &collector}
332
require.NoError(t, replayer.Replay(s.wal.Dir()))
333
334
actual := collector.samples
335
sort.Sort(byRefSample(actual))
336
337
staleMap := map[chunks.HeadSeriesRef]bool{}
338
for _, sample := range actual {
339
if _, ok := staleMap[sample.Ref]; !ok {
340
staleMap[sample.Ref] = false
341
}
342
if value.IsStaleNaN(sample.V) {
343
staleMap[sample.Ref] = true
344
}
345
}
346
347
for ref, v := range staleMap {
348
require.True(t, v, "ref %d doesn't have stale marker", ref)
349
}
350
}
351
352
func TestStorage_TruncateAfterClose(t *testing.T) {
353
walDir := t.TempDir()
354
355
s, err := NewStorage(log.NewNopLogger(), nil, walDir)
356
require.NoError(t, err)
357
358
require.NoError(t, s.Close())
359
require.Error(t, ErrWALClosed, s.Truncate(0))
360
}
361
362
func TestGlobalReferenceID_Normal(t *testing.T) {
363
walDir := t.TempDir()
364
365
s, _ := NewStorage(log.NewNopLogger(), nil, walDir)
366
defer s.Close()
367
app := s.Appender(context.Background())
368
l := labels.New(labels.Label{
369
Name: "__name__",
370
Value: "label1",
371
})
372
ref, err := app.Append(0, l, time.Now().UnixMilli(), 0.1)
373
_ = app.Commit()
374
require.NoError(t, err)
375
require.True(t, ref == 1)
376
ref2, err := app.Append(0, l, time.Now().UnixMilli(), 0.1)
377
require.NoError(t, err)
378
require.True(t, ref2 == 1)
379
380
l2 := labels.New(labels.Label{
381
Name: "__name__",
382
Value: "label2",
383
})
384
ref3, err := app.Append(0, l2, time.Now().UnixMilli(), 0.1)
385
require.NoError(t, err)
386
require.True(t, ref3 == 2)
387
}
388
389
func BenchmarkAppendExemplar(b *testing.B) {
390
walDir := b.TempDir()
391
392
s, _ := NewStorage(log.NewNopLogger(), nil, walDir)
393
defer s.Close()
394
app := s.Appender(context.Background())
395
sRef, _ := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
396
e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
397
398
b.StartTimer()
399
for i := 0; i < b.N; i++ {
400
e.Ts = int64(i)
401
_, _ = app.AppendExemplar(sRef, nil, e)
402
}
403
b.StopTimer()
404
405
// Actually use appended exemplars in case they get eliminated
406
_ = app.Commit()
407
}
408
409
type sample struct {
410
ts int64
411
val float64
412
}
413
414
type series struct {
415
name string
416
samples []sample
417
exemplars []exemplar.Exemplar
418
419
ref *storage.SeriesRef
420
}
421
422
func (s *series) Write(t *testing.T, app storage.Appender) {
423
t.Helper()
424
425
lbls := labels.FromMap(map[string]string{"__name__": s.name})
426
427
offset := 0
428
if s.ref == nil {
429
// Write first sample to get ref ID
430
ref, err := app.Append(0, lbls, s.samples[0].ts, s.samples[0].val)
431
require.NoError(t, err)
432
433
s.ref = &ref
434
offset = 1
435
}
436
437
// Write other data points with AddFast
438
for _, sample := range s.samples[offset:] {
439
_, err := app.Append(*s.ref, lbls, sample.ts, sample.val)
440
require.NoError(t, err)
441
}
442
443
sRef := *s.ref
444
for _, exemplar := range s.exemplars {
445
var err error
446
sRef, err = app.AppendExemplar(sRef, nil, exemplar)
447
require.NoError(t, err)
448
}
449
}
450
451
type seriesList []*series
452
453
// Filter creates a new seriesList with series filtered by a sample
454
// keep predicate function.
455
func (s seriesList) Filter(fn func(s sample) bool, fnExemplar func(e exemplar.Exemplar) bool) seriesList {
456
var ret seriesList
457
458
for _, entry := range s {
459
var (
460
samples []sample
461
exemplars []exemplar.Exemplar
462
)
463
464
for _, sample := range entry.samples {
465
if fn(sample) {
466
samples = append(samples, sample)
467
}
468
}
469
470
for _, e := range entry.exemplars {
471
if fnExemplar(e) {
472
exemplars = append(exemplars, e)
473
}
474
}
475
476
if len(samples) > 0 && len(exemplars) > 0 {
477
ret = append(ret, &series{
478
name: entry.name,
479
ref: entry.ref,
480
samples: samples,
481
exemplars: exemplars,
482
})
483
}
484
}
485
486
return ret
487
}
488
489
func (s seriesList) SeriesNames() []string {
490
names := make([]string, 0, len(s))
491
for _, series := range s {
492
names = append(names, series.name)
493
}
494
return names
495
}
496
497
// ExpectedSamples returns the list of expected samples, sorted by ref ID and timestamp
498
func (s seriesList) ExpectedSamples() []record.RefSample {
499
expect := []record.RefSample{}
500
for _, series := range s {
501
for _, sample := range series.samples {
502
expect = append(expect, record.RefSample{
503
Ref: chunks.HeadSeriesRef(*series.ref),
504
T: sample.ts,
505
V: sample.val,
506
})
507
}
508
}
509
sort.Sort(byRefSample(expect))
510
return expect
511
}
512
513
// ExpectedExemplars returns the list of expected exemplars, sorted by ref ID and timestamp
514
func (s seriesList) ExpectedExemplars() []record.RefExemplar {
515
expect := []record.RefExemplar{}
516
for _, series := range s {
517
for _, exemplar := range series.exemplars {
518
expect = append(expect, record.RefExemplar{
519
Ref: chunks.HeadSeriesRef(*series.ref),
520
T: exemplar.Ts,
521
V: exemplar.Value,
522
Labels: exemplar.Labels,
523
})
524
}
525
}
526
sort.Sort(byRefExemplar(expect))
527
return expect
528
}
529
530
func buildSeries(nameSlice []string) seriesList {
531
s := make(seriesList, 0, len(nameSlice))
532
for i, n := range nameSlice {
533
i++
534
s = append(s, &series{
535
name: n,
536
samples: []sample{{int64(i), float64(i * 10.0)}, {int64(i * 10), float64(i * 100.0)}},
537
exemplars: []exemplar.Exemplar{
538
{Labels: labels.Labels{{Name: "foobar", Value: "barfoo"}}, Value: float64(i * 10.0), Ts: int64(i), HasTs: true},
539
{Labels: labels.Labels{{Name: "lorem", Value: "ipsum"}}, Value: float64(i * 100.0), Ts: int64(i * 10), HasTs: true},
540
},
541
})
542
}
543
return s
544
}
545
546
type byRefSample []record.RefSample
547
548
func (b byRefSample) Len() int { return len(b) }
549
func (b byRefSample) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
550
func (b byRefSample) Less(i, j int) bool {
551
if b[i].Ref == b[j].Ref {
552
return b[i].T < b[j].T
553
}
554
return b[i].Ref < b[j].Ref
555
}
556
557
type byRefExemplar []record.RefExemplar
558
559
func (b byRefExemplar) Len() int { return len(b) }
560
func (b byRefExemplar) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
561
func (b byRefExemplar) Less(i, j int) bool {
562
if b[i].Ref == b[j].Ref {
563
return b[i].T < b[j].T
564
}
565
return b[i].Ref < b[j].Ref
566
}
567
568