Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/agentctl/samples.go
4094 views
1
package agentctl
2
3
import (
4
"fmt"
5
"time"
6
7
"github.com/prometheus/prometheus/model/labels"
8
"github.com/prometheus/prometheus/model/timestamp"
9
"github.com/prometheus/prometheus/promql/parser"
10
"github.com/prometheus/prometheus/tsdb/chunks"
11
"github.com/prometheus/prometheus/tsdb/record"
12
"github.com/prometheus/prometheus/tsdb/wlog"
13
)
14
15
// SampleStats are statistics for samples for a series within the WAL. Each
16
// instance represents a unique series based on its labels, and holds the range
17
// of timestamps found for all samples including the total number of samples
18
// for that series.
19
type SampleStats struct {
20
Labels labels.Labels
21
From time.Time
22
To time.Time
23
Samples int64
24
}
25
26
// FindSamples searches the WAL and returns a summary of samples of series
27
// matching the given label selector.
28
func FindSamples(walDir string, selectorStr string) ([]*SampleStats, error) {
29
w, err := wlog.Open(nil, walDir)
30
if err != nil {
31
return nil, err
32
}
33
defer w.Close()
34
35
selector, err := parser.ParseMetricSelector(selectorStr)
36
if err != nil {
37
return nil, err
38
}
39
40
var (
41
labelsByRef = make(map[chunks.HeadSeriesRef]labels.Labels)
42
43
minTSByRef = make(map[chunks.HeadSeriesRef]int64)
44
maxTSByRef = make(map[chunks.HeadSeriesRef]int64)
45
sampleCountByRef = make(map[chunks.HeadSeriesRef]int64)
46
)
47
48
// get the references matching label selector
49
err = walIterate(w, func(r *wlog.Reader) error {
50
return collectSeries(r, selector, labelsByRef)
51
})
52
if err != nil {
53
return nil, fmt.Errorf("could not collect series: %w", err)
54
}
55
56
// find related samples
57
err = walIterate(w, func(r *wlog.Reader) error {
58
return collectSamples(r, labelsByRef, minTSByRef, maxTSByRef, sampleCountByRef)
59
})
60
if err != nil {
61
return nil, fmt.Errorf("could not collect samples: %w", err)
62
}
63
64
series := make([]*SampleStats, 0, len(labelsByRef))
65
for ref, labels := range labelsByRef {
66
series = append(series, &SampleStats{
67
Labels: labels,
68
Samples: sampleCountByRef[ref],
69
From: timestamp.Time(minTSByRef[ref]),
70
To: timestamp.Time(maxTSByRef[ref]),
71
})
72
}
73
74
return series, nil
75
}
76
77
func collectSeries(r *wlog.Reader, selector labels.Selector, labelsByRef map[chunks.HeadSeriesRef]labels.Labels) error {
78
var dec record.Decoder
79
80
for r.Next() {
81
rec := r.Record()
82
83
switch dec.Type(rec) {
84
case record.Series:
85
series, err := dec.Series(rec, nil)
86
if err != nil {
87
return err
88
}
89
for _, s := range series {
90
if selector.Matches(s.Labels) {
91
labelsByRef[s.Ref] = s.Labels.Copy()
92
}
93
}
94
}
95
}
96
97
return r.Err()
98
}
99
100
func collectSamples(r *wlog.Reader, labelsByRef map[chunks.HeadSeriesRef]labels.Labels, minTS, maxTS, sampleCount map[chunks.HeadSeriesRef]int64) error {
101
var dec record.Decoder
102
103
for r.Next() {
104
rec := r.Record()
105
106
switch dec.Type(rec) {
107
case record.Samples:
108
samples, err := dec.Samples(rec, nil)
109
if err != nil {
110
return err
111
}
112
113
for _, s := range samples {
114
// skip unmatched series
115
if _, ok := labelsByRef[s.Ref]; !ok {
116
continue
117
}
118
119
// determine min/max TS
120
if ts, ok := minTS[s.Ref]; !ok || ts > s.T {
121
minTS[s.Ref] = s.T
122
}
123
if ts, ok := maxTS[s.Ref]; !ok || ts < s.T {
124
maxTS[s.Ref] = s.T
125
}
126
127
sampleCount[s.Ref]++
128
}
129
}
130
}
131
132
return r.Err()
133
}
134
135