GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/wal/wal.go
package wal

// This code is copied from
// prometheus/prometheus@7c2de14b0bd74303c2ca6f932b71d4585a29ca75, with only
// minor changes for metric names.

import (
    "context"
    "errors"
    "fmt"
    "math"
    "sync"
    "time"
    "unicode/utf8"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/prometheus/model/exemplar"
    "github.com/prometheus/prometheus/model/histogram"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/model/metadata"
    "github.com/prometheus/prometheus/model/timestamp"
    "github.com/prometheus/prometheus/model/value"
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb"
    "github.com/prometheus/prometheus/tsdb/chunks"
    "github.com/prometheus/prometheus/tsdb/record"
    "github.com/prometheus/prometheus/tsdb/wlog"
    "go.uber.org/atomic"
)

// ErrWALClosed is an error returned when a WAL operation can't run because the
// storage has already been closed.
var ErrWALClosed = fmt.Errorf("WAL storage closed")

type storageMetrics struct {
    r prometheus.Registerer

    numActiveSeries        prometheus.Gauge
    numDeletedSeries       prometheus.Gauge
    totalOutOfOrderSamples prometheus.Counter
    totalCreatedSeries     prometheus.Counter
    totalRemovedSeries     prometheus.Counter
    totalAppendedSamples   prometheus.Counter
    totalAppendedExemplars prometheus.Counter
}

func newStorageMetrics(r prometheus.Registerer) *storageMetrics {
    m := storageMetrics{r: r}
    m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "agent_wal_storage_active_series",
        Help: "Current number of active series being tracked by the WAL storage",
    })

    m.numDeletedSeries = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "agent_wal_storage_deleted_series",
        Help: "Current number of series marked for deletion from memory",
    })

    m.totalOutOfOrderSamples = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "agent_wal_out_of_order_samples_total",
        Help: "Total number of out of order samples ingestion failed attempts.",
    })

    m.totalCreatedSeries = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "agent_wal_storage_created_series_total",
        Help: "Total number of created series appended to the WAL",
    })

    m.totalRemovedSeries = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "agent_wal_storage_removed_series_total",
        Help: "Total number of created series removed from the WAL",
    })

    m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "agent_wal_samples_appended_total",
        Help: "Total number of samples appended to the WAL",
    })

    m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "agent_wal_exemplars_appended_total",
        Help: "Total number of exemplars appended to the WAL",
    })

    if r != nil {
        r.MustRegister(
            m.numActiveSeries,
            m.numDeletedSeries,
            m.totalOutOfOrderSamples,
            m.totalCreatedSeries,
            m.totalRemovedSeries,
            m.totalAppendedSamples,
            m.totalAppendedExemplars,
        )
    }

    return &m
}

func (m *storageMetrics) Unregister() {
    if m.r == nil {
        return
    }
    cs := []prometheus.Collector{
        m.numActiveSeries,
        m.numDeletedSeries,
        m.totalOutOfOrderSamples,
        m.totalCreatedSeries,
        m.totalRemovedSeries,
        m.totalAppendedSamples,
        m.totalAppendedExemplars,
    }
    for _, c := range cs {
        m.r.Unregister(c)
    }
}

// Storage implements storage.Storage, and just writes to the WAL.
type Storage struct {
    // Embed Queryable/ChunkQueryable for compatibility, but don't actually implement it.
    storage.Queryable
    storage.ChunkQueryable

    // Operations against the WAL must be protected by a mutex so it doesn't get
    // closed in the middle of an operation. Other operations are concurrency-safe, so we
    // use a RWMutex to allow multiple usages of the WAL at once. If the WAL is closed, all
    // operations that change the WAL must fail.
    walMtx    sync.RWMutex
    walClosed bool

    path   string
    wal    *wlog.WL
    logger log.Logger

    appenderPool sync.Pool
    bufPool      sync.Pool

    nextRef *atomic.Uint64
    series  *stripeSeries
    deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until.

    metrics *storageMetrics
}

// NewStorage makes a new Storage.
func NewStorage(logger log.Logger, registerer prometheus.Registerer, path string) (*Storage, error) {
    w, err := wlog.NewSize(logger, registerer, SubDirectory(path), wlog.DefaultSegmentSize, true)
    if err != nil {
        return nil, err
    }

    storage := &Storage{
        path:    path,
        wal:     w,
        logger:  logger,
        deleted: map[chunks.HeadSeriesRef]int{},
        series:  newStripeSeries(tsdb.DefaultStripeSize),
        metrics: newStorageMetrics(registerer),
        nextRef: atomic.NewUint64(0),
    }

    storage.bufPool.New = func() interface{} {
        b := make([]byte, 0, 1024)
        return b
    }

    storage.appenderPool.New = func() interface{} {
        return &appender{
            w:                      storage,
            pendingSeries:          make([]record.RefSeries, 0, 100),
            pendingSamples:         make([]record.RefSample, 0, 100),
            pendingHistograms:      make([]record.RefHistogramSample, 0, 100),
            pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
            pendingExamplars:       make([]record.RefExemplar, 0, 10),
        }
    }

    if err := storage.replayWAL(); err != nil {
        level.Warn(storage.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)

        var ce *wlog.CorruptionErr
        if ok := errors.As(err, &ce); !ok {
            return nil, err
        }
        if err := w.Repair(ce); err != nil {
            // if repair fails, truncate everything in WAL
            level.Warn(storage.logger).Log("msg", "WAL repair failed, truncating!", "err", err)
            if e := w.Truncate(math.MaxInt); e != nil {
                level.Error(storage.logger).Log("msg", "WAL truncate failure", "err", e)
                return nil, fmt.Errorf("truncate corrupted WAL: %w", e)
            }
            if e := wlog.DeleteCheckpoints(w.Dir(), math.MaxInt); e != nil {
                return nil, fmt.Errorf("delete WAL checkpoints: %w", e)
            }
            return nil, fmt.Errorf("repair corrupted WAL: %w", err)
        }
    }

    return storage, nil
}

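// Reader's note: replayWAL replays the on-disk WAL back into memory. It loads
// the most recent checkpoint first (if one exists) and then every segment
// written after it, so series created before a restart keep their refs and
// their last-seen timestamps.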
func (w *Storage) replayWAL() error {
    w.walMtx.RLock()
    defer w.walMtx.RUnlock()

    if w.walClosed {
        return ErrWALClosed
    }

    level.Info(w.logger).Log("msg", "replaying WAL, this may take a while", "dir", w.wal.Dir())
    dir, startFrom, err := wlog.LastCheckpoint(w.wal.Dir())
    if err != nil && err != record.ErrNotFound {
        return fmt.Errorf("find last checkpoint: %w", err)
    }

    multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}

    if err == nil {
        sr, err := wlog.NewSegmentsReader(dir)
        if err != nil {
            return fmt.Errorf("open checkpoint: %w", err)
        }
        defer func() {
            if err := sr.Close(); err != nil {
                level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err)
            }
        }()

        // A corrupted checkpoint is a hard error for now and requires user
        // intervention. There's likely little data that can be recovered anyway.
        if err := w.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
            return fmt.Errorf("backfill checkpoint: %w", err)
        }
        startFrom++
        level.Info(w.logger).Log("msg", "WAL checkpoint loaded")
    }

    // Find the last segment.
    _, last, err := wlog.Segments(w.wal.Dir())
    if err != nil {
        return fmt.Errorf("finding WAL segments: %w", err)
    }

    // Backfill segments from the most recent checkpoint onwards.
    for i := startFrom; i <= last; i++ {
        s, err := wlog.OpenReadSegment(wlog.SegmentName(w.wal.Dir(), i))
        if err != nil {
            return fmt.Errorf("open WAL segment %d: %w", i, err)
        }

        sr := wlog.NewSegmentBufReader(s)
        err = w.loadWAL(wlog.NewReader(sr), multiRef)
        if err := sr.Close(); err != nil {
            level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err)
        }
        if err != nil {
            return err
        }
        level.Info(w.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
    }

    return nil
}

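// Reader's note: loadWAL decodes every record from r and rebuilds the
// in-memory series state. A helper goroutine decodes records into typed
// batches and sends them over the decoded channel; the main loop creates
// memSeries entries for series records (tracking their refs in multiRef) and
// advances each series' lastTs from sample, histogram, and float histogram
// records. Slice pools are reused to limit allocations during replay.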
func (w *Storage) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
    var (
        dec     record.Decoder
        lastRef = chunks.HeadSeriesRef(w.nextRef.Load())

        decoded    = make(chan interface{}, 10)
        errCh      = make(chan error, 1)
        seriesPool = sync.Pool{
            New: func() interface{} {
                return []record.RefSeries{}
            },
        }
        samplesPool = sync.Pool{
            New: func() interface{} {
                return []record.RefSample{}
            },
        }
        histogramsPool = sync.Pool{
            New: func() interface{} {
                return []record.RefHistogramSample{}
            },
        }
        floatHistogramsPool = sync.Pool{
            New: func() interface{} {
                return []record.RefFloatHistogramSample{}
            },
        }
    )

    go func() {
        defer close(decoded)
        for r.Next() {
            rec := r.Record()
            switch dec.Type(rec) {
            case record.Series:
                series := seriesPool.Get().([]record.RefSeries)[:0]
                series, err = dec.Series(rec, series)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
                        Err:     fmt.Errorf("decode series: %w", err),
                        Segment: r.Segment(),
                        Offset:  r.Offset(),
                    }
                    return
                }
                decoded <- series
            case record.Samples:
                samples := samplesPool.Get().([]record.RefSample)[:0]
                samples, err = dec.Samples(rec, samples)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
                        Err:     fmt.Errorf("decode samples: %w", err),
                        Segment: r.Segment(),
                        Offset:  r.Offset(),
                    }
                    return
                }
                decoded <- samples
            case record.HistogramSamples:
                histograms := histogramsPool.Get().([]record.RefHistogramSample)[:0]
                histograms, err = dec.HistogramSamples(rec, histograms)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
                        Err:     fmt.Errorf("decode histogram samples: %w", err),
                        Segment: r.Segment(),
                        Offset:  r.Offset(),
                    }
                    return
                }
                decoded <- histograms
            case record.FloatHistogramSamples:
                floatHistograms := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
                floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
                        Err:     fmt.Errorf("decode float histogram samples: %w", err),
                        Segment: r.Segment(),
                        Offset:  r.Offset(),
                    }
                    return
                }
                decoded <- floatHistograms
            case record.Tombstones, record.Exemplars:
                // We don't care about decoding tombstones or exemplars.
                // TODO: If we decide to decode exemplars, we should make sure to prepopulate
                // stripeSeries.exemplars in the next block by using setLatestExemplar.
                continue
            default:
                errCh <- &wlog.CorruptionErr{
                    Err:     fmt.Errorf("invalid record type %v", dec.Type(rec)),
                    Segment: r.Segment(),
                    Offset:  r.Offset(),
                }
                return
            }
        }
    }()

    var nonExistentSeriesRefs atomic.Uint64

    for d := range decoded {
        switch v := d.(type) {
        case []record.RefSeries:
            for _, s := range v {
                // If this is a new series, create it in memory without a timestamp.
                // If we read in a sample for it, we'll use the timestamp of the latest
                // sample. Otherwise, the series is stale and will be deleted once
                // the truncation is performed.
                if w.series.GetByID(s.Ref) == nil {
                    series := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
                    w.series.Set(s.Labels.Hash(), series)
                    multiRef[s.Ref] = series.ref

                    w.metrics.numActiveSeries.Inc()
                    w.metrics.totalCreatedSeries.Inc()

                    if s.Ref > lastRef {
                        lastRef = s.Ref
                    }
                }
            }

            //nolint:staticcheck
            seriesPool.Put(v)
        case []record.RefSample:
            for _, s := range v {
                // Update the lastTs for the series based on this sample's timestamp.
                ref, ok := multiRef[s.Ref]
                if !ok {
                    nonExistentSeriesRefs.Inc()
                    continue
                }

                series := w.series.GetByID(ref)
                if s.T > series.lastTs {
                    series.lastTs = s.T
                }
            }

            //nolint:staticcheck
            samplesPool.Put(v)
        case []record.RefHistogramSample:
            for _, entry := range v {
                // Update the lastTs for the series based on this sample's timestamp.
                ref, ok := multiRef[entry.Ref]
                if !ok {
                    nonExistentSeriesRefs.Inc()
                    continue
                }
                series := w.series.GetByID(ref)
                if entry.T > series.lastTs {
                    series.lastTs = entry.T
                }
            }

            //nolint:staticcheck
            histogramsPool.Put(v)
        case []record.RefFloatHistogramSample:
            for _, entry := range v {
                // Update the lastTs for the series based on this sample's timestamp.
                ref, ok := multiRef[entry.Ref]
                if !ok {
                    nonExistentSeriesRefs.Inc()
                    continue
                }
                series := w.series.GetByID(ref)
                if entry.T > series.lastTs {
                    series.lastTs = entry.T
                }
            }

            //nolint:staticcheck
            floatHistogramsPool.Put(v)
        default:
            panic(fmt.Errorf("unexpected decoded type: %T", d))
        }
    }

    if v := nonExistentSeriesRefs.Load(); v > 0 {
        level.Warn(w.logger).Log("msg", "found sample referencing non-existing series", "skipped_series", v)
    }

    w.nextRef.Store(uint64(lastRef))

    select {
    case err := <-errCh:
        return err
    default:
        if r.Err() != nil {
            return fmt.Errorf("read records: %w", r.Err())
        }
        return nil
    }
}

// Directory returns the path where the WAL storage is held.
func (w *Storage) Directory() string {
    return w.path
}

// Appender returns a new appender against the storage.
func (w *Storage) Appender(_ context.Context) storage.Appender {
    return w.appenderPool.Get().(storage.Appender)
}

// StartTime always returns 0, nil. It is implemented for compatibility with
// Prometheus, but is unused in the agent.
func (*Storage) StartTime() (int64, error) {
    return 0, nil
}

// Truncate removes all data from the WAL prior to the timestamp specified by
// mint.
func (w *Storage) Truncate(mint int64) error {
    w.walMtx.RLock()
    defer w.walMtx.RUnlock()

    if w.walClosed {
        return ErrWALClosed
    }

    start := time.Now()

    // Garbage collect series that haven't received an update since mint.
    w.gc(mint)
    level.Info(w.logger).Log("msg", "series GC completed", "duration", time.Since(start))

    first, last, err := wlog.Segments(w.wal.Dir())
    if err != nil {
        return fmt.Errorf("get segment range: %w", err)
    }

    // Start a new segment, so low ingestion volume instances don't have more WAL
    // than needed.
    _, err = w.wal.NextSegment()
    if err != nil {
        return fmt.Errorf("next segment: %w", err)
    }

    last-- // Never consider last segment for checkpoint.
    if last < 0 {
        return nil // no segments yet.
    }

    // The lower two thirds of segments should contain mostly obsolete samples.
    // If we have less than two segments, it's not worth checkpointing yet.
    last = first + (last-first)*2/3
    if last <= first {
        return nil
    }

    keep := func(id chunks.HeadSeriesRef) bool {
        if w.series.GetByID(id) != nil {
            return true
        }

        seg, ok := w.deleted[id]
        return ok && seg > last
    }
    if _, err = wlog.Checkpoint(w.logger, w.wal, first, last, keep, mint); err != nil {
        return fmt.Errorf("create checkpoint: %w", err)
    }
    if err := w.wal.Truncate(last + 1); err != nil {
        // If truncating fails, we'll just try again at the next checkpoint.
        // Leftover segments will just be ignored in the future if there's a checkpoint
        // that supersedes them.
        level.Error(w.logger).Log("msg", "truncating segments failed", "err", err)
    }

    // The checkpoint is written and segments before it are truncated, so we no
    // longer need to track deleted series that are before it.
    for ref, segment := range w.deleted {
        if segment < first {
            delete(w.deleted, ref)
            w.metrics.totalRemovedSeries.Inc()
        }
    }
    w.metrics.numDeletedSeries.Set(float64(len(w.deleted)))

    if err := wlog.DeleteCheckpoints(w.wal.Dir(), last); err != nil {
        // Leftover old checkpoints do not cause problems down the line beyond
        // occupying disk space.
        // They will just be ignored since a higher checkpoint exists.
        level.Error(w.logger).Log("msg", "delete old checkpoints", "err", err)
    }

    level.Info(w.logger).Log("msg", "WAL checkpoint complete",
        "first", first, "last", last, "duration", time.Since(start))
    return nil
}
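
// Reader's note, a worked example of the checkpoint arithmetic in Truncate
// above: with segments first=0 through last=9, the final segment is excluded
// (last-- gives 8) and last = first + (last-first)*2/3 = 0 + 8*2/3 = 5 with
// integer division, so the checkpoint covers segments 0 through 5 and the WAL
// is truncated starting at segment 6. When too few segments exist, the
// computed last does not exceed first and Truncate returns without
// checkpointing.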

// gc removes data before the minimum timestamp from the head.
func (w *Storage) gc(mint int64) {
    deleted := w.series.gc(mint)
    w.metrics.numActiveSeries.Sub(float64(len(deleted)))

    _, last, _ := wlog.Segments(w.wal.Dir())

    // We want to keep series records for any newly deleted series
    // until we've passed the last recorded segment. This prevents
    // the WAL having samples for series records that no longer exist.
    for ref := range deleted {
        w.deleted[ref] = last
    }

    w.metrics.numDeletedSeries.Set(float64(len(w.deleted)))
}

// WriteStalenessMarkers appends a staleness sample for all active series.
func (w *Storage) WriteStalenessMarkers(remoteTsFunc func() int64) error {
    var lastErr error
    var lastTs int64

    app := w.Appender(context.Background())
    it := w.series.iterator()
    for series := range it.Channel() {
        var (
            ref  = series.ref
            lset = series.lset
        )

        ts := timestamp.FromTime(time.Now())
        _, err := app.Append(storage.SeriesRef(ref), lset, ts, math.Float64frombits(value.StaleNaN))
        if err != nil {
            lastErr = err
        }

        // Remove millisecond precision; the remote write timestamp we get
        // only has second precision.
        lastTs = (ts / 1000) * 1000
    }

    if lastErr == nil {
        if err := app.Commit(); err != nil {
            return fmt.Errorf("failed to commit staleness markers: %w", err)
        }

        // Wait for remote write to write the lastTs, but give up after 1m
        level.Info(w.logger).Log("msg", "waiting for remote write to write staleness markers...")

        stopCh := time.After(1 * time.Minute)
        start := time.Now()

    Outer:
        for {
            select {
            case <-stopCh:
                level.Error(w.logger).Log("msg", "timed out waiting for staleness markers to be written")
                break Outer
            default:
                writtenTs := remoteTsFunc()
                if writtenTs >= lastTs {
                    duration := time.Since(start)
                    level.Info(w.logger).Log("msg", "remote write wrote staleness markers", "duration", duration)
                    break Outer
                }

                level.Info(w.logger).Log("msg", "remote write hasn't written staleness markers yet", "remoteTs", writtenTs, "lastTs", lastTs)

                // Wait a bit before reading again
                time.Sleep(5 * time.Second)
            }
        }
    }

    return lastErr
}

// Close closes the storage and all its underlying resources.
func (w *Storage) Close() error {
    w.walMtx.Lock()
    defer w.walMtx.Unlock()

    if w.walClosed {
        return fmt.Errorf("already closed")
    }
    w.walClosed = true

    if w.metrics != nil {
        w.metrics.Unregister()
    }
    return w.wal.Close()
}

type appender struct {
    w                      *Storage
    pendingSeries          []record.RefSeries
    pendingSamples         []record.RefSample
    pendingExamplars       []record.RefExemplar
    pendingHistograms      []record.RefHistogramSample
    pendingFloatHistograms []record.RefFloatHistogramSample

    // Pointers to the series referenced by each element of pendingSamples.
    // Series lock is not held on elements.
    sampleSeries []*memSeries

    // Pointers to the series referenced by each element of pendingHistograms.
    // Series lock is not held on elements.
    histogramSeries []*memSeries

    // Pointers to the series referenced by each element of pendingFloatHistograms.
    // Series lock is not held on elements.
    floatHistogramSeries []*memSeries
}

var _ storage.Appender = (*appender)(nil)

func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
    series := a.w.series.GetByID(chunks.HeadSeriesRef(ref))
    if series == nil {
        // Ensure no empty or duplicate labels have gotten through. This mirrors the
        // equivalent validation code in the TSDB's headAppender.
        l = l.WithoutEmpty()
        if len(l) == 0 {
            return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
        }

        if lbl, dup := l.HasDuplicateLabelNames(); dup {
            return 0, fmt.Errorf("label name %q is not unique: %w", lbl, tsdb.ErrInvalidSample)
        }

        var created bool
        series, created = a.getOrCreate(l)
        if created {
            a.pendingSeries = append(a.pendingSeries, record.RefSeries{
                Ref:    series.ref,
                Labels: l,
            })

            a.w.metrics.numActiveSeries.Inc()
            a.w.metrics.totalCreatedSeries.Inc()
        }
    }

    series.Lock()
    defer series.Unlock()

    if t < series.lastTs {
        a.w.metrics.totalOutOfOrderSamples.Inc()
        return 0, storage.ErrOutOfOrderSample
    }

    // NOTE(rfratto): always modify pendingSamples and sampleSeries together.
    a.pendingSamples = append(a.pendingSamples, record.RefSample{
        Ref: series.ref,
        T:   t,
        V:   v,
    })
    a.sampleSeries = append(a.sampleSeries, series)

    a.w.metrics.totalAppendedSamples.Inc()
    return storage.SeriesRef(series.ref), nil
}

func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
    hash := l.Hash()

    series = a.w.series.GetByHash(hash, l)
    if series != nil {
        return series, false
    }

    ref := chunks.HeadSeriesRef(a.w.nextRef.Inc())
    series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64}
    a.w.series.Set(l.Hash(), series)
    return series, true
}

func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
    readRef := chunks.HeadSeriesRef(ref)

    s := a.w.series.GetByID(readRef)
    if s == nil {
        return 0, fmt.Errorf("unknown series ref when trying to add exemplar: %d", readRef)
    }

    // Ensure no empty labels have gotten through.
    e.Labels = e.Labels.WithoutEmpty()

    if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
        return 0, fmt.Errorf("label name %q is not unique: %w", lbl, tsdb.ErrInvalidExemplar)
    }

    // Exemplar label length does not include chars involved in text rendering such as quotes,
    // equals sign, or commas. See definition of const ExemplarMaxLabelLength.
    labelSetLen := 0
    err := e.Labels.Validate(func(l labels.Label) error {
        labelSetLen += utf8.RuneCountInString(l.Name)
        labelSetLen += utf8.RuneCountInString(l.Value)

        if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
            return storage.ErrExemplarLabelLength
        }
        return nil
    })
    if err != nil {
        return 0, err
    }

    // Check for duplicate vs last stored exemplar for this series, and discard those.
    // Otherwise, record the current exemplar as the latest.
    // Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
    prevExemplar := a.w.series.GetLatestExemplar(s.ref)
    if prevExemplar != nil && prevExemplar.Equals(e) {
        // Duplicate, don't return an error but don't accept the exemplar.
        return 0, nil
    }
    a.w.series.SetLatestExemplar(s.ref, &e)

    a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
        Ref:    readRef,
        T:      e.Ts,
        V:      e.Value,
        Labels: e.Labels,
    })

    a.w.metrics.totalAppendedExemplars.Inc()
    return storage.SeriesRef(s.ref), nil
}

func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
    if h != nil {
        if err := tsdb.ValidateHistogram(h); err != nil {
            return 0, err
        }
    }

    if fh != nil {
        if err := tsdb.ValidateFloatHistogram(fh); err != nil {
            return 0, err
        }
    }

    series := a.w.series.GetByID(chunks.HeadSeriesRef(ref))
    if series == nil {
        // Ensure no empty or duplicate labels have gotten through. This mirrors the
        // equivalent validation code in the TSDB's headAppender.
        l = l.WithoutEmpty()
        if len(l) == 0 {
            return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
        }

        if lbl, dup := l.HasDuplicateLabelNames(); dup {
            return 0, fmt.Errorf("label name %q is not unique: %w", lbl, tsdb.ErrInvalidSample)
        }

        var created bool
        series, created = a.getOrCreate(l)
        if created {
            a.pendingSeries = append(a.pendingSeries, record.RefSeries{
                Ref:    series.ref,
                Labels: l,
            })

            a.w.metrics.numActiveSeries.Inc()
            a.w.metrics.totalCreatedSeries.Inc()
        }
    }

    series.Lock()
    defer series.Unlock()

    if t < series.lastTs {
        a.w.metrics.totalOutOfOrderSamples.Inc()
        return 0, storage.ErrOutOfOrderSample
    }

    switch {
    case h != nil:
        // NOTE(rfratto): always modify pendingHistograms and histogramSeries
        // together.
        a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{
            Ref: series.ref,
            T:   t,
            H:   h,
        })
        a.histogramSeries = append(a.histogramSeries, series)
    case fh != nil:
        // NOTE(rfratto): always modify pendingFloatHistograms and
        // floatHistogramSeries together.
        a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{
            Ref: series.ref,
            T:   t,
            FH:  fh,
        })
        a.floatHistogramSeries = append(a.floatHistogramSeries, series)
    }

    a.w.metrics.totalAppendedSamples.Inc()
    return storage.SeriesRef(series.ref), nil
}

func (a *appender) UpdateMetadata(ref storage.SeriesRef, _ labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
    // TODO(rfratto): implement pushing metadata to WAL
    return 0, nil
}

// Commit submits the collected samples and purges the batch.
func (a *appender) Commit() error {
    a.w.walMtx.RLock()
    defer a.w.walMtx.RUnlock()

    if a.w.walClosed {
        return ErrWALClosed
    }

    var encoder record.Encoder
    buf := a.w.bufPool.Get().([]byte)

    if len(a.pendingSeries) > 0 {
        buf = encoder.Series(a.pendingSeries, buf)
        if err := a.w.wal.Log(buf); err != nil {
            return err
        }
        buf = buf[:0]
    }

    if len(a.pendingSamples) > 0 {
        buf = encoder.Samples(a.pendingSamples, buf)
        if err := a.w.wal.Log(buf); err != nil {
            return err
        }
        buf = buf[:0]
    }

    if len(a.pendingExamplars) > 0 {
        buf = encoder.Exemplars(a.pendingExamplars, buf)
        if err := a.w.wal.Log(buf); err != nil {
            return err
        }
        buf = buf[:0]
    }

    if len(a.pendingHistograms) > 0 {
        buf = encoder.HistogramSamples(a.pendingHistograms, buf)
        if err := a.w.wal.Log(buf); err != nil {
            return err
        }
        buf = buf[:0]
    }

    if len(a.pendingFloatHistograms) > 0 {
        buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
        if err := a.w.wal.Log(buf); err != nil {
            return err
        }
        buf = buf[:0]
    }

    var series *memSeries
    for i, s := range a.pendingSamples {
        series = a.sampleSeries[i]
        if !series.updateTimestamp(s.T) {
            a.w.metrics.totalOutOfOrderSamples.Inc()
        }
    }
    for i, s := range a.pendingHistograms {
        series = a.histogramSeries[i]
        if !series.updateTimestamp(s.T) {
            a.w.metrics.totalOutOfOrderSamples.Inc()
        }
    }
    for i, s := range a.pendingFloatHistograms {
        series = a.floatHistogramSeries[i]
        if !series.updateTimestamp(s.T) {
            a.w.metrics.totalOutOfOrderSamples.Inc()
        }
    }

    //nolint:staticcheck
    a.w.bufPool.Put(buf)
    return a.Rollback()
}

func (a *appender) Rollback() error {
    a.pendingSeries = a.pendingSeries[:0]
    a.pendingSamples = a.pendingSamples[:0]
    a.pendingHistograms = a.pendingHistograms[:0]
    a.pendingFloatHistograms = a.pendingFloatHistograms[:0]
    a.pendingExamplars = a.pendingExamplars[:0]
    a.sampleSeries = a.sampleSeries[:0]
    a.histogramSeries = a.histogramSeries[:0]
    a.floatHistogramSeries = a.floatHistogramSeries[:0]
    a.w.appenderPool.Put(a)
    return nil
}

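// exampleUsage is an illustrative sketch, not part of the upstream file or the
// package API, showing how the pieces above are typically wired together:
// create the Storage, append a sample through a pooled Appender, commit, and
// periodically truncate old data. The function name, directory path, and label
// values here are hypothetical.
func exampleUsage() error {
    logger := log.NewNopLogger()
    reg := prometheus.NewRegistry()

    // Open (or create) the WAL storage under ./data.
    w, err := NewStorage(logger, reg, "./data")
    if err != nil {
        return err
    }
    defer w.Close()

    // Appenders come from a pool; always finish with Commit or Rollback so the
    // appender is returned to the pool.
    app := w.Appender(context.Background())
    lbls := labels.FromStrings("__name__", "example_metric", "job", "demo")
    ts := timestamp.FromTime(time.Now())
    if _, err := app.Append(0, lbls, ts, 1.0); err != nil {
        _ = app.Rollback()
        return err
    }
    if err := app.Commit(); err != nil {
        return err
    }

    // Drop series and samples older than two hours; callers typically drive
    // this from a ticker.
    return w.Truncate(timestamp.FromTime(time.Now().Add(-2 * time.Hour)))
}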