Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/globalrefmap.go
4093 views
1
package prometheus
2
3
import (
4
"sync"
5
"time"
6
7
"github.com/prometheus/prometheus/model/labels"
8
)
9
10
// GlobalRefMapping is used when translating to and from remote writes and the rest of the system (mostly scrapers)
11
// normal components except those should in general NOT need this.
12
var GlobalRefMapping = &GlobalRefMap{}
13
14
func init() {
15
GlobalRefMapping = newGlobalRefMap()
16
}
17
18
// staleDuration determines how often we should wait after a stale value is received to GC that value
19
var staleDuration = time.Minute * 10
20
21
// GlobalRefMap allows conversion from remote_write refids to global refs ids that everything else can use
22
type GlobalRefMap struct {
23
mut sync.Mutex
24
globalRefID uint64
25
mappings map[string]*remoteWriteMapping
26
labelsHashToGlobal map[uint64]uint64
27
staleGlobals map[uint64]*staleMarker
28
}
29
30
type staleMarker struct {
31
globalID uint64
32
lastMarkedStale time.Time
33
labelHash uint64
34
}
35
36
// newGlobalRefMap creates a refmap for usage, there should ONLY be one of these
37
func newGlobalRefMap() *GlobalRefMap {
38
return &GlobalRefMap{
39
globalRefID: 0,
40
mappings: make(map[string]*remoteWriteMapping),
41
labelsHashToGlobal: make(map[uint64]uint64),
42
staleGlobals: make(map[uint64]*staleMarker),
43
}
44
}
45
46
// GetOrAddLink is called by a remote_write endpoint component to add mapping and get back the global id.
47
func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 {
48
g.mut.Lock()
49
defer g.mut.Unlock()
50
51
// If the mapping doesn't exist then we need to create it
52
m, found := g.mappings[componentID]
53
if !found {
54
m = &remoteWriteMapping{
55
RemoteWriteID: componentID,
56
localToGlobal: make(map[uint64]uint64),
57
globalToLocal: make(map[uint64]uint64),
58
}
59
g.mappings[componentID] = m
60
}
61
62
labelHash := lbls.Hash()
63
globalID, found := g.labelsHashToGlobal[labelHash]
64
if found {
65
m.localToGlobal[localRefID] = globalID
66
m.globalToLocal[globalID] = localRefID
67
return globalID
68
}
69
// We have a value we have never seen before so increment the globalrefid and assign
70
g.globalRefID++
71
g.labelsHashToGlobal[labelHash] = g.globalRefID
72
m.localToGlobal[localRefID] = g.globalRefID
73
m.globalToLocal[g.globalRefID] = localRefID
74
return g.globalRefID
75
}
76
77
// GetOrAddGlobalRefID is used to create a global refid for a labelset
78
func (g *GlobalRefMap) GetOrAddGlobalRefID(l labels.Labels) uint64 {
79
g.mut.Lock()
80
defer g.mut.Unlock()
81
82
// Guard against bad input.
83
if l == nil {
84
return 0
85
}
86
87
labelHash := l.Hash()
88
globalID, found := g.labelsHashToGlobal[labelHash]
89
if found {
90
return globalID
91
}
92
g.globalRefID++
93
g.labelsHashToGlobal[labelHash] = g.globalRefID
94
return g.globalRefID
95
}
96
97
// GetGlobalRefID returns the global refid for a component local combo, or 0 if not found
98
func (g *GlobalRefMap) GetGlobalRefID(componentID string, localRefID uint64) uint64 {
99
g.mut.Lock()
100
defer g.mut.Unlock()
101
102
m, found := g.mappings[componentID]
103
if !found {
104
return 0
105
}
106
global := m.localToGlobal[localRefID]
107
return global
108
}
109
110
// GetLocalRefID returns the local refid for a component global combo, or 0 if not found
111
func (g *GlobalRefMap) GetLocalRefID(componentID string, globalRefID uint64) uint64 {
112
g.mut.Lock()
113
defer g.mut.Unlock()
114
115
m, found := g.mappings[componentID]
116
if !found {
117
return 0
118
}
119
local := m.globalToLocal[globalRefID]
120
return local
121
}
122
123
// AddStaleMarker adds a stale marker
124
func (g *GlobalRefMap) AddStaleMarker(globalRefID uint64, l labels.Labels) {
125
g.mut.Lock()
126
defer g.mut.Unlock()
127
128
g.staleGlobals[globalRefID] = &staleMarker{
129
lastMarkedStale: time.Now(),
130
labelHash: l.Hash(),
131
globalID: globalRefID,
132
}
133
}
134
135
// RemoveStaleMarker removes a stale marker
136
func (g *GlobalRefMap) RemoveStaleMarker(globalRefID uint64) {
137
g.mut.Lock()
138
defer g.mut.Unlock()
139
140
delete(g.staleGlobals, globalRefID)
141
}
142
143
// CheckStaleMarkers is called to garbage collect and items that have grown stale over stale duration (10m)
144
func (g *GlobalRefMap) CheckStaleMarkers() {
145
g.mut.Lock()
146
defer g.mut.Unlock()
147
148
curr := time.Now()
149
idsToBeGCed := make([]*staleMarker, 0)
150
for _, stale := range g.staleGlobals {
151
// If the difference between now and the last time the stale was marked doesn't exceed stale then let it stay
152
if curr.Sub(stale.lastMarkedStale) < staleDuration {
153
continue
154
}
155
idsToBeGCed = append(idsToBeGCed, stale)
156
}
157
for _, marker := range idsToBeGCed {
158
delete(g.staleGlobals, marker.globalID)
159
delete(g.labelsHashToGlobal, marker.labelHash)
160
// Delete our mapping keys
161
for _, mapping := range g.mappings {
162
mapping.deleteStaleIDs(marker.globalID)
163
}
164
}
165
}
166
167