Path: blob/main/component/prometheus/relabel/relabel.go
4095 views
package relabel12import (3"context"4"fmt"5"sync"67"go.uber.org/atomic"89"github.com/prometheus/prometheus/storage"1011"github.com/grafana/agent/component"12flow_relabel "github.com/grafana/agent/component/common/relabel"13"github.com/grafana/agent/component/prometheus"14prometheus_client "github.com/prometheus/client_golang/prometheus"15"github.com/prometheus/prometheus/model/exemplar"16"github.com/prometheus/prometheus/model/labels"17"github.com/prometheus/prometheus/model/metadata"1819"github.com/prometheus/prometheus/model/relabel"20"github.com/prometheus/prometheus/model/value"21)2223func init() {24component.Register(component.Registration{25Name: "prometheus.relabel",26Args: Arguments{},27Exports: Exports{},28Build: func(opts component.Options, args component.Arguments) (component.Component, error) {29return New(opts, args.(Arguments))30},31})32}3334// Arguments holds values which are used to configure the prometheus.relabel35// component.36type Arguments struct {37// Where the relabelled metrics should be forwarded to.38ForwardTo []storage.Appendable `river:"forward_to,attr"`3940// The relabelling rules to apply to each metric before it's forwarded.41MetricRelabelConfigs []*flow_relabel.Config `river:"rule,block,optional"`42}4344// Exports holds values which are exported by the prometheus.relabel component.45type Exports struct {46Receiver storage.Appendable `river:"receiver,attr"`47Rules flow_relabel.Rules `river:"rules,attr"`48}4950// Component implements the prometheus.relabel component.51type Component struct {52mut sync.RWMutex53opts component.Options54mrc []*relabel.Config55receiver *prometheus.Interceptor56metricsProcessed prometheus_client.Counter57metricsOutgoing prometheus_client.Counter58cacheHits prometheus_client.Counter59cacheMisses prometheus_client.Counter60cacheSize prometheus_client.Gauge61fanout *prometheus.Fanout62exited atomic.Bool6364cacheMut sync.RWMutex65cache map[uint64]*labelAndID66}6768var (69_ component.Component = (*Component)(nil)70)7172// New creates a new prometheus.relabel component.73func New(o component.Options, args Arguments) (*Component, error) {74c := &Component{75opts: o,76cache: make(map[uint64]*labelAndID),77}78c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{79Name: "agent_prometheus_relabel_metrics_processed",80Help: "Total number of metrics processed",81})82c.metricsOutgoing = prometheus_client.NewCounter(prometheus_client.CounterOpts{83Name: "agent_prometheus_relabel_metrics_written",84Help: "Total number of metrics written",85})86c.cacheMisses = prometheus_client.NewCounter(prometheus_client.CounterOpts{87Name: "agent_prometheus_relabel_cache_misses",88Help: "Total number of cache misses",89})90c.cacheHits = prometheus_client.NewCounter(prometheus_client.CounterOpts{91Name: "agent_prometheus_relabel_cache_hits",92Help: "Total number of cache hits",93})94c.cacheSize = prometheus_client.NewGauge(prometheus_client.GaugeOpts{95Name: "agent_prometheus_relabel_cache_size",96Help: "Total size of relabel cache",97})9899var err error100for _, metric := range []prometheus_client.Collector{c.metricsProcessed, c.metricsOutgoing, c.cacheMisses, c.cacheHits, c.cacheSize} {101err = o.Registerer.Register(metric)102if err != nil {103return nil, err104}105}106107c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer)108c.receiver = prometheus.NewInterceptor(109c.fanout,110prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {111if c.exited.Load() {112return 0, fmt.Errorf("%s has exited", o.ID)113}114115newLbl := c.relabel(v, l)116if newLbl == nil {117return 0, nil118}119c.metricsOutgoing.Inc()120return next.Append(0, newLbl, t, v)121}),122prometheus.WithExemplarHook(func(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) {123if c.exited.Load() {124return 0, fmt.Errorf("%s has exited", o.ID)125}126127newLbl := c.relabel(0, l)128if newLbl == nil {129return 0, nil130}131return next.AppendExemplar(0, l, e)132}),133prometheus.WithMetadataHook(func(_ storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) {134if c.exited.Load() {135return 0, fmt.Errorf("%s has exited", o.ID)136}137138newLbl := c.relabel(0, l)139if newLbl == nil {140return 0, nil141}142return next.UpdateMetadata(0, l, m)143}),144)145146// Immediately export the receiver which remains the same for the component147// lifetime.148o.OnStateChange(Exports{Receiver: c.receiver, Rules: args.MetricRelabelConfigs})149150// Call to Update() to set the relabelling rules once at the start.151if err = c.Update(args); err != nil {152return nil, err153}154155return c, nil156}157158// Run implements component.Component.159func (c *Component) Run(ctx context.Context) error {160defer c.exited.Store(true)161162<-ctx.Done()163return nil164}165166// Update implements component.Component.167func (c *Component) Update(args component.Arguments) error {168c.mut.Lock()169defer c.mut.Unlock()170171newArgs := args.(Arguments)172c.clearCache()173c.mrc = flow_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs)174c.fanout.UpdateChildren(newArgs.ForwardTo)175176c.opts.OnStateChange(Exports{Receiver: c.receiver, Rules: newArgs.MetricRelabelConfigs})177178return nil179}180181func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {182c.mut.RLock()183defer c.mut.RUnlock()184185globalRef := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls)186var relabelled labels.Labels187newLbls, found := c.getFromCache(globalRef)188if found {189c.cacheHits.Inc()190// If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels191if newLbls != nil {192relabelled = newLbls.labels193}194} else {195// Relabel against a copy of the labels to prevent modifying the original196// slice.197relabelled, keep := relabel.Process(lbls.Copy(), c.mrc...)198c.cacheMisses.Inc()199c.cacheSize.Inc()200c.addToCache(globalRef, relabelled, keep)201}202203// If stale remove from the cache, the reason we don't exit early is so the stale value can propagate.204// TODO: (@mattdurham) This caching can leak and likely needs a timed eviction at some point, but this is simple.205// In the future the global ref cache may have some hooks to allow notification of when caches should be evicted.206if value.IsStaleNaN(val) {207c.cacheSize.Dec()208c.deleteFromCache(globalRef)209}210if relabelled == nil {211return nil212}213return relabelled214}215216func (c *Component) getFromCache(id uint64) (*labelAndID, bool) {217c.cacheMut.RLock()218defer c.cacheMut.RUnlock()219220fm, found := c.cache[id]221return fm, found222}223224func (c *Component) deleteFromCache(id uint64) {225c.cacheMut.Lock()226defer c.cacheMut.Unlock()227228delete(c.cache, id)229}230231func (c *Component) clearCache() {232c.cacheMut.Lock()233defer c.cacheMut.Unlock()234235c.cache = make(map[uint64]*labelAndID)236}237238func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) {239c.cacheMut.Lock()240defer c.cacheMut.Unlock()241242if !keep {243c.cache[originalID] = nil244return245}246newGlobal := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls)247c.cache[originalID] = &labelAndID{248labels: lbls,249id: newGlobal,250}251}252253// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have254// to be recalculated again.255type labelAndID struct {256labels labels.Labels257id uint64258}259260261