Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/wal/util.go
4095 views
1
package wal
2
3
import (
4
"path/filepath"
5
"sync"
6
7
"github.com/prometheus/prometheus/tsdb/record"
8
"github.com/prometheus/prometheus/tsdb/wlog"
9
)
10
11
type walReplayer struct {
12
w wlog.WriteTo
13
}
14
15
// Replay reads the WAL stored in dir and forwards every decoded record to
// r.w. If a checkpoint exists it is replayed first, and replay then continues
// with the segment following the checkpoint through the last segment on disk.
//
// The WAL handle opened to locate the data is closed before Replay returns.
// An error from closing it is reported only when replay itself succeeded, so
// the first failure is always the one the caller sees.
func (r walReplayer) Replay(dir string) (err error) {
	w, err := wlog.Open(nil, dir)
	if err != nil {
		return err
	}
	// Release the WAL handle on every return path; previously it was never
	// closed.
	defer func() {
		if closeErr := w.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	checkpointDir, startFrom, err := wlog.LastCheckpoint(w.Dir())
	if err != nil && err != record.ErrNotFound {
		return err
	}

	// err is either nil (a checkpoint was found) or record.ErrNotFound here.
	// In the latter case the checkpoint step is skipped and startFrom keeps
	// the value LastCheckpoint returned.
	if err == nil {
		sr, openErr := wlog.NewSegmentsReader(checkpointDir)
		if openErr != nil {
			return openErr
		}

		replayErr := r.replayWAL(wlog.NewReader(sr))
		// Always close the reader, but keep the replay error if there is one.
		if closeErr := sr.Close(); closeErr != nil && replayErr == nil {
			replayErr = closeErr
		}
		if replayErr != nil {
			return replayErr
		}

		// The checkpoint has been replayed, so continue with the segment
		// after the one LastCheckpoint reported.
		startFrom++
	}

	_, last, err := wlog.Segments(w.Dir())
	if err != nil {
		return err
	}

	for i := startFrom; i <= last; i++ {
		s, openErr := wlog.OpenReadSegment(wlog.SegmentName(w.Dir(), i))
		if openErr != nil {
			return openErr
		}

		sr := wlog.NewSegmentBufReader(s)
		replayErr := r.replayWAL(wlog.NewReader(sr))
		// Close each segment as soon as it is replayed rather than deferring
		// to function exit, so handles do not pile up across iterations.
		if closeErr := sr.Close(); closeErr != nil && replayErr == nil {
			replayErr = closeErr
		}
		if replayErr != nil {
			return replayErr
		}
	}

	return nil
}
66
67
func (r walReplayer) replayWAL(reader *wlog.Reader) error {
68
var dec record.Decoder
69
70
for reader.Next() {
71
rec := reader.Record()
72
switch dec.Type(rec) {
73
case record.Series:
74
series, err := dec.Series(rec, nil)
75
if err != nil {
76
return err
77
}
78
r.w.StoreSeries(series, 0)
79
case record.Samples:
80
samples, err := dec.Samples(rec, nil)
81
if err != nil {
82
return err
83
}
84
r.w.Append(samples)
85
case record.Exemplars:
86
exemplars, err := dec.Exemplars(rec, nil)
87
if err != nil {
88
return err
89
}
90
r.w.AppendExemplars(exemplars)
91
}
92
}
93
94
return nil
95
}
96
97
type walDataCollector struct {
98
mut sync.Mutex
99
samples []record.RefSample
100
series []record.RefSeries
101
exemplars []record.RefExemplar
102
histograms []record.RefHistogramSample
103
floatHistograms []record.RefFloatHistogramSample
104
}
105
106
// AppendExemplars records the given exemplars at the end of the collected
// exemplar list while holding the collector's lock. It always returns true.
func (c *walDataCollector) AppendExemplars(exemplars []record.RefExemplar) bool {
	c.mut.Lock()
	c.exemplars = append(c.exemplars, exemplars...)
	c.mut.Unlock()

	return true
}
113
114
// Append records the given samples at the end of the collected sample list
// while holding the collector's lock. It always returns true.
func (c *walDataCollector) Append(samples []record.RefSample) bool {
	c.mut.Lock()
	c.samples = append(c.samples, samples...)
	c.mut.Unlock()

	return true
}
121
122
// AppendHistograms records the given histogram samples at the end of the
// collected histogram list while holding the collector's lock. It always
// returns true.
func (c *walDataCollector) AppendHistograms(histograms []record.RefHistogramSample) bool {
	c.mut.Lock()
	c.histograms = append(c.histograms, histograms...)
	c.mut.Unlock()

	return true
}
129
130
// AppendFloatHistograms records the given float histogram samples at the end
// of the collected float histogram list while holding the collector's lock.
// It always returns true.
func (c *walDataCollector) AppendFloatHistograms(histograms []record.RefFloatHistogramSample) bool {
	c.mut.Lock()
	c.floatHistograms = append(c.floatHistograms, histograms...)
	c.mut.Unlock()

	return true
}
137
138
// StoreSeries records the given series at the end of the collected series
// list while holding the collector's lock. The second argument is ignored.
func (c *walDataCollector) StoreSeries(series []record.RefSeries, _ int) {
	c.mut.Lock()
	c.series = append(c.series, series...)
	c.mut.Unlock()
}
144
145
// SeriesReset is a no-op; the collector keeps everything it has been given.
func (*walDataCollector) SeriesReset(int) {}
146
147
// UpdateSeriesSegment is a no-op; both arguments are ignored.
func (c *walDataCollector) UpdateSeriesSegment(_ []record.RefSeries, _ int) {}
148
149
// SubDirectory returns the path of the Prometheus WAL subdirectory inside the
// Storage directory base, i.e. base joined with "wal".
func SubDirectory(base string) string {
	const walDirName = "wal"

	return filepath.Join(base, walDirName)
}
154
155