Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/relabel/relabel.go
4095 views
1
package relabel
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
8
"go.uber.org/atomic"
9
10
"github.com/prometheus/prometheus/storage"
11
12
"github.com/grafana/agent/component"
13
flow_relabel "github.com/grafana/agent/component/common/relabel"
14
"github.com/grafana/agent/component/prometheus"
15
prometheus_client "github.com/prometheus/client_golang/prometheus"
16
"github.com/prometheus/prometheus/model/exemplar"
17
"github.com/prometheus/prometheus/model/labels"
18
"github.com/prometheus/prometheus/model/metadata"
19
20
"github.com/prometheus/prometheus/model/relabel"
21
"github.com/prometheus/prometheus/model/value"
22
)
23
24
func init() {
25
component.Register(component.Registration{
26
Name: "prometheus.relabel",
27
Args: Arguments{},
28
Exports: Exports{},
29
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
30
return New(opts, args.(Arguments))
31
},
32
})
33
}
34
35
// Arguments holds values which are used to configure the prometheus.relabel
36
// component.
37
type Arguments struct {
38
// Where the relabelled metrics should be forwarded to.
39
ForwardTo []storage.Appendable `river:"forward_to,attr"`
40
41
// The relabelling rules to apply to each metric before it's forwarded.
42
MetricRelabelConfigs []*flow_relabel.Config `river:"rule,block,optional"`
43
}
44
45
// Exports holds values which are exported by the prometheus.relabel component.
46
type Exports struct {
47
Receiver storage.Appendable `river:"receiver,attr"`
48
Rules flow_relabel.Rules `river:"rules,attr"`
49
}
50
51
// Component implements the prometheus.relabel component.
52
type Component struct {
53
mut sync.RWMutex
54
opts component.Options
55
mrc []*relabel.Config
56
receiver *prometheus.Interceptor
57
metricsProcessed prometheus_client.Counter
58
metricsOutgoing prometheus_client.Counter
59
cacheHits prometheus_client.Counter
60
cacheMisses prometheus_client.Counter
61
cacheSize prometheus_client.Gauge
62
fanout *prometheus.Fanout
63
exited atomic.Bool
64
65
cacheMut sync.RWMutex
66
cache map[uint64]*labelAndID
67
}
68
69
var (
70
_ component.Component = (*Component)(nil)
71
)
72
73
// New creates a new prometheus.relabel component.
74
func New(o component.Options, args Arguments) (*Component, error) {
75
c := &Component{
76
opts: o,
77
cache: make(map[uint64]*labelAndID),
78
}
79
c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{
80
Name: "agent_prometheus_relabel_metrics_processed",
81
Help: "Total number of metrics processed",
82
})
83
c.metricsOutgoing = prometheus_client.NewCounter(prometheus_client.CounterOpts{
84
Name: "agent_prometheus_relabel_metrics_written",
85
Help: "Total number of metrics written",
86
})
87
c.cacheMisses = prometheus_client.NewCounter(prometheus_client.CounterOpts{
88
Name: "agent_prometheus_relabel_cache_misses",
89
Help: "Total number of cache misses",
90
})
91
c.cacheHits = prometheus_client.NewCounter(prometheus_client.CounterOpts{
92
Name: "agent_prometheus_relabel_cache_hits",
93
Help: "Total number of cache hits",
94
})
95
c.cacheSize = prometheus_client.NewGauge(prometheus_client.GaugeOpts{
96
Name: "agent_prometheus_relabel_cache_size",
97
Help: "Total size of relabel cache",
98
})
99
100
var err error
101
for _, metric := range []prometheus_client.Collector{c.metricsProcessed, c.metricsOutgoing, c.cacheMisses, c.cacheHits, c.cacheSize} {
102
err = o.Registerer.Register(metric)
103
if err != nil {
104
return nil, err
105
}
106
}
107
108
c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer)
109
c.receiver = prometheus.NewInterceptor(
110
c.fanout,
111
prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
112
if c.exited.Load() {
113
return 0, fmt.Errorf("%s has exited", o.ID)
114
}
115
116
newLbl := c.relabel(v, l)
117
if newLbl == nil {
118
return 0, nil
119
}
120
c.metricsOutgoing.Inc()
121
return next.Append(0, newLbl, t, v)
122
}),
123
prometheus.WithExemplarHook(func(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {
124
if c.exited.Load() {
125
return 0, fmt.Errorf("%s has exited", o.ID)
126
}
127
128
newLbl := c.relabel(0, l)
129
if newLbl == nil {
130
return 0, nil
131
}
132
return next.AppendExemplar(0, l, e)
133
}),
134
prometheus.WithMetadataHook(func(_ storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {
135
if c.exited.Load() {
136
return 0, fmt.Errorf("%s has exited", o.ID)
137
}
138
139
newLbl := c.relabel(0, l)
140
if newLbl == nil {
141
return 0, nil
142
}
143
return next.UpdateMetadata(0, l, m)
144
}),
145
)
146
147
// Immediately export the receiver which remains the same for the component
148
// lifetime.
149
o.OnStateChange(Exports{Receiver: c.receiver, Rules: args.MetricRelabelConfigs})
150
151
// Call to Update() to set the relabelling rules once at the start.
152
if err = c.Update(args); err != nil {
153
return nil, err
154
}
155
156
return c, nil
157
}
158
159
// Run implements component.Component.
160
func (c *Component) Run(ctx context.Context) error {
161
defer c.exited.Store(true)
162
163
<-ctx.Done()
164
return nil
165
}
166
167
// Update implements component.Component.
168
func (c *Component) Update(args component.Arguments) error {
169
c.mut.Lock()
170
defer c.mut.Unlock()
171
172
newArgs := args.(Arguments)
173
c.clearCache()
174
c.mrc = flow_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs)
175
c.fanout.UpdateChildren(newArgs.ForwardTo)
176
177
c.opts.OnStateChange(Exports{Receiver: c.receiver, Rules: newArgs.MetricRelabelConfigs})
178
179
return nil
180
}
181
182
func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
183
c.mut.RLock()
184
defer c.mut.RUnlock()
185
186
globalRef := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls)
187
var relabelled labels.Labels
188
newLbls, found := c.getFromCache(globalRef)
189
if found {
190
c.cacheHits.Inc()
191
// If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels
192
if newLbls != nil {
193
relabelled = newLbls.labels
194
}
195
} else {
196
// Relabel against a copy of the labels to prevent modifying the original
197
// slice.
198
relabelled, keep := relabel.Process(lbls.Copy(), c.mrc...)
199
c.cacheMisses.Inc()
200
c.cacheSize.Inc()
201
c.addToCache(globalRef, relabelled, keep)
202
}
203
204
// If stale remove from the cache, the reason we don't exit early is so the stale value can propagate.
205
// TODO: (@mattdurham) This caching can leak and likely needs a timed eviction at some point, but this is simple.
206
// In the future the global ref cache may have some hooks to allow notification of when caches should be evicted.
207
if value.IsStaleNaN(val) {
208
c.cacheSize.Dec()
209
c.deleteFromCache(globalRef)
210
}
211
if relabelled == nil {
212
return nil
213
}
214
return relabelled
215
}
216
217
func (c *Component) getFromCache(id uint64) (*labelAndID, bool) {
218
c.cacheMut.RLock()
219
defer c.cacheMut.RUnlock()
220
221
fm, found := c.cache[id]
222
return fm, found
223
}
224
225
func (c *Component) deleteFromCache(id uint64) {
226
c.cacheMut.Lock()
227
defer c.cacheMut.Unlock()
228
229
delete(c.cache, id)
230
}
231
232
func (c *Component) clearCache() {
233
c.cacheMut.Lock()
234
defer c.cacheMut.Unlock()
235
236
c.cache = make(map[uint64]*labelAndID)
237
}
238
239
func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) {
240
c.cacheMut.Lock()
241
defer c.cacheMut.Unlock()
242
243
if !keep {
244
c.cache[originalID] = nil
245
return
246
}
247
newGlobal := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls)
248
c.cache[originalID] = &labelAndID{
249
labels: lbls,
250
id: newGlobal,
251
}
252
}
253
254
// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have
255
// to be recalculated again.
256
type labelAndID struct {
257
labels labels.Labels
258
id uint64
259
}
260
261