Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/loki/loki_test.go
4096 views
1
package loki
2
3
import (
4
"context"
5
"testing"
6
"time"
7
8
lokiapi "github.com/grafana/agent/component/common/loki"
9
"github.com/grafana/agent/component/otelcol"
10
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
11
"github.com/grafana/agent/pkg/flow/componenttest"
12
"github.com/grafana/agent/pkg/river"
13
"github.com/grafana/agent/pkg/util"
14
"github.com/grafana/loki/pkg/logproto"
15
"github.com/prometheus/common/model"
16
"github.com/stretchr/testify/require"
17
"go.opentelemetry.io/collector/pdata/plog"
18
)
19
20
func Test(t *testing.T) {
21
ctx := componenttest.TestContext(t)
22
l := util.TestLogger(t)
23
24
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.receiver.loki")
25
require.NoError(t, err)
26
27
cfg := `
28
output {
29
// no-op: will be overridden by test code.
30
}
31
`
32
var args Arguments
33
require.NoError(t, river.Unmarshal([]byte(cfg), &args))
34
35
// Override our settings so logs get forwarded to logCh.
36
logCh := make(chan plog.Logs)
37
args.Output = makeLogsOutput(logCh)
38
39
go func() {
40
err := ctrl.Run(ctx, args)
41
require.NoError(t, err)
42
}()
43
44
require.NoError(t, ctrl.WaitRunning(time.Second))
45
require.NoError(t, ctrl.WaitExports(time.Second))
46
47
exports := ctrl.Exports().(Exports)
48
49
// Use the exported receiver to send log entries in the background.
50
go func() {
51
entry := lokiapi.Entry{
52
Labels: map[model.LabelName]model.LabelValue{
53
"filename": "/var/log/app/errors.log",
54
"env": "dev",
55
},
56
Entry: logproto.Entry{
57
Timestamp: time.Now(),
58
Line: "It's super effective!",
59
},
60
}
61
exports.Receiver <- entry
62
}()
63
64
wantAttributes := map[string]interface{}{
65
"env": "dev",
66
"filename": "/var/log/app/errors.log",
67
"log.file.name": "errors.log",
68
"log.file.path": "/var/log/app/errors.log",
69
"loki.attribute.labels": "filename,env",
70
}
71
72
// Wait for our client to get the log.
73
var otelLogs plog.Logs
74
select {
75
case <-time.After(time.Second):
76
require.FailNow(t, "failed waiting for log entry")
77
case otelLogs = <-logCh:
78
require.Equal(t, 1, otelLogs.LogRecordCount())
79
require.Equal(t, "It's super effective!", otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString())
80
require.Equal(t, wantAttributes["env"], otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()["env"])
81
require.Equal(t, wantAttributes["filename"], otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()["filename"])
82
require.Equal(t, wantAttributes["log.file.name"], otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()["log.file.name"])
83
require.Equal(t, wantAttributes["log.file.path"], otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()["log.file.path"])
84
require.Contains(t, otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()["loki.attribute.labels"], "env")
85
require.Contains(t, otelLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()["loki.attribute.labels"], "filename")
86
}
87
}
88
89
// makeLogsOutput returns a ConsumerArguments which will forward logs to
90
// the provided channel.
91
func makeLogsOutput(ch chan plog.Logs) *otelcol.ConsumerArguments {
92
logsConsumer := fakeconsumer.Consumer{
93
ConsumeLogsFunc: func(ctx context.Context, l plog.Logs) error {
94
select {
95
case <-ctx.Done():
96
return ctx.Err()
97
case ch <- l:
98
return nil
99
}
100
},
101
}
102
103
return &otelcol.ConsumerArguments{
104
Logs: []otelcol.Consumer{&logsConsumer},
105
}
106
}
107
108