Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/processor/memorylimiter/memorylimiter_test.go
4096 views
1
package memorylimiter_test
2
3
import (
4
"context"
5
"testing"
6
"time"
7
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/component/otelcol"
10
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
11
"github.com/grafana/agent/component/otelcol/processor/memorylimiter"
12
"github.com/grafana/agent/pkg/flow/componenttest"
13
"github.com/grafana/agent/pkg/river"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/grafana/dskit/backoff"
16
"github.com/stretchr/testify/require"
17
"go.opentelemetry.io/collector/pdata/ptrace"
18
)
19
20
// Test performs a basic integration test which runs the
21
// otelcol.processor.memory_limiter component and ensures that it can accept,
22
// process, and forward data.
23
func Test(t *testing.T) {
24
ctx := componenttest.TestContext(t)
25
l := util.TestLogger(t)
26
27
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.memory_limiter")
28
require.NoError(t, err)
29
30
cfg := `
31
check_interval = "10ms"
32
limit = "20MiB"
33
34
output {
35
// no-op: will be overridden by test code.
36
}
37
`
38
var args memorylimiter.Arguments
39
require.NoError(t, river.Unmarshal([]byte(cfg), &args))
40
41
// Override our arguments so traces get forwarded to traceCh.
42
traceCh := make(chan ptrace.Traces)
43
args.Output = makeTracesOutput(traceCh)
44
45
go func() {
46
err := ctrl.Run(ctx, args)
47
require.NoError(t, err)
48
}()
49
50
require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
51
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")
52
53
// Send traces in the background to our processor.
54
go func() {
55
exports := ctrl.Exports().(otelcol.ConsumerExports)
56
57
bo := backoff.New(ctx, backoff.Config{
58
MinBackoff: 10 * time.Millisecond,
59
MaxBackoff: 100 * time.Millisecond,
60
})
61
for bo.Ongoing() {
62
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
63
if err != nil {
64
level.Error(l).Log("msg", "failed to send traces", "err", err)
65
bo.Wait()
66
continue
67
}
68
69
return
70
}
71
}()
72
73
// Wait for our processor to finish and forward data to traceCh.
74
select {
75
case <-time.After(time.Second):
76
require.FailNow(t, "failed waiting for traces")
77
case tr := <-traceCh:
78
require.Equal(t, 1, tr.SpanCount())
79
}
80
}
81
82
// makeTracesOutput returns ConsumerArguments which will forward traces to the
83
// provided channel.
84
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
85
traceConsumer := fakeconsumer.Consumer{
86
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
87
select {
88
case <-ctx.Done():
89
return ctx.Err()
90
case ch <- t:
91
return nil
92
}
93
},
94
}
95
96
return &otelcol.ConsumerArguments{
97
Traces: []otelcol.Consumer{&traceConsumer},
98
}
99
}
100
101
// createTestTraces decodes a fixed JSON document containing a single span
// named "TestSpan" into ptrace.Traces. It panics if decoding fails.
func createTestTraces() ptrace.Traces {
	// Matches format from the protobuf definition:
	// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
	const payload = `{
		"resource_spans": [{
			"scope_spans": [{
				"spans": [{
					"name": "TestSpan"
				}]
			}]
		}]
	}`

	var unmarshaler ptrace.JSONUnmarshaler
	traces, err := unmarshaler.UnmarshalTraces([]byte(payload))
	if err != nil {
		panic(err)
	}
	return traces
}
121
122