Path: blob/main/component/otelcol/receiver/loki/loki_test.go
4096 views
package loki

import (
	"context"
	"testing"
	"time"

	lokiapi "github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
	"github.com/grafana/agent/pkg/flow/componenttest"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/pdata/plog"
)

// Test runs the otelcol.receiver.loki component, pushes a single Loki entry
// into its exported receiver, and verifies that the entry reaches the
// configured output as one OpenTelemetry log record with the expected body
// and attributes.
func Test(t *testing.T) {
	ctx := componenttest.TestContext(t)
	l := util.TestLogger(t)

	ctrl, err := componenttest.NewControllerFromID(l, "otelcol.receiver.loki")
	require.NoError(t, err)

	cfg := `
		output {
			// no-op: will be overridden by test code.
		}
	`
	var args Arguments
	require.NoError(t, river.Unmarshal([]byte(cfg), &args))

	// Override our settings so logs get forwarded to logCh.
	logCh := make(chan plog.Logs)
	args.Output = makeLogsOutput(logCh)

	go func() {
		// require.* must not be used here: it fails via t.FailNow, which is
		// only valid on the goroutine running the test. t.Errorf is safe to
		// call from other goroutines.
		if err := ctrl.Run(ctx, args); err != nil {
			t.Errorf("running component: %v", err)
		}
	}()

	require.NoError(t, ctrl.WaitRunning(time.Second))
	require.NoError(t, ctrl.WaitExports(time.Second))

	exports := ctrl.Exports().(Exports)

	// Use the exported receiver to send a log entry in the background.
	go func() {
		entry := lokiapi.Entry{
			Labels: map[model.LabelName]model.LabelValue{
				"filename": "/var/log/app/errors.log",
				"env":      "dev",
			},
			Entry: logproto.Entry{
				Timestamp: time.Now(),
				Line:      "It's super effective!",
			},
		}

		// Give up once the test context is done so this goroutine cannot
		// stay blocked forever if nothing reads from the receiver.
		select {
		case exports.Receiver <- entry:
		case <-ctx.Done():
		}
	}()

	// Attributes whose values must match exactly.
	wantAttributes := map[string]interface{}{
		"env":           "dev",
		"filename":      "/var/log/app/errors.log",
		"log.file.name": "errors.log",
		"log.file.path": "/var/log/app/errors.log",
	}

	// Wait for our client to get the log.
	var otelLogs plog.Logs
	select {
	case <-time.After(time.Second):
		require.FailNow(t, "failed waiting for log entry")
	case otelLogs = <-logCh:
	}

	require.Equal(t, 1, otelLogs.LogRecordCount())

	record := otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
	require.Equal(t, "It's super effective!", record.Body().AsString())

	attrs := record.Attributes().AsRaw()
	for key, want := range wantAttributes {
		require.Equal(t, want, attrs[key], "attribute %q", key)
	}

	// The label list is only checked for membership, not for an exact value;
	// presumably the order of the label names is not deterministic because
	// the labels come from a map.
	require.Contains(t, attrs["loki.attribute.labels"], "env")
	require.Contains(t, attrs["loki.attribute.labels"], "filename")
}

// makeLogsOutput returns a ConsumerArguments which will forward logs to
// the provided channel.
//
// The consumer blocks until the logs are received from ch or the context
// passed to ConsumeLogs is done, in which case the context's error is
// returned.
func makeLogsOutput(ch chan plog.Logs) *otelcol.ConsumerArguments {
	logsConsumer := fakeconsumer.Consumer{
		ConsumeLogsFunc: func(ctx context.Context, l plog.Logs) error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case ch <- l:
				return nil
			}
		},
	}

	return &otelcol.ConsumerArguments{
		Logs: []otelcol.Consumer{&logsConsumer},
	}
}