// Source: GitHub repository aos/grafana-agent,
// path blob/main/pkg/agentctl/walstats.go (CoCalc share view, 4096 views).

package agentctl

import (
	"errors"
	"math"
	"time"

	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/wlog"
)

// WALStats stores statistics on the whole WAL.
14
type WALStats struct {
15
// From holds the first timestamp for the oldest sample found within the WAL.
16
From time.Time
17
18
// To holds the last timestamp for the newest sample found within the WAL.
19
To time.Time
20
21
// CheckpointNumber is the segment number of the most recently created
22
// checkpoint.
23
CheckpointNumber int
24
25
// FirstSegment is the segment number of the first (oldest) non-checkpoint
26
// segment file found within the WAL folder.
27
FirstSegment int
28
29
// FirstSegment is the segment number of the last (newest) non-checkpoint
30
// segment file found within the WAL folder.
31
LastSegment int
32
33
// InvalidRefs is the number of samples with a ref ID to which there is no
34
// series defined.
35
InvalidRefs int
36
37
// HashCollisions is the total number of times there has been a hash
38
// collision. A hash collision is any instance in which a hash of labels
39
// is defined by two ref IDs.
40
//
41
// For the Grafana Agent, a hash collision has no negative side effects
42
// on data sent to the remote_write endpoint but may have a noticeable impact
43
// on memory while the collision exists.
44
HashCollisions int
45
46
// Targets holds stats on specific scrape targets.
47
Targets []WALTargetStats
48
}
49
50
// Series returns the number of series across all targets.
51
func (s WALStats) Series() int {
52
var series int
53
for _, t := range s.Targets {
54
series += t.Series
55
}
56
return series
57
}
58
59
// Samples returns the number of Samples across all targets.
60
func (s WALStats) Samples() int {
61
var samples int
62
for _, t := range s.Targets {
63
samples += t.Samples
64
}
65
return samples
66
}
67
68
// WALTargetStats aggregates statistics on scrape targets across the entirety
// of the WAL and its checkpoints.
type WALTargetStats struct {
	// Job corresponds to the "job" label on the scraped target.
	Job string

	// Instance corresponds to the "instance" label on the scraped target.
	Instance string

	// Series is the total number of series for the scraped target. It is
	// equivalent to the total cardinality.
	Series int

	// Samples is the total number of samples for the scraped target.
	Samples int
}

// CalculateStats calculates the statistics of the WAL for the given directory.
86
// walDir must be a folder containing segment files and checkpoint directories.
87
func CalculateStats(walDir string) (WALStats, error) {
88
w, err := wlog.Open(nil, walDir)
89
if err != nil {
90
return WALStats{}, err
91
}
92
defer w.Close()
93
94
return newWALStatsCalculator(w).Calculate()
95
}
96
97
type walStatsCalculator struct {
98
w *wlog.WL
99
100
fromTime int64
101
toTime int64
102
invalidRefs int
103
104
stats []*WALTargetStats
105
106
statsLookup map[chunks.HeadSeriesRef]*WALTargetStats
107
108
// hash -> # ref IDs with that hash
109
hashInstances map[uint64]int
110
}
111
112
func newWALStatsCalculator(w *wlog.WL) *walStatsCalculator {
113
return &walStatsCalculator{
114
w: w,
115
fromTime: math.MaxInt64,
116
statsLookup: make(map[chunks.HeadSeriesRef]*WALTargetStats),
117
hashInstances: make(map[uint64]int),
118
}
119
}
120
121
func (c *walStatsCalculator) Calculate() (WALStats, error) {
122
var (
123
stats WALStats
124
err error
125
)
126
127
_, checkpointIdx, err := wlog.LastCheckpoint(c.w.Dir())
128
if err != nil && err != record.ErrNotFound {
129
return stats, err
130
}
131
132
firstSegment, lastSegment, err := wlog.Segments(c.w.Dir())
133
if err != nil {
134
return stats, err
135
}
136
137
stats.FirstSegment = firstSegment
138
stats.LastSegment = lastSegment
139
stats.CheckpointNumber = checkpointIdx
140
141
// Iterate over the WAL and collect stats. This must be done before the rest
142
// of the function as readWAL populates internal state used for calculating
143
// stats.
144
err = walIterate(c.w, c.readWAL)
145
if err != nil {
146
return stats, err
147
}
148
149
// Fill in the rest of the stats
150
stats.From = timestamp.Time(c.fromTime)
151
stats.To = timestamp.Time(c.toTime)
152
stats.InvalidRefs = c.invalidRefs
153
154
for _, hashCount := range c.hashInstances {
155
if hashCount > 1 {
156
stats.HashCollisions++
157
}
158
}
159
160
for _, tgt := range c.stats {
161
stats.Targets = append(stats.Targets, *tgt)
162
}
163
164
return stats, nil
165
}
166
167
func (c *walStatsCalculator) readWAL(r *wlog.Reader) error {
168
var dec record.Decoder
169
170
for r.Next() {
171
rec := r.Record()
172
173
// We ignore other record types here; we only write records and samples
174
// but we don't want to return an error for an unexpected record type;
175
// doing so would prevent users from getting stats on a traditional
176
// Prometheus WAL, which would be nice to support.
177
switch dec.Type(rec) {
178
case record.Series:
179
series, err := dec.Series(rec, nil)
180
if err != nil {
181
return err
182
}
183
for _, s := range series {
184
var (
185
jobLabel = s.Labels.Get("job")
186
instanceLabel = s.Labels.Get("instance")
187
)
188
189
// Find or create the WALTargetStats for this job/instance pair.
190
var stats *WALTargetStats
191
for _, wts := range c.stats {
192
if wts.Job == jobLabel && wts.Instance == instanceLabel {
193
stats = wts
194
break
195
}
196
}
197
if stats == nil {
198
stats = &WALTargetStats{Job: jobLabel, Instance: instanceLabel}
199
c.stats = append(c.stats, stats)
200
}
201
202
// Every time we get a new series, we want to increment the series
203
// count for the specific job/instance pair, store the ref ID so
204
// samples can modify the stats, and then store the hash of our
205
// labels to detect collisions (or flapping series).
206
stats.Series++
207
c.statsLookup[s.Ref] = stats
208
c.hashInstances[s.Labels.Hash()]++
209
}
210
case record.Samples:
211
samples, err := dec.Samples(rec, nil)
212
if err != nil {
213
return err
214
}
215
for _, s := range samples {
216
if s.T < c.fromTime {
217
c.fromTime = s.T
218
}
219
if s.T > c.toTime {
220
c.toTime = s.T
221
}
222
223
stats := c.statsLookup[s.Ref]
224
if stats == nil {
225
c.invalidRefs++
226
continue
227
}
228
stats.Samples++
229
}
230
}
231
}
232
233
return r.Err()
234
}
235
236
// BySeriesCount can sort a slice of target stats by the count of
237
// series. The slice is sorted in descending order.
238
type BySeriesCount []WALTargetStats
239
240
func (s BySeriesCount) Len() int { return len(s) }
241
func (s BySeriesCount) Less(i, j int) bool { return s[i].Series > s[j].Series }
242
func (s BySeriesCount) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
243
244