Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/exporter/otlp/otlp_test.go
4096 views
1
package otlp_test
2
3
import (
4
"context"
5
"fmt"
6
"net"
7
"testing"
8
"time"
9
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/component/otelcol"
12
"github.com/grafana/agent/component/otelcol/exporter/otlp"
13
"github.com/grafana/agent/pkg/flow/componenttest"
14
"github.com/grafana/agent/pkg/river"
15
"github.com/grafana/agent/pkg/util"
16
"github.com/grafana/dskit/backoff"
17
"github.com/stretchr/testify/require"
18
"go.opentelemetry.io/collector/pdata/ptrace"
19
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
20
"google.golang.org/grpc"
21
)
22
23
// Test performs a basic integration test which runs the otelcol.exporter.otlp
24
// component and ensures that it can pass data to an OTLP gRPC server.
25
func Test(t *testing.T) {
26
traceCh := make(chan ptrace.Traces)
27
tracesServer := makeTracesServer(t, traceCh)
28
29
ctx := componenttest.TestContext(t)
30
l := util.TestLogger(t)
31
32
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.exporter.otlp")
33
require.NoError(t, err)
34
35
cfg := fmt.Sprintf(`
36
timeout = "250ms"
37
38
client {
39
endpoint = "%s"
40
41
compression = "none"
42
43
tls {
44
insecure = true
45
insecure_skip_verify = true
46
}
47
}
48
`, tracesServer)
49
var args otlp.Arguments
50
require.NoError(t, river.Unmarshal([]byte(cfg), &args))
51
52
go func() {
53
err := ctrl.Run(ctx, args)
54
require.NoError(t, err)
55
}()
56
57
require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
58
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")
59
60
// Send traces in the background to our exporter.
61
go func() {
62
exports := ctrl.Exports().(otelcol.ConsumerExports)
63
64
bo := backoff.New(ctx, backoff.Config{
65
MinBackoff: 10 * time.Millisecond,
66
MaxBackoff: 100 * time.Millisecond,
67
})
68
for bo.Ongoing() {
69
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
70
if err != nil {
71
level.Error(l).Log("msg", "failed to send traces", "err", err)
72
bo.Wait()
73
continue
74
}
75
76
return
77
}
78
}()
79
80
// Wait for our exporter to finish and pass data to our HTTP server.
81
select {
82
case <-time.After(time.Second):
83
require.FailNow(t, "failed waiting for traces")
84
case tr := <-traceCh:
85
require.Equal(t, 1, tr.SpanCount())
86
}
87
}
88
89
// makeTracesServer returns a host:port which will accept traces over insecure
90
// gRPC.
91
func makeTracesServer(t *testing.T, ch chan ptrace.Traces) string {
92
t.Helper()
93
94
lis, err := net.Listen("tcp", "127.0.0.1:0")
95
require.NoError(t, err)
96
97
srv := grpc.NewServer()
98
ptraceotlp.RegisterGRPCServer(srv, &mockTracesReceiver{ch: ch})
99
100
go func() {
101
err := srv.Serve(lis)
102
require.NoError(t, err)
103
}()
104
t.Cleanup(srv.Stop)
105
106
return lis.Addr().String()
107
}
108
109
type mockTracesReceiver struct {
110
ch chan ptrace.Traces
111
}
112
113
var _ ptraceotlp.GRPCServer = (*mockTracesReceiver)(nil)
114
115
func (ms *mockTracesReceiver) Export(_ context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
116
ms.ch <- req.Traces()
117
return ptraceotlp.NewExportResponse(), nil
118
}
119
120
// createTestTraces builds a trace payload holding a single span named
// "TestSpan" by decoding a JSON document. It panics if the document cannot be
// decoded.
func createTestTraces() ptrace.Traces {
	// The document follows the layout of the OTLP trace protobuf definition:
	// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
	const payload = `{
		"resource_spans": [{
			"scope_spans": [{
				"spans": [{
					"name": "TestSpan"
				}]
			}]
		}]
	}`

	var unmarshaler ptrace.JSONUnmarshaler
	traces, err := unmarshaler.UnmarshalTraces([]byte(payload))
	if err != nil {
		panic(err)
	}
	return traces
}
140
141