Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/otlp/otlp_test.go
4096 views
1
package otlp_test
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"os"
8
"testing"
9
"time"
10
11
"github.com/go-kit/log/level"
12
"github.com/grafana/agent/component/otelcol"
13
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
14
"github.com/grafana/agent/component/otelcol/receiver/otlp"
15
"github.com/grafana/agent/pkg/flow/componenttest"
16
"github.com/grafana/agent/pkg/river"
17
"github.com/grafana/agent/pkg/util"
18
"github.com/grafana/dskit/backoff"
19
"github.com/phayes/freeport"
20
"github.com/stretchr/testify/require"
21
"go.opentelemetry.io/collector/pdata/ptrace"
22
)
23
24
// Test performs a basic integration test which runs the otelcol.receiver.otlp
25
// component and ensures that it can receive and forward data.
26
func Test(t *testing.T) {
27
httpAddr := getFreeAddr(t)
28
29
ctx := componenttest.TestContext(t)
30
l := util.TestLogger(t)
31
32
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.receiver.otlp")
33
require.NoError(t, err)
34
35
cfg := fmt.Sprintf(`
36
http {
37
endpoint = "%s"
38
}
39
40
output {
41
// no-op: will be overridden by test code.
42
}
43
`, httpAddr)
44
var args otlp.Arguments
45
require.NoError(t, river.Unmarshal([]byte(cfg), &args))
46
47
// Override our settings so traces get forwarded to traceCh.
48
traceCh := make(chan ptrace.Traces)
49
args.Output = makeTracesOutput(traceCh)
50
51
go func() {
52
err := ctrl.Run(ctx, args)
53
require.NoError(t, err)
54
}()
55
56
require.NoError(t, ctrl.WaitRunning(time.Second))
57
58
// Send traces in the background to our receiver.
59
go func() {
60
request := func() error {
61
f, err := os.Open("testdata/payload.json")
62
require.NoError(t, err)
63
defer f.Close()
64
65
tracesURL := fmt.Sprintf("http://%s/v1/traces", httpAddr)
66
_, err = http.DefaultClient.Post(tracesURL, "application/json", f)
67
return err
68
}
69
70
bo := backoff.New(ctx, backoff.Config{
71
MinBackoff: 10 * time.Millisecond,
72
MaxBackoff: 100 * time.Millisecond,
73
})
74
for bo.Ongoing() {
75
if err := request(); err != nil {
76
level.Error(l).Log("msg", "failed to send traces", "err", err)
77
bo.Wait()
78
continue
79
}
80
81
return
82
}
83
}()
84
85
// Wait for our client to get a span.
86
select {
87
case <-time.After(time.Second):
88
require.FailNow(t, "failed waiting for traces")
89
case tr := <-traceCh:
90
require.Equal(t, 1, tr.SpanCount())
91
}
92
}
93
94
// makeTracesOutput returns ConsumerArguments which will forward traces to the
95
// provided channel.
96
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
97
traceConsumer := fakeconsumer.Consumer{
98
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
99
select {
100
case <-ctx.Done():
101
return ctx.Err()
102
case ch <- t:
103
return nil
104
}
105
},
106
}
107
108
return &otelcol.ConsumerArguments{
109
Traces: []otelcol.Consumer{&traceConsumer},
110
}
111
}
112
113
func getFreeAddr(t *testing.T) string {
114
t.Helper()
115
116
portNumber, err := freeport.GetFreePort()
117
require.NoError(t, err)
118
119
return fmt.Sprintf("localhost:%d", portNumber)
120
}
121
122