Path: blob/main/component/otelcol/processor/batch/batch_test.go
4096 views
package batch_test

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
	"github.com/grafana/agent/component/otelcol/processor/batch"
	"github.com/grafana/agent/pkg/flow/componenttest"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/dskit/backoff"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// Test performs a basic integration test which runs the
// otelcol.processor.batch component and ensures that it can accept, process,
// and forward data.
func Test(t *testing.T) {
	ctx := componenttest.TestContext(t)
	l := util.TestLogger(t)

	ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.batch")
	require.NoError(t, err)

	cfg := `
		timeout = "10ms"

		output {
			// no-op: will be overridden by test code.
		}
	`
	var args batch.Arguments
	require.NoError(t, river.Unmarshal([]byte(cfg), &args))

	// Override our arguments so traces get forwarded to traceCh.
	traceCh := make(chan ptrace.Traces)
	args.Output = makeTracesOutput(traceCh)

	go func() {
		// Report with t.Errorf rather than require.*: require calls
		// t.FailNow, which must only be invoked from the goroutine running
		// the test function. t.Errorf is safe to call from any goroutine.
		if err := ctrl.Run(ctx, args); err != nil {
			t.Errorf("component exited with error: %v", err)
		}
	}()

	require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
	require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

	// Send traces in the background to our processor, retrying with backoff
	// until one send succeeds or the backoff reports it is no longer ongoing.
	go func() {
		exports := ctrl.Exports().(otelcol.ConsumerExports)

		bo := backoff.New(ctx, backoff.Config{
			MinBackoff: 10 * time.Millisecond,
			MaxBackoff: 100 * time.Millisecond,
		})
		for bo.Ongoing() {
			err := exports.Input.ConsumeTraces(ctx, createTestTraces())
			if err != nil {
				level.Error(l).Log("msg", "failed to send traces", "err", err)
				bo.Wait()
				continue
			}

			return
		}
	}()

	// Wait for our processor to finish and forward data to traceCh.
	select {
	case <-time.After(time.Second):
		require.FailNow(t, "failed waiting for traces")
	case tr := <-traceCh:
		require.Equal(t, 1, tr.SpanCount())
	}
}

// makeTracesOutput returns ConsumerArguments which will forward traces to the
// provided channel. The send blocks until a receiver takes the value or the
// context passed to the consumer is done, in which case the context's error
// is returned.
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
	traceConsumer := fakeconsumer.Consumer{
		ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case ch <- t:
				return nil
			}
		},
	}

	return &otelcol.ConsumerArguments{
		Traces: []otelcol.Consumer{&traceConsumer},
	}
}

// createTestTraces decodes a fixed JSON fixture holding a single span named
// "TestSpan" into ptrace.Traces. It panics if the fixture cannot be decoded,
// since that indicates a broken test rather than a runtime condition.
func createTestTraces() ptrace.Traces {
	// Matches format from the protobuf definition:
	// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
	var bb = `{
		"resource_spans": [{
			"scope_spans": [{
				"spans": [{
					"name": "TestSpan"
				}]
			}]
		}]
	}`

	decoder := &ptrace.JSONUnmarshaler{}
	data, err := decoder.UnmarshalTraces([]byte(bb))
	if err != nil {
		panic(err)
	}
	return data
}