Path: blob/main/component/otelcol/exporter/otlp/otlp_test.go
4096 views
package otlp_test

import (
	"context"
	"fmt"
	"net"
	"testing"
	"time"

	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/exporter/otlp"
	"github.com/grafana/agent/pkg/flow/componenttest"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/dskit/backoff"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
	"google.golang.org/grpc"
)

// Test performs a basic integration test which runs the otelcol.exporter.otlp
// component and ensures that it can pass data to an OTLP gRPC server.
func Test(t *testing.T) {
	// Unbuffered: the mock server's Export handler hands each received batch
	// directly to the select at the end of this test.
	traceCh := make(chan ptrace.Traces)
	tracesServer := makeTracesServer(t, traceCh)

	ctx := componenttest.TestContext(t)
	l := util.TestLogger(t)

	ctrl, err := componenttest.NewControllerFromID(l, "otelcol.exporter.otlp")
	require.NoError(t, err)

	// Point the exporter at the local mock server over plaintext gRPC.
	cfg := fmt.Sprintf(`
		timeout = "250ms"

		client {
			endpoint = "%s"

			compression = "none"

			tls {
				insecure = true
				insecure_skip_verify = true
			}
		}
	`, tracesServer)
	var args otlp.Arguments
	require.NoError(t, river.Unmarshal([]byte(cfg), &args))

	go func() {
		// require.* ends in t.FailNow, which must only be called from the
		// goroutine running the test, so report failures with t.Errorf here.
		if err := ctrl.Run(ctx, args); err != nil {
			t.Errorf("component exited with error: %v", err)
		}
	}()

	require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
	require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

	// Send traces in the background to our exporter.
	go func() {
		exports := ctrl.Exports().(otelcol.ConsumerExports)

		bo := backoff.New(ctx, backoff.Config{
			MinBackoff: 10 * time.Millisecond,
			MaxBackoff: 100 * time.Millisecond,
		})
		// Retry with backoff until a single send succeeds.
		for bo.Ongoing() {
			err := exports.Input.ConsumeTraces(ctx, createTestTraces())
			if err != nil {
				level.Error(l).Log("msg", "failed to send traces", "err", err)
				bo.Wait()
				continue
			}

			return
		}
	}()

	// Wait for our exporter to finish and pass data to our gRPC server.
	select {
	case <-time.After(time.Second):
		require.FailNow(t, "failed waiting for traces")
	case tr := <-traceCh:
		require.Equal(t, 1, tr.SpanCount())
	}
}

// makeTracesServer returns a host:port which will accept traces over insecure
// gRPC. Every received batch of traces is forwarded to ch. The server is
// stopped through t.Cleanup.
func makeTracesServer(t *testing.T, ch chan ptrace.Traces) string {
	t.Helper()

	// Port 0 asks the OS for any free port; the chosen address is returned.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	srv := grpc.NewServer()
	ptraceotlp.RegisterGRPCServer(srv, &mockTracesReceiver{ch: ch})

	go func() {
		// Not the test goroutine: t.FailNow (and therefore require.*) is not
		// allowed here, so report failures with t.Errorf instead.
		if err := srv.Serve(lis); err != nil {
			t.Errorf("gRPC server exited with error: %v", err)
		}
	}()
	t.Cleanup(srv.Stop)

	return lis.Addr().String()
}

// mockTracesReceiver is a ptraceotlp.GRPCServer which forwards every received
// export request's traces to ch.
type mockTracesReceiver struct {
	ch chan ptrace.Traces
}

var _ ptraceotlp.GRPCServer = (*mockTracesReceiver)(nil)

// Export hands the traces of req to the receiver's channel and returns an
// empty response. If ctx is done before anything receives from the channel,
// Export returns ctx.Err() instead of blocking indefinitely.
func (ms *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
	select {
	case ms.ch <- req.Traces():
		return ptraceotlp.NewExportResponse(), nil
	case <-ctx.Done():
		return ptraceotlp.NewExportResponse(), ctx.Err()
	}
}

// createTestTraces builds a ptrace.Traces value holding a single span named
// "TestSpan". It panics if the embedded JSON fails to decode.
func createTestTraces() ptrace.Traces {
	// Matches format from the protobuf definition:
	// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
	var bb = `{
		"resource_spans": [{
			"scope_spans": [{
				"spans": [{
					"name": "TestSpan"
				}]
			}]
		}]
	}`

	decoder := &ptrace.JSONUnmarshaler{}
	data, err := decoder.UnmarshalTraces([]byte(bb))
	if err != nil {
		panic(err)
	}
	return data
}