Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/exporter/exporter_test.go
4096 views
1
package exporter_test
2
3
import (
4
"context"
5
"errors"
6
"testing"
7
"time"
8
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/otelcol"
11
"github.com/grafana/agent/component/otelcol/exporter"
12
"github.com/grafana/agent/pkg/flow/componenttest"
13
"github.com/grafana/agent/pkg/util"
14
"github.com/stretchr/testify/require"
15
otelcomponent "go.opentelemetry.io/collector/component"
16
otelconfig "go.opentelemetry.io/collector/config"
17
otelconsumer "go.opentelemetry.io/collector/consumer"
18
"go.opentelemetry.io/collector/pdata/ptrace"
19
)
20
21
func TestExporter(t *testing.T) {
22
ctx := componenttest.TestContext(t)
23
24
// Channel where received traces will be written to.
25
tracesCh := make(chan ptrace.Traces, 1)
26
27
// Create an instance of a fake OpenTelemetry Collector exporter which our
28
// Flow component will wrap around.
29
innerExporter := &fakeExporter{
30
ConsumeTracesFunc: func(_ context.Context, td ptrace.Traces) error {
31
select {
32
case tracesCh <- td:
33
default:
34
}
35
return nil
36
},
37
}
38
39
// Create and start our Flow component. We then wait for it to export a
40
// consumer that we can send data to.
41
te := newTestEnvironment(t, innerExporter)
42
te.Start()
43
44
require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")
45
ce := te.Controller.Exports().(otelcol.ConsumerExports)
46
47
// Create a test set of traces and send it to our consumer in the background.
48
// We then wait for our channel to receive the traces, indicating that
49
// everything was wired up correctly.
50
testTraces := createTestTraces()
51
go func() {
52
var err error
53
54
for {
55
err = ce.Input.ConsumeTraces(ctx, testTraces)
56
57
if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
58
// Our component may not have been fully initialized yet. Wait a little
59
// bit before trying again.
60
time.Sleep(100 * time.Millisecond)
61
continue
62
}
63
64
require.NoError(t, err)
65
break
66
}
67
}()
68
69
select {
70
case <-time.After(1 * time.Second):
71
require.FailNow(t, "testcomponent did not receive traces")
72
case td := <-tracesCh:
73
require.Equal(t, testTraces, td)
74
}
75
}
76
77
type testEnvironment struct {
78
t *testing.T
79
80
Controller *componenttest.Controller
81
}
82
83
func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment {
84
t.Helper()
85
86
reg := component.Registration{
87
Name: "testcomponent",
88
Args: fakeExporterArgs{},
89
Exports: otelcol.ConsumerExports{},
90
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
91
// Create a factory which always returns our instance of fakeExporter
92
// defined above.
93
factory := otelcomponent.NewExporterFactory(
94
"testcomponent",
95
func() otelconfig.Exporter {
96
res, err := fakeExporterArgs{}.Convert()
97
require.NoError(t, err)
98
return res
99
},
100
otelcomponent.WithTracesExporter(func(ctx context.Context, ecs otelcomponent.ExporterCreateSettings, e otelconfig.Exporter) (otelcomponent.TracesExporter, error) {
101
return fe, nil
102
}, otelcomponent.StabilityLevelUndefined),
103
)
104
105
return exporter.New(opts, factory, args.(exporter.Arguments))
106
},
107
}
108
109
return &testEnvironment{
110
t: t,
111
Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
112
}
113
}
114
115
// Start runs the controller in a background goroutine, passing it an empty
// fakeExporterArgs value. If Run returns an error, the test is marked as
// failed.
func (te *testEnvironment) Start() {
	go func() {
		ctx := componenttest.TestContext(te.t)
		if err := te.Controller.Run(ctx, fakeExporterArgs{}); err != nil {
			// Report with Errorf instead of require.NoError: require calls
			// t.FailNow, which is only valid on the goroutine running the test.
			te.t.Errorf("failed to run component: %v", err)
		}
	}()
}
122
123
type fakeExporterArgs struct {
124
}
125
126
var _ exporter.Arguments = fakeExporterArgs{}
127
128
// Convert returns exporter settings for the component ID "testcomponent". The
// returned error is always nil.
func (fakeExporterArgs) Convert() (otelconfig.Exporter, error) {
	id := otelconfig.NewComponentID("testcomponent")
	cfg := otelconfig.NewExporterSettings(id)
	return &cfg, nil
}
132
133
// Extensions reports no extensions: it always returns nil.
func (fakeExporterArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
	return nil
}
136
137
// Exporters reports no exporters: it always returns nil.
func (fakeExporterArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
	return nil
}
140
141
type fakeExporter struct {
142
StartFunc func(ctx context.Context, host otelcomponent.Host) error
143
ShutdownFunc func(ctx context.Context) error
144
CapabilitiesFunc func() otelconsumer.Capabilities
145
ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error
146
}
147
148
var _ otelcomponent.TracesExporter = (*fakeExporter)(nil)
149
150
// Start delegates to StartFunc when one is configured; otherwise it does
// nothing and returns nil.
func (fe *fakeExporter) Start(ctx context.Context, host otelcomponent.Host) error {
	if fe.StartFunc == nil {
		return nil
	}
	return fe.StartFunc(ctx, host)
}
156
157
// Shutdown delegates to ShutdownFunc when one is configured; otherwise it does
// nothing and returns nil.
func (fe *fakeExporter) Shutdown(ctx context.Context) error {
	if fe.ShutdownFunc == nil {
		return nil
	}
	return fe.ShutdownFunc(ctx)
}
163
164
// Capabilities returns the result of CapabilitiesFunc when one is configured,
// and the zero-value otelconsumer.Capabilities otherwise.
func (fe *fakeExporter) Capabilities() otelconsumer.Capabilities {
	if fn := fe.CapabilitiesFunc; fn != nil {
		return fn()
	}
	return otelconsumer.Capabilities{}
}
170
171
// ConsumeTraces forwards td to ConsumeTracesFunc when one is configured;
// otherwise the traces are discarded and nil is returned.
func (fe *fakeExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	if fe.ConsumeTracesFunc == nil {
		return nil
	}
	return fe.ConsumeTracesFunc(ctx, td)
}
177
178
// createTestTraces decodes a fixed JSON document containing a single span
// named "TestSpan" into ptrace.Traces. It panics if decoding fails.
func createTestTraces() ptrace.Traces {
	// Matches format from the protobuf definition:
	// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
	const payload = `{
		"resource_spans": [{
			"scope_spans": [{
				"spans": [{
					"name": "TestSpan"
				}]
			}]
		}]
	}`

	traces, err := (&ptrace.JSONUnmarshaler{}).UnmarshalTraces([]byte(payload))
	if err != nil {
		panic(err)
	}
	return traces
}
198
199