package relabel
import (
"context"
"reflect"
"sync"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/pkg/river"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
)
func init() {
component.Register(component.Registration{
Name: "loki.relabel",
Args: Arguments{},
Exports: Exports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}
type Arguments struct {
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
RelabelConfigs []*flow_relabel.Config `river:"rule,block,optional"`
MaxCacheSize int `river:"max_cache_size,attr,optional"`
}
var DefaultArguments = Arguments{
MaxCacheSize: 10_000,
}
var _ river.Unmarshaler = (*Arguments)(nil)
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*a = DefaultArguments
type arguments Arguments
return f((*arguments)(a))
}
type Exports struct {
Receiver loki.LogsReceiver `river:"receiver,attr"`
Rules flow_relabel.Rules `river:"rules,attr"`
}
type Component struct {
opts component.Options
metrics *metrics
mut sync.RWMutex
rcs []*relabel.Config
receiver loki.LogsReceiver
fanout []loki.LogsReceiver
cache *lru.Cache
maxCacheSize int
}
var (
_ component.Component = (*Component)(nil)
)
func New(o component.Options, args Arguments) (*Component, error) {
cache, err := lru.New(args.MaxCacheSize)
if err != nil {
return nil, err
}
c := &Component{
opts: o,
metrics: newMetrics(o.Registerer),
cache: cache,
maxCacheSize: args.MaxCacheSize,
}
c.receiver = make(loki.LogsReceiver)
o.OnStateChange(Exports{Receiver: c.receiver, Rules: args.RelabelConfigs})
if err := c.Update(args); err != nil {
return nil, err
}
return c, nil
}
func (c *Component) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case entry := <-c.receiver:
c.metrics.entriesProcessed.Inc()
lbls := c.relabel(entry)
if len(lbls) == 0 {
level.Debug(c.opts.Logger).Log("msg", "dropping entry after relabeling", "labels", entry.Labels.String())
continue
}
c.metrics.entriesOutgoing.Inc()
entry.Labels = lbls
for _, f := range c.fanout {
select {
case <-ctx.Done():
return nil
case f <- entry:
}
}
}
}
}
func (c *Component) Update(args component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()
newArgs := args.(Arguments)
newRCS := flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelConfigs)
if relabelingChanged(c.rcs, newRCS) {
level.Debug(c.opts.Logger).Log("msg", "received new relabel configs, purging cache")
c.cache.Purge()
c.metrics.cacheSize.Set(0)
}
if newArgs.MaxCacheSize != c.maxCacheSize {
evicted := c.cache.Resize(newArgs.MaxCacheSize)
if evicted > 0 {
level.Debug(c.opts.Logger).Log("msg", "resizing the cache lead to evicting of items", "len_items_evicted", evicted)
}
}
c.rcs = newRCS
c.fanout = newArgs.ForwardTo
c.opts.OnStateChange(Exports{Receiver: c.receiver, Rules: newArgs.RelabelConfigs})
return nil
}
func relabelingChanged(prev, next []*relabel.Config) bool {
if len(prev) != len(next) {
return true
}
for i := range prev {
if !reflect.DeepEqual(prev[i], next[i]) {
return true
}
}
return false
}
type cacheItem struct {
original model.LabelSet
relabeled model.LabelSet
}
func (c *Component) relabel(e loki.Entry) model.LabelSet {
hash := e.Labels.Fingerprint()
val, found := c.cache.Get(hash)
if found {
for _, ci := range val.([]cacheItem) {
if e.Labels.Equal(ci.original) {
c.metrics.cacheHits.Inc()
return ci.relabeled
}
}
}
c.metrics.cacheMisses.Inc()
relabeled := c.process(e)
if !found {
val = []cacheItem{{e.Labels, relabeled}}
} else {
val = append(val.([]cacheItem), cacheItem{e.Labels, relabeled})
}
c.cache.Add(hash, val)
c.metrics.cacheSize.Set(float64(c.cache.Len()))
return relabeled
}
func (c *Component) process(e loki.Entry) model.LabelSet {
var lbls labels.Labels
for k, v := range e.Labels {
lbls = append(lbls, labels.Label{
Name: string(k),
Value: string(v),
})
}
lbls, _ = relabel.Process(lbls, c.rcs...)
relabeled := make(model.LabelSet, len(lbls))
for i := range lbls {
relabeled[model.LabelName(lbls[i].Name)] = model.LabelValue(lbls[i].Value)
}
return relabeled
}