Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/wal/series.go
4096 views
1
package wal
2
3
// This code is copied from
4
// prometheus/prometheus@7c2de14b0bd74303c2ca6f932b71d4585a29ca75, with only
5
// minor changes for metric names.
6
7
import (
8
"sync"
9
10
"github.com/prometheus/prometheus/model/exemplar"
11
"github.com/prometheus/prometheus/model/labels"
12
"github.com/prometheus/prometheus/tsdb/chunks"
13
)
14
15
// memSeries is a chunkless version of tsdb.memSeries.
type memSeries struct {
	// Embedded so callers can lock the series directly; guards lastTs
	// (see updateTimestamp and stripeSeries.gc).
	sync.Mutex

	// ref is the stable series identifier used to index stripeSeries maps.
	ref chunks.HeadSeriesRef
	// lset is the full label set identifying this series.
	lset labels.Labels

	// Last recorded timestamp. Used by gc to determine if a series is stale.
	lastTs int64
}
25
26
// updateTimestamp obtains the lock on s and will attempt to update lastTs.
27
// fails if newTs < lastTs.
28
func (m *memSeries) updateTimestamp(newTs int64) bool {
29
m.Lock()
30
defer m.Unlock()
31
if newTs >= m.lastTs {
32
m.lastTs = newTs
33
return true
34
}
35
return false
36
}
37
38
// seriesHashmap is a simple hashmap for memSeries by their label set.
// It is built on top of a regular hashmap and holds a slice of series to
// resolve hash collisions. Its methods require the hash to be submitted
// with the label set to avoid re-computing hash throughout the code.
type seriesHashmap map[uint64][]*memSeries
43
44
// Get returns the series stored under hash whose label set equals lset,
// or nil when no matching series exists.
func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
	collisions := m[hash]
	for _, series := range collisions {
		if labels.Equal(series.lset, lset) {
			return series
		}
	}
	return nil
}
52
53
// Set stores s under hash. An existing entry with an equal label set is
// replaced in place; otherwise s is appended to the collision slice.
func (m seriesHashmap) Set(hash uint64, s *memSeries) {
	bucket := m[hash]
	for idx := range bucket {
		if labels.Equal(bucket[idx].lset, s.lset) {
			bucket[idx] = s
			return
		}
	}
	m[hash] = append(bucket, s)
}
63
64
// Delete removes every series with the given ref stored under hash,
// dropping the bucket entirely once it becomes empty.
func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
	// Build a fresh slice instead of filtering in place: a caller (gc)
	// may still be ranging over the old backing array for this bucket.
	var kept []*memSeries
	for _, series := range m[hash] {
		if series.ref != ref {
			kept = append(kept, series)
		}
	}
	if len(kept) > 0 {
		m[hash] = kept
	} else {
		delete(m, hash)
	}
}
77
78
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock
// contention. The locks are padded to not be on the same cache line.
// Filling the padded space with the maps was profiled to be slower -
// likely due to the additional pointer dereferences.
type stripeSeries struct {
	// size is the stripe count; the index masks used throughout
	// (x & (size - 1)) assume it is a power of two.
	size int
	// series maps ref -> series, sharded by ref.
	series []map[chunks.HeadSeriesRef]*memSeries
	// hashes maps label hash -> colliding series, sharded by hash.
	hashes []seriesHashmap
	// exemplars holds the latest exemplar per ref, sharded by ref.
	exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar
	// locks[i] guards index i of each of the slices above.
	locks []stripeLock

	// gcMut serializes gc, the only operation allowed to hold two stripe
	// locks at once (see the NOTE in gc).
	gcMut sync.Mutex
}
91
92
// stripeLock is a padded RWMutex; one instance guards each stripe of
// stripeSeries.
type stripeLock struct {
	sync.RWMutex
	// Padding to avoid multiple locks being on the same cache line.
	_ [40]byte
}
97
98
func newStripeSeries(stripeSize int) *stripeSeries {
99
s := &stripeSeries{
100
size: stripeSize,
101
series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize),
102
hashes: make([]seriesHashmap, stripeSize),
103
exemplars: make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize),
104
locks: make([]stripeLock, stripeSize),
105
}
106
for i := range s.series {
107
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
108
}
109
for i := range s.hashes {
110
s.hashes[i] = seriesHashmap{}
111
}
112
for i := range s.exemplars {
113
s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{}
114
}
115
return s
116
}
117
118
// gc garbage collects old chunks that are strictly before mint and removes
// series entirely that have no chunks left. It returns the set of refs that
// were deleted so the caller can propagate the removals.
func (s *stripeSeries) gc(mint int64) map[chunks.HeadSeriesRef]struct{} {
	// NOTE(rfratto): GC will grab two locks, one for the hash and the other for
	// series. It's not valid for any other function to grab both locks,
	// otherwise a deadlock might occur when running GC in parallel with
	// appending.
	s.gcMut.Lock()
	defer s.gcMut.Unlock()

	deleted := map[chunks.HeadSeriesRef]struct{}{}
	for hashLock := 0; hashLock < s.size; hashLock++ {
		s.locks[hashLock].Lock()

		for hash, all := range s.hashes[hashLock] {
			for _, series := range all {
				series.Lock()

				// Any series that has received a write since mint is still alive.
				if series.lastTs >= mint {
					series.Unlock()
					continue
				}

				// The series is stale. We need to obtain a second lock for the
				// ref if it's different than the hash lock.
				refLock := int(series.ref) & (s.size - 1)
				if hashLock != refLock {
					s.locks[refLock].Lock()
				}

				deleted[series.ref] = struct{}{}
				delete(s.series[refLock], series.ref)
				// Safe while ranging `all`: Delete builds a new slice rather
				// than mutating the one we are iterating.
				s.hashes[hashLock].Delete(hash, series.ref)

				// Since the series is gone, we'll also delete
				// the latest stored exemplar.
				delete(s.exemplars[refLock], series.ref)

				if hashLock != refLock {
					s.locks[refLock].Unlock()
				}
				series.Unlock()
			}
		}

		s.locks[hashLock].Unlock()
	}

	return deleted
}
169
170
// GetByID returns the series for the given ref, or nil if it is unknown.
func (s *stripeSeries) GetByID(id chunks.HeadSeriesRef) *memSeries {
	shard := uint64(id) & uint64(s.size-1)

	s.locks[shard].RLock()
	series := s.series[shard][id]
	s.locks[shard].RUnlock()

	return series
}
176
177
// GetByHash returns the series whose labels hash to hash and equal lset,
// or nil when no such series is tracked.
func (s *stripeSeries) GetByHash(hash uint64, lset labels.Labels) *memSeries {
	shard := hash & uint64(s.size-1)

	s.locks[shard].RLock()
	defer s.locks[shard].RUnlock()

	return s.hashes[shard].Get(hash, lset)
}
184
185
func (s *stripeSeries) Set(hash uint64, series *memSeries) {
186
var (
187
hashLock = hash & uint64(s.size-1)
188
refLock = uint64(series.ref) & uint64(s.size-1)
189
)
190
191
// We can't hold both locks at once otherwise we might deadlock with a
192
// simultaneous call to GC.
193
//
194
// We update s.series first because GC expects anything in s.hashes to
195
// already exist in s.series.
196
s.locks[refLock].Lock()
197
s.series[refLock][series.ref] = series
198
s.locks[refLock].Unlock()
199
200
s.locks[hashLock].Lock()
201
s.hashes[hashLock].Set(hash, series)
202
s.locks[hashLock].Unlock()
203
}
204
205
// GetLatestExemplar returns the most recently stored exemplar for ref, or
// nil when none has been recorded.
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
	i := uint64(ref) & uint64(s.size-1)

	s.locks[i].RLock()
	// Named e (not exemplar) so the local does not shadow the imported
	// exemplar package.
	e := s.exemplars[i][ref]
	s.locks[i].RUnlock()

	return e
}
214
215
// SetLatestExemplar records e as the latest exemplar for ref. The write is
// silently dropped when ref is not a known series, so exemplars cannot be
// stored for series that were never added or were already gc'd.
//
// The parameter is named e (not exemplar) so it does not shadow the
// imported exemplar package.
func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, e *exemplar.Exemplar) {
	i := uint64(ref) & uint64(s.size-1)

	// Make sure that's a valid series id and record its latest exemplar.
	s.locks[i].Lock()
	if s.series[i][ref] != nil {
		s.exemplars[i][ref] = e
	}
	s.locks[i].Unlock()
}
225
226
// iterator returns a stripeSeriesIterator over every series held by s.
func (s *stripeSeries) iterator() *stripeSeriesIterator {
	it := &stripeSeriesIterator{s: s}
	return it
}
229
230
// stripeSeriesIterator allows to iterate over series through a channel.
// The channel should always be completely consumed to not leak.
type stripeSeriesIterator struct {
	// s is the stripeSeries being iterated over.
	s *stripeSeries
}
235
236
// Channel starts a goroutine that sends every tracked series over the
// returned unbuffered channel and then closes it. The consumer must drain
// the channel fully or the goroutine (and the stripe lock it holds) leaks.
//
// NOTE(review): each series is sent while its own Mutex and the relevant
// stripe read-locks are held, so the receiver must not call back into
// stripeSeries or lock the series while handling a value — doing so could
// deadlock. Confirm callers respect this.
func (it *stripeSeriesIterator) Channel() <-chan *memSeries {
	ret := make(chan *memSeries)

	go func() {
		for i := 0; i < it.s.size; i++ {
			it.s.locks[i].RLock()

			for _, series := range it.s.series[i] {
				series.Lock()

				// Also hold the read lock for the stripe of the label hash
				// (when different from the ref stripe) for the duration of
				// the send.
				j := int(series.lset.Hash()) & (it.s.size - 1)
				if i != j {
					it.s.locks[j].RLock()
				}

				ret <- series

				if i != j {
					it.s.locks[j].RUnlock()
				}
				series.Unlock()
			}

			it.s.locks[i].RUnlock()
		}

		close(ret)
	}()

	return ret
}
267
268