Path: blob/main/component/loki/process/process_test.go
4096 views
package process

import (
	"context"
	"testing"
	"time"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/loki/process/internal/stages"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
	"go.uber.org/goleak"
)

func TestJSONLabelsStage(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

	// The following stages attempt to parse input lines as JSON.
	//
	// The first stage extracts any fields found with the configured names.
	// Since 'source' is empty, the log line itself is parsed.
	//    log    --> output
	//    stream --> stream
	//    time   --> timestamp
	//    extra  --> extra
	//
	// The second stage parses the extracted 'extra' value as JSON and extracts
	// its 'user' field. If an expression's value is empty, the key's own name
	// is used as the expression.
	//    extra.user --> user
	//
	// The third stage sets labels from the values extracted above. Again, an
	// empty value means the label is populated from the extracted value of the
	// same name; `ts` is populated from the extracted `timestamp` value.
	stg := `
stage.json {
	expressions    = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
	drop_malformed = true
}
stage.json {
	expressions = { "user" = "" }
	source      = "extra"
}
stage.labels {
	values = {
		stream = "",
		user   = "",
		ts     = "timestamp",
	}
}`

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Runs before the deferred goleak check above.
	in, ch1, ch2 := startTestComponent(ctx, t, stg)

	// Send a log entry to the component's receiver.
	ts := time.Now()
	logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
	in <- loki.Entry{
		Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
		Entry: logproto.Entry{
			Timestamp: ts,
			Line:      logline,
		},
	}

	wantLabelSet := model.LabelSet{
		"filename": "/var/log/pods/agent/agent/1.log",
		"foo":      "bar",
		"stream":   "stderr",
		"ts":       "2019-04-30T02:12:41.8443515Z",
		"user":     "smith",
	}

	// The log entry should be received on both receivers, with the processing
	// stages correctly applied. No timestamp stage is configured here, so the
	// entry keeps a recent timestamp.
	requireEntryOnBothReceivers(t, ch1, ch2, func(got loki.Entry) {
		require.WithinDuration(t, time.Now(), got.Timestamp, 1*time.Second)
		require.Equal(t, logline, got.Line)
		require.Equal(t, wantLabelSet, got.Labels)
	})
}

func TestStaticLabelsLabelAllowLabelDrop(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

	// The following stages manipulate the label set of a log entry.
	// The first stage adds a static set of labels (foo, bar, baz, qux) to the
	// entry's existing `filename` and `env` labels.
	// The second stage drops the foo and bar labels.
	// The third stage keeps only the listed labels (foo, baz, filename) out of
	// those that remain; foo was already dropped, so only baz and filename
	// survive.
	stg := `
stage.static_labels {
	values = { "foo" = "fooval", "bar" = "barval", "baz" = "bazval", "qux" = "quxval" }
}
stage.label_drop {
	values = [ "foo", "bar" ]
}
stage.label_keep {
	values = [ "foo", "baz", "filename" ]
}`

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Runs before the deferred goleak check above.
	in, ch1, ch2 := startTestComponent(ctx, t, stg)

	// Send a log entry to the component's receiver.
	ts := time.Now()
	logline := `{"log":"log message\n","stream":"stderr","time":"2022-01-09T08:37:45.8233626Z"}`
	in <- loki.Entry{
		Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "env": "dev"},
		Entry: logproto.Entry{
			Timestamp: ts,
			Line:      logline,
		},
	}

	wantLabelSet := model.LabelSet{
		"filename": "/var/log/pods/agent/agent/1.log",
		"baz":      "bazval",
	}

	// The log entry should be received on both receivers, with the processing
	// stages correctly applied.
	requireEntryOnBothReceivers(t, ch1, ch2, func(got loki.Entry) {
		require.WithinDuration(t, time.Now(), got.Timestamp, 1*time.Second)
		require.Equal(t, logline, got.Line)
		require.Equal(t, wantLabelSet, got.Labels)
	})
}

func TestRegexTimestampOutput(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

	// The first stage parses the input line using a regular expression with
	// named capture groups. The three capture groups (time, stream and
	// content) are extracted into the shared map of values. Since 'source' is
	// empty, the log line itself is parsed.
	//
	// The second stage parses the extracted `time` value as an RFC3339
	// timestamp and sets it as the log entry's timestamp.
	//
	// The third stage sets the extracted `content` value as the log line.
	//
	// The fourth and final stage sets the extracted `stream` value as the
	// `src` label.
	stg := `
stage.regex {
	expression = "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<content>.*)$"
}
stage.timestamp {
	source = "time"
	format = "RFC3339"
}
stage.output {
	source = "content"
}
stage.labels {
	values = { src = "stream" }
}`

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Runs before the deferred goleak check above.
	in, ch1, ch2 := startTestComponent(ctx, t, stg)

	// Send a log entry to the component's receiver.
	ts := time.Now()
	logline := `2022-01-17T08:17:42-07:00 stderr somewhere, somehow, an error occurred`
	in <- loki.Entry{
		Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
		Entry: logproto.Entry{
			Timestamp: ts,
			Line:      logline,
		},
	}

	wantLabelSet := model.LabelSet{
		"filename": "/var/log/pods/agent/agent/1.log",
		"foo":      "bar",
		"src":      "stderr",
	}
	wantTimestamp, err := time.Parse(time.RFC3339, "2022-01-17T08:17:42-07:00")
	require.NoError(t, err)
	wantLogline := `somewhere, somehow, an error occurred`

	// The log entry should be received on both receivers, with the processing
	// stages correctly applied.
	requireEntryOnBothReceivers(t, ch1, ch2, func(got loki.Entry) {
		require.Equal(t, wantLogline, got.Line)
		// Compare instants rather than struct contents: the Location pointer
		// inside a time.Time is a representation detail, not part of what the
		// timestamp stage is expected to produce.
		require.Truef(t, wantTimestamp.Equal(got.Timestamp),
			"unexpected timestamp: want %s, got %s", wantTimestamp, got.Timestamp)
		require.Equal(t, wantLabelSet, got.Labels)
	})
}

// startTestComponent unmarshals stagesRiver (a River snippet containing only
// `stage.*` blocks), creates a loki.process component that forwards to two new
// receivers, and starts the component's Run method with ctx in a new
// goroutine. It returns the component's input channel followed by the two
// downstream receivers.
//
// The caller owns ctx and is expected to cancel it before any deferred goleak
// check runs, so that the Run goroutine can exit; that is why cancellation is
// not registered with t.Cleanup here (cleanups run after deferred calls).
func startTestComponent(ctx context.Context, t *testing.T, stagesRiver string) (chan<- loki.Entry, loki.LogsReceiver, loki.LogsReceiver) {
	t.Helper()

	// Unmarshal the River stage blocks into a custom struct, as we don't have
	// an easy way to refer to a loki.LogsReceiver value for the forward_to
	// argument from River.
	type cfg struct {
		Stages []stages.StageConfig `river:"stage,enum"`
	}
	var stagesCfg cfg
	require.NoError(t, river.Unmarshal([]byte(stagesRiver), &stagesCfg))

	ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)

	// Create and run the component so that it can process and forward logs.
	opts := component.Options{
		Logger:        util.TestFlowLogger(t),
		Registerer:    prometheus.NewRegistry(),
		OnStateChange: func(e component.Exports) {},
	}
	args := Arguments{
		ForwardTo: []loki.LogsReceiver{ch1, ch2},
		Stages:    stagesCfg.Stages,
	}

	c, err := New(opts, args)
	require.NoError(t, err)
	go c.Run(ctx)

	return c.receiver, ch1, ch2
}

// requireEntryOnBothReceivers waits until each of ch1 and ch2 has delivered
// one log entry and calls check on every entry received. Both channels are
// selected on together, so the order in which the component fans out does not
// matter. Each channel is disabled after its first entry, so a duplicate on
// one receiver cannot stand in for a missing entry on the other. The test
// fails if no entry arrives within five seconds of the previous one.
func requireEntryOnBothReceivers(t *testing.T, ch1, ch2 loki.LogsReceiver, check func(loki.Entry)) {
	t.Helper()

	for ch1 != nil || ch2 != nil {
		select {
		case entry := <-ch1:
			ch1 = nil // Receiving from a nil channel blocks forever, disabling this case.
			check(entry)
		case entry := <-ch2:
			ch2 = nil
			check(entry)
		case <-time.After(5 * time.Second):
			require.FailNow(t, "failed waiting for log line")
		}
	}
}