Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/processor/processor_test.go
4096 views
1
package processor_test
2
3
import (
4
"context"
5
"errors"
6
"testing"
7
"time"
8
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/otelcol"
11
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
12
"github.com/grafana/agent/component/otelcol/processor"
13
"github.com/grafana/agent/pkg/flow/componenttest"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/stretchr/testify/require"
16
otelcomponent "go.opentelemetry.io/collector/component"
17
otelconfig "go.opentelemetry.io/collector/config"
18
otelconsumer "go.opentelemetry.io/collector/consumer"
19
"go.opentelemetry.io/collector/pdata/ptrace"
20
)
21
22
func TestProcessor(t *testing.T) {
23
ctx := componenttest.TestContext(t)
24
25
// Create an instance of a fake OpenTelemetry Collector processor which our
26
// Flow component will wrap around. Our fake processor will immediately
27
// forward data to the connected consumer once one is made available to it.
28
var (
29
consumer otelconsumer.Traces
30
31
waitConsumerTrigger = util.NewWaitTrigger()
32
onTracesConsumer = func(t otelconsumer.Traces) {
33
consumer = t
34
waitConsumerTrigger.Trigger()
35
}
36
37
waitTracesTrigger = util.NewWaitTrigger()
38
nextConsumer = &fakeconsumer.Consumer{
39
ConsumeTracesFunc: func(context.Context, ptrace.Traces) error {
40
waitTracesTrigger.Trigger()
41
return nil
42
},
43
}
44
45
// Our fake processor will wait for a consumer to be registered and then
46
// pass along data directly to it.
47
innerProcessor = &fakeProcessor{
48
ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error {
49
require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no next consumer registered")
50
return consumer.ConsumeTraces(ctx, td)
51
},
52
}
53
)
54
55
// Create and start our Flow component. We then wait for it to export a
56
// consumer that we can send data to.
57
te := newTestEnvironment(t, innerProcessor, onTracesConsumer)
58
te.Start(fakeProcessorArgs{
59
Output: &otelcol.ConsumerArguments{
60
Metrics: []otelcol.Consumer{nextConsumer},
61
Logs: []otelcol.Consumer{nextConsumer},
62
Traces: []otelcol.Consumer{nextConsumer},
63
},
64
})
65
66
require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")
67
ce := te.Controller.Exports().(otelcol.ConsumerExports)
68
69
// Create a test set of traces and send it to our consumer in the background.
70
// We then wait for our channel to receive the traces, indicating that
71
// everything was wired up correctly.
72
go func() {
73
var err error
74
75
for {
76
err = ce.Input.ConsumeTraces(ctx, ptrace.NewTraces())
77
78
if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
79
// Our component may not have been fully initialized yet. Wait a little
80
// bit before trying again.
81
time.Sleep(100 * time.Millisecond)
82
continue
83
}
84
85
require.NoError(t, err)
86
break
87
}
88
}()
89
90
require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")
91
}
92
93
type testEnvironment struct {
94
t *testing.T
95
96
Controller *componenttest.Controller
97
}
98
99
func newTestEnvironment(
100
t *testing.T,
101
fp otelcomponent.TracesProcessor,
102
onTracesConsumer func(t otelconsumer.Traces),
103
) *testEnvironment {
104
105
t.Helper()
106
107
reg := component.Registration{
108
Name: "testcomponent",
109
Args: fakeProcessorArgs{},
110
Exports: otelcol.ConsumerExports{},
111
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
112
// Create a factory which always returns our instance of fakeProcessor
113
// defined above.
114
factory := otelcomponent.NewProcessorFactory(
115
"testcomponent",
116
func() otelconfig.Processor {
117
res, err := fakeProcessorArgs{}.Convert()
118
require.NoError(t, err)
119
return res
120
},
121
otelcomponent.WithTracesProcessor(func(
122
_ context.Context,
123
_ otelcomponent.ProcessorCreateSettings,
124
_ otelconfig.Processor,
125
t otelconsumer.Traces,
126
) (otelcomponent.TracesProcessor, error) {
127
128
onTracesConsumer(t)
129
return fp, nil
130
}, otelcomponent.StabilityLevelUndefined),
131
)
132
133
return processor.New(opts, factory, args.(processor.Arguments))
134
},
135
}
136
137
return &testEnvironment{
138
t: t,
139
Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
140
}
141
}
142
143
func (te *testEnvironment) Start(args component.Arguments) {
144
go func() {
145
ctx := componenttest.TestContext(te.t)
146
err := te.Controller.Run(ctx, args)
147
require.NoError(te.t, err, "failed to run component")
148
}()
149
}
150
151
type fakeProcessorArgs struct {
152
Output *otelcol.ConsumerArguments
153
}
154
155
var _ processor.Arguments = fakeProcessorArgs{}
156
157
func (fa fakeProcessorArgs) Convert() (otelconfig.Processor, error) {
158
settings := otelconfig.NewProcessorSettings(otelconfig.NewComponentID("testcomponent"))
159
return &settings, nil
160
}
161
162
func (fa fakeProcessorArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
163
return nil
164
}
165
166
func (fa fakeProcessorArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
167
return nil
168
}
169
170
func (fa fakeProcessorArgs) NextConsumers() *otelcol.ConsumerArguments {
171
return fa.Output
172
}
173
174
type fakeProcessor struct {
175
StartFunc func(ctx context.Context, host otelcomponent.Host) error
176
ShutdownFunc func(ctx context.Context) error
177
CapabilitiesFunc func() otelconsumer.Capabilities
178
ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error
179
}
180
181
var _ otelcomponent.TracesProcessor = (*fakeProcessor)(nil)
182
183
func (fe *fakeProcessor) Start(ctx context.Context, host otelcomponent.Host) error {
184
if fe.StartFunc != nil {
185
return fe.StartFunc(ctx, host)
186
}
187
return nil
188
}
189
190
func (fe *fakeProcessor) Shutdown(ctx context.Context) error {
191
if fe.ShutdownFunc != nil {
192
return fe.ShutdownFunc(ctx)
193
}
194
return nil
195
}
196
197
func (fe *fakeProcessor) Capabilities() otelconsumer.Capabilities {
198
if fe.CapabilitiesFunc != nil {
199
return fe.CapabilitiesFunc()
200
}
201
return otelconsumer.Capabilities{}
202
}
203
204
func (fe *fakeProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
205
if fe.ConsumeTracesFunc != nil {
206
return fe.ConsumeTracesFunc(ctx, td)
207
}
208
return nil
209
}
210
211