Path: blob/main/component/loki/relabel/relabel_test.go
4096 views
package relabel

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/common/loki"
	flow_relabel "github.com/grafana/agent/component/common/relabel"
	"github.com/grafana/agent/pkg/flow/componenttest"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"

	"github.com/stretchr/testify/require"
)

// rc holds the River relabel rules shared by the tests below: copy every
// kubernetes_(.*) label to a label named after the captured group (that is,
// without the kubernetes_ prefix), drop the original kubernetes_* labels, then
// set the `environment` label to the value of the namespace label.
var rc = `rule {
regex = "kubernetes_(.*)"
replacement = "$1"
action = "labelmap"
}
rule {
regex = "kubernetes_(.*)"
action = "labeldrop"
}
rule {
source_labels = ["namespace"]
target_label = "environment"
action = "replace"
}`

// parseTestRules decodes the River rules in rc into relabel configs and fails
// tb immediately if they do not parse.
//
// The rules are unmarshaled into a custom struct rather than Arguments, as we
// don't have an easy way to refer to a loki.LogsReceiver value for the
// forward_to argument.
func parseTestRules(tb testing.TB) []*flow_relabel.Config {
	tb.Helper()

	type cfg struct {
		Rcs []*flow_relabel.Config `river:"rule,block,optional"`
	}
	var parsed cfg
	require.NoError(tb, river.Unmarshal([]byte(rc), &parsed))
	return parsed.Rcs
}

// TestRelabeling sends one log entry through the component and checks that it
// is forwarded to both receivers with the rules from rc applied to its labels.
func TestRelabeling(t *testing.T) {
	ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)

	// Create and run the component, so that it relabels and forwards logs.
	opts := component.Options{
		Logger:        util.TestFlowLogger(t),
		Registerer:    prometheus.NewRegistry(),
		OnStateChange: func(e component.Exports) {},
	}
	args := Arguments{
		ForwardTo:      []loki.LogsReceiver{ch1, ch2},
		RelabelConfigs: parseTestRules(t),
		MaxCacheSize:   10,
	}

	c, err := New(opts, args)
	require.NoError(t, err)

	// Cancel the context when the test ends so Run is asked to stop instead
	// of being left running for the rest of the test binary's lifetime.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go c.Run(ctx)

	// Send a log entry to the component's receiver.
	logEntry := loki.Entry{
		Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "kubernetes_namespace": "dev", "kubernetes_pod_name": "agent", "foo": "bar"},
		Entry: logproto.Entry{
			Timestamp: time.Now(),
			Line:      "very important log",
		},
	}

	c.receiver <- logEntry

	wantLabelSet := model.LabelSet{
		"filename":    "/var/log/pods/agent/agent/1.log",
		"namespace":   "dev",
		"pod_name":    "agent",
		"environment": "dev",
		"foo":         "bar",
	}

	// checkEntry runs on the test goroutine (it is only called from the
	// select below), so using require here is safe.
	checkEntry := func(got loki.Entry) {
		require.WithinDuration(t, time.Now(), got.Timestamp, 1*time.Second)
		require.Equal(t, "very important log", got.Line)
		require.Equal(t, wantLabelSet, got.Labels)
	}

	// The log entry should be received in both channels, with the relabeling
	// rules correctly applied.
	for i := 0; i < 2; i++ {
		select {
		case got := <-ch1:
			checkEntry(got)
		case got := <-ch2:
			checkEntry(got)
		case <-time.After(5 * time.Second):
			require.FailNow(t, "failed waiting for log line")
		}
	}
}

// BenchmarkRelabelComponent measures pushing b.N entries through the
// component. Every entry carries a distinct kubernetes_pod_name value, so each
// iteration presents a label set that has not been sent before.
func BenchmarkRelabelComponent(b *testing.B) {
	ch1 := make(loki.LogsReceiver)

	// Create and run the component, so that it relabels and forwards logs.
	opts := component.Options{
		Logger:        util.TestFlowLogger(b),
		Registerer:    prometheus.NewRegistry(),
		OnStateChange: func(e component.Exports) {},
	}
	args := Arguments{
		ForwardTo:      []loki.LogsReceiver{ch1},
		RelabelConfigs: parseTestRules(b),
		MaxCacheSize:   500_000,
	}

	c, err := New(opts, args)
	require.NoError(b, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go c.Run(ctx)

	// Drain the forwarded entries. The goroutine stops on cancellation and
	// signals through drained, so the final read of entry below is ordered
	// after its last write.
	var entry loki.Entry
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		for {
			select {
			case <-ctx.Done():
				return
			case e := <-ch1:
				entry = e
			}
		}
	}()

	now := time.Now()

	// Keep component construction and goroutine start-up out of the timing.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		c.receiver <- loki.Entry{
			// NOTE(review): the %d in the filename value is never formatted,
			// so the label carries the literal text; left as is to keep the
			// benchmark workload unchanged.
			Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/%d.log", "kubernetes_namespace": "dev", "kubernetes_pod_name": model.LabelValue(fmt.Sprintf("agent-%d", i)), "foo": "bar"},
			Entry: logproto.Entry{
				Timestamp: now,
				Line:      "very important log",
			},
		}
	}
	b.StopTimer()

	cancel()
	<-drained
	_ = entry
}

// TestCache inspects the component's cache after sending entries through it:
// new label sets are stored under their fingerprint, a repeated label set
// leaves the stored value unchanged, colliding fingerprints share one key, and
// the cache stays at MaxCacheSize keys once more label sets arrive.
func TestCache(t *testing.T) {
	ch1 := make(loki.LogsReceiver)

	// Create and run the component, so that it relabels and forwards logs.
	opts := component.Options{
		Logger:        util.TestFlowLogger(t),
		Registerer:    prometheus.NewRegistry(),
		OnStateChange: func(e component.Exports) {},
	}
	args := Arguments{
		ForwardTo: []loki.LogsReceiver{ch1},
		RelabelConfigs: []*flow_relabel.Config{
			{
				SourceLabels: []string{"name", "A"},
				Regex:        flow_relabel.Regexp(relabel.MustNewRegexp("(.+)")),

				Action:      "replace",
				TargetLabel: "env",
				Replacement: "staging",
			}},
		MaxCacheSize: 4,
	}

	c, err := New(opts, args)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go c.Run(ctx)

	// Drain forwarded entries so the component is never blocked on ch1.
	// This runs off the test goroutine, so it reports with t.Errorf rather
	// than require (which calls t.FailNow), and it exits on cancellation.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case got := <-ch1:
				if got.Line != "very important log" {
					t.Errorf("unexpected forwarded line: got %q, want %q", got.Line, "very important log")
				}
			}
		}
	}()

	e := getEntry()

	lsets := []model.LabelSet{
		{"name": "foo"},
		{"name": "bar"},
		{"name": "baz"},
		{"name": "qux"},
		{"name": "xyz"},
	}
	rlsets := []model.LabelSet{
		{"env": "staging", "name": "foo"},
		{"env": "staging", "name": "bar"},
		{"env": "staging", "name": "baz"},
		{"env": "staging", "name": "qux"},
		{"env": "staging", "name": "xyz"},
	}
	// Send three entries with different label sets along the receiver.
	e.Labels = lsets[0]
	c.receiver <- e
	e.Labels = lsets[1]
	c.receiver <- e
	e.Labels = lsets[2]
	c.receiver <- e

	time.Sleep(100 * time.Millisecond)
	// Let's look into the cache's structure now!
	// The cache should have stored each label set by its fingerprint.
	for i := 0; i < 3; i++ {
		val, ok := c.cache.Get(lsets[i].Fingerprint())
		require.True(t, ok)
		cached, ok := val.([]cacheItem)
		require.True(t, ok)

		// Each cache value should be a 1-item slice, with the correct initial
		// and relabeled values applied to it.
		require.Len(t, cached, 1)
		require.Equal(t, lsets[i], cached[0].original)
		require.Equal(t, rlsets[i], cached[0].relabeled)
	}

	// Let's send over an entry we've seen before.
	// We should've hit the cached path, with no changes to the cache's length
	// or the underlying stored value.
	e.Labels = lsets[0]
	c.receiver <- e
	require.Equal(t, 3, c.cache.Len())
	val, _ := c.cache.Get(lsets[0].Fingerprint())
	cachedVal := val.([]cacheItem)
	require.Len(t, cachedVal, 1)
	require.Equal(t, lsets[0], cachedVal[0].original)
	require.Equal(t, rlsets[0], cachedVal[0].relabeled)

	// Now, let's try to hit a collision.
	// These LabelSets are known to collide (string: 8746e5b6c5f0fb60)
	// https://github.com/pstibrany/fnv-1a-64bit-collisions
	ls1 := model.LabelSet{"A": "K6sjsNNczPl"}
	ls2 := model.LabelSet{"A": "cswpLMIZpwt"}
	envls := model.LabelSet{"env": "staging"}
	require.Equal(t, ls1.Fingerprint(), ls2.Fingerprint(), "expected labelset fingerprints to collide; have we changed the hashing algorithm?")

	e.Labels = ls1
	c.receiver <- e

	e.Labels = ls2
	c.receiver <- e

	time.Sleep(100 * time.Millisecond)
	// Both of these should be under a single, new cache key which will contain
	// both entries.
	require.Equal(t, 4, c.cache.Len())
	val, ok := c.cache.Get(ls1.Fingerprint())
	require.True(t, ok)
	cachedVal = val.([]cacheItem)
	require.Len(t, cachedVal, 2)

	require.Equal(t, ls1, cachedVal[0].original)
	require.Equal(t, ls2, cachedVal[1].original)
	require.Equal(t, ls1.Merge(envls), cachedVal[0].relabeled)
	require.Equal(t, ls2.Merge(envls), cachedVal[1].relabeled)

	// Finally, send two more entries, which should fill up the cache and evict
	// the Least Recently Used items (lsets[1], and lsets[2]).
	e.Labels = lsets[3]
	c.receiver <- e
	e.Labels = lsets[4]
	c.receiver <- e

	// A completed send only means the component has taken the entry off the
	// unbuffered receiver; give it time to process the last one before
	// inspecting the cache, as done after the earlier sends.
	time.Sleep(100 * time.Millisecond)

	require.Equal(t, 4, c.cache.Len())
	wantKeys := []model.Fingerprint{lsets[0].Fingerprint(), ls1.Fingerprint(), lsets[3].Fingerprint(), lsets[4].Fingerprint()}
	gotKeys := c.cache.Keys() // Returns the cache keys in LRU order.
	// Guard the index into wantKeys below against an unexpected key count.
	require.Len(t, gotKeys, len(wantKeys))
	for i, k := range gotKeys {
		f, ok := k.(model.Fingerprint)
		require.True(t, ok)
		require.Equal(t, wantKeys[i], f)
	}
}

// TestRuleGetter runs the component through the componenttest controller and
// checks that the exported Rules reflect the configured relabel rules, both
// initially and after an Update with a different action.
func TestRuleGetter(t *testing.T) {
	// Set up the component Arguments.
	originalCfg := `rule {
action = "keep"
source_labels = ["__name__"]
regex = "up"
}
forward_to = []`
	var args Arguments
	require.NoError(t, river.Unmarshal([]byte(originalCfg), &args))

	// Set up and start the component.
	tc, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.relabel")
	require.NoError(t, err)

	// Run the controller in the background and hand its result back over a
	// channel: require must not be called off the test goroutine, so the
	// error is checked in the cleanup below instead of inside the goroutine.
	ctx, cancel := context.WithCancel(context.Background())
	runErr := make(chan error, 1)
	go func() {
		runErr <- tc.Run(ctx, args)
	}()
	t.Cleanup(func() {
		cancel()
		select {
		case err := <-runErr:
			if err != nil {
				t.Errorf("component run returned an error: %v", err)
			}
		case <-time.After(5 * time.Second):
			t.Error("timed out waiting for the component to stop")
		}
	})
	require.NoError(t, tc.WaitExports(time.Second))

	// Use the getter to retrieve the original relabeling rules.
	exports := tc.Exports().(Exports)
	gotOriginal := exports.Rules

	// Update the component with new relabeling rules and retrieve them.
	updatedCfg := `rule {
action = "drop"
source_labels = ["__name__"]
regex = "up"
}
forward_to = []`
	require.NoError(t, river.Unmarshal([]byte(updatedCfg), &args))

	require.NoError(t, tc.Update(args))
	exports = tc.Exports().(Exports)
	gotUpdated := exports.Rules

	require.NotEqual(t, gotOriginal, gotUpdated)
	require.Len(t, gotOriginal, 1)
	require.Len(t, gotUpdated, 1)

	// Only the action differs between the two configurations.
	require.Equal(t, flow_relabel.Keep, gotOriginal[0].Action)
	require.Equal(t, flow_relabel.Drop, gotUpdated[0].Action)
	require.Equal(t, gotOriginal[0].SourceLabels, gotUpdated[0].SourceLabels)
	require.Equal(t, gotOriginal[0].Regex, gotUpdated[0].Regex)
}

// getEntry returns a log entry with an empty label set, the current time as
// its timestamp and a fixed log line; tests set Labels before sending it.
func getEntry() loki.Entry {
	return loki.Entry{
		Labels: model.LabelSet{},
		Entry: logproto.Entry{
			Timestamp: time.Now(),
			Line:      "very important log",
		},
	}
}