Path: blob/main/component/otelcol/receiver/otlp/otlp_test.go
4096 views
package otlp_test

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"testing"
	"time"

	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
	"github.com/grafana/agent/component/otelcol/receiver/otlp"
	"github.com/grafana/agent/pkg/flow/componenttest"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/dskit/backoff"
	"github.com/phayes/freeport"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// Test performs a basic integration test which runs the otelcol.receiver.otlp
// component and ensures that it can receive and forward data.
//
// The test starts the component with an HTTP endpoint on a free local port,
// POSTs testdata/payload.json to /v1/traces (retrying with backoff until the
// request succeeds), and then expects exactly one span to arrive on the
// traces output within one second.
func Test(t *testing.T) {
	httpAddr := getFreeAddr(t)

	ctx := componenttest.TestContext(t)
	l := util.TestLogger(t)

	ctrl, err := componenttest.NewControllerFromID(l, "otelcol.receiver.otlp")
	require.NoError(t, err)

	cfg := fmt.Sprintf(`
		http {
			endpoint = "%s"
		}

		output {
			// no-op: will be overridden by test code.
		}
	`, httpAddr)
	var args otlp.Arguments
	require.NoError(t, river.Unmarshal([]byte(cfg), &args))

	// Override our settings so traces get forwarded to traceCh.
	traceCh := make(chan ptrace.Traces)
	args.Output = makeTracesOutput(traceCh)

	go func() {
		// require.* ends up in t.FailNow, which must only be called from the
		// goroutine running the test, so report the failure with t.Errorf here.
		if err := ctrl.Run(ctx, args); err != nil {
			t.Errorf("running component: %v", err)
		}
	}()

	require.NoError(t, ctrl.WaitRunning(time.Second))

	// Send traces in the background to our receiver.
	go func() {
		tracesURL := fmt.Sprintf("http://%s/v1/traces", httpAddr)

		// sendTraces performs a single POST of the test payload. Every failure
		// is returned rather than asserted on, because this closure does not
		// run on the test goroutine.
		sendTraces := func() error {
			f, err := os.Open("testdata/payload.json")
			if err != nil {
				return err
			}
			defer f.Close()

			// Tie the request to the test context so that it is aborted once
			// the context is done instead of outliving the test.
			req, err := http.NewRequestWithContext(ctx, http.MethodPost, tracesURL, f)
			if err != nil {
				return err
			}
			req.Header.Set("Content-Type", "application/json")

			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return err
			}
			// The body must always be closed, otherwise the underlying
			// connection is leaked.
			defer resp.Body.Close()

			if resp.StatusCode/100 != 2 {
				return fmt.Errorf("unexpected response status %s", resp.Status)
			}
			return nil
		}

		bo := backoff.New(ctx, backoff.Config{
			MinBackoff: 10 * time.Millisecond,
			MaxBackoff: 100 * time.Millisecond,
		})
		for bo.Ongoing() {
			err := sendTraces()
			if err == nil {
				return
			}

			level.Error(l).Log("msg", "failed to send traces", "err", err)
			bo.Wait()
		}
	}()

	// Wait for our client to get a span.
	select {
	case <-time.After(time.Second):
		require.FailNow(t, "failed waiting for traces")
	case tr := <-traceCh:
		require.Equal(t, 1, tr.SpanCount())
	}
}

// makeTracesOutput returns ConsumerArguments which will forward traces to the
// provided channel.
//
// The send on ch blocks until a receiver is ready; if the context passed to
// the consumer is done first, the context's error is returned instead.
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
	traceConsumer := fakeconsumer.Consumer{
		ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case ch <- t:
				return nil
			}
		},
	}

	return &otelcol.ConsumerArguments{
		Traces: []otelcol.Consumer{&traceConsumer},
	}
}

// getFreeAddr returns a "localhost:<port>" address using a port obtained from
// freeport.GetFreePort. The calling test fails immediately if no port can be
// obtained.
func getFreeAddr(t *testing.T) string {
	t.Helper()

	portNumber, err := freeport.GetFreePort()
	require.NoError(t, err)

	return fmt.Sprintf("localhost:%d", portNumber)
}