Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/processor/batch/batch_test.go
4096 views
1
package batch_test
2
3
import (
4
"context"
5
"testing"
6
"time"
7
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/component/otelcol"
10
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
11
"github.com/grafana/agent/component/otelcol/processor/batch"
12
"github.com/grafana/agent/pkg/flow/componenttest"
13
"github.com/grafana/agent/pkg/river"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/grafana/dskit/backoff"
16
"github.com/stretchr/testify/require"
17
"go.opentelemetry.io/collector/pdata/ptrace"
18
)
19
20
// Test performs a basic integration test which runs the
21
// otelcol.processor.batch component and ensures that it can accept, process, and forward data.
22
func Test(t *testing.T) {
23
ctx := componenttest.TestContext(t)
24
l := util.TestLogger(t)
25
26
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.batch")
27
require.NoError(t, err)
28
29
cfg := `
30
timeout = "10ms"
31
32
output {
33
// no-op: will be overridden by test code.
34
}
35
`
36
var args batch.Arguments
37
require.NoError(t, river.Unmarshal([]byte(cfg), &args))
38
39
// Override our arguments so traces get forwarded to traceCh.
40
traceCh := make(chan ptrace.Traces)
41
args.Output = makeTracesOutput(traceCh)
42
43
go func() {
44
err := ctrl.Run(ctx, args)
45
require.NoError(t, err)
46
}()
47
48
require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
49
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")
50
51
// Send traces in the background to our processor.
52
go func() {
53
exports := ctrl.Exports().(otelcol.ConsumerExports)
54
55
bo := backoff.New(ctx, backoff.Config{
56
MinBackoff: 10 * time.Millisecond,
57
MaxBackoff: 100 * time.Millisecond,
58
})
59
for bo.Ongoing() {
60
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
61
if err != nil {
62
level.Error(l).Log("msg", "failed to send traces", "err", err)
63
bo.Wait()
64
continue
65
}
66
67
return
68
}
69
}()
70
71
// Wait for our processor to finish and forward data to traceCh.
72
select {
73
case <-time.After(time.Second):
74
require.FailNow(t, "failed waiting for traces")
75
case tr := <-traceCh:
76
require.Equal(t, 1, tr.SpanCount())
77
}
78
}
79
80
// makeTracesOutput returns ConsumerArguments which will forward traces to the
81
// provided channel.
82
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
83
traceConsumer := fakeconsumer.Consumer{
84
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
85
select {
86
case <-ctx.Done():
87
return ctx.Err()
88
case ch <- t:
89
return nil
90
}
91
},
92
}
93
94
return &otelcol.ConsumerArguments{
95
Traces: []otelcol.Consumer{&traceConsumer},
96
}
97
}
98
99
func createTestTraces() ptrace.Traces {
100
// Matches format from the protobuf definition:
101
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
102
var bb = `{
103
"resource_spans": [{
104
"scope_spans": [{
105
"spans": [{
106
"name": "TestSpan"
107
}]
108
}]
109
}]
110
}`
111
112
decoder := &ptrace.JSONUnmarshaler{}
113
data, err := decoder.UnmarshalTraces([]byte(bb))
114
if err != nil {
115
panic(err)
116
}
117
return data
118
}
119
120