Path: blob/main/component/otelcol/processor/processor_test.go
4096 views
package processor_test

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
	"github.com/grafana/agent/component/otelcol/processor"
	"github.com/grafana/agent/pkg/flow/componenttest"
	"github.com/grafana/agent/pkg/util"
	"github.com/stretchr/testify/require"
	otelcomponent "go.opentelemetry.io/collector/component"
	otelconfig "go.opentelemetry.io/collector/config"
	otelconsumer "go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// TestProcessor checks that traces sent to the consumer exported by the Flow
// processor component pass through the wrapped (fake) OpenTelemetry Collector
// processor and arrive at the configured next consumer.
func TestProcessor(t *testing.T) {
	ctx := componenttest.TestContext(t)

	// consumer is the next consumer handed to our fake processor when the
	// factory creates it. It is written once by onTracesConsumer and read only
	// after waitConsumerTrigger has fired.
	var consumer otelconsumer.Traces

	waitConsumerTrigger := util.NewWaitTrigger()
	onTracesConsumer := func(next otelconsumer.Traces) {
		consumer = next
		waitConsumerTrigger.Trigger()
	}

	// nextConsumer is the final destination for data; it fires
	// waitTracesTrigger so the test can observe that traces arrived.
	waitTracesTrigger := util.NewWaitTrigger()
	nextConsumer := &fakeconsumer.Consumer{
		ConsumeTracesFunc: func(context.Context, ptrace.Traces) error {
			waitTracesTrigger.Trigger()
			return nil
		},
	}

	// Our fake processor will wait for a consumer to be registered and then
	// pass along data directly to it.
	//
	// This callback is invoked from the background sender goroutine below, so
	// a failure is returned as an error rather than reported with require:
	// FailNow must only be called from the goroutine running the test.
	innerProcessor := &fakeProcessor{
		ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error {
			if err := waitConsumerTrigger.Wait(time.Second); err != nil {
				return fmt.Errorf("no next consumer registered: %w", err)
			}
			return consumer.ConsumeTraces(ctx, td)
		},
	}

	// Create and start our Flow component. We then wait for it to export a
	// consumer that we can send data to.
	te := newTestEnvironment(t, innerProcessor, onTracesConsumer)
	te.Start(fakeProcessorArgs{
		Output: &otelcol.ConsumerArguments{
			Metrics: []otelcol.Consumer{nextConsumer},
			Logs:    []otelcol.Consumer{nextConsumer},
			Traces:  []otelcol.Consumer{nextConsumer},
		},
	})

	require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")
	ce := te.Controller.Exports().(otelcol.ConsumerExports)

	// Create a test set of traces and send it to our consumer in the background.
	// We then wait for our trigger to fire, indicating that everything was
	// wired up correctly.
	go func() {
		for {
			err := ce.Input.ConsumeTraces(ctx, ptrace.NewTraces())

			if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
				// Our component may not have been fully initialized yet. Wait a
				// little bit before trying again, but give up once the test
				// context is done so this goroutine cannot retry forever.
				select {
				case <-ctx.Done():
					return
				case <-time.After(100 * time.Millisecond):
				}
				continue
			}

			// Errorf (not require/FailNow) because we are not on the test
			// goroutine.
			if err != nil {
				t.Errorf("sending traces to the component: %v", err)
			}
			return
		}
	}()

	require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")
}

// testEnvironment bundles the test handle with the component controller used
// to run the processor component under test.
type testEnvironment struct {
	t *testing.T

	Controller *componenttest.Controller
}

// newTestEnvironment builds a testEnvironment whose component is a
// processor.New instance backed by a factory that always returns fp as the
// traces processor. onTracesConsumer is called with the next consumer each
// time the factory is asked to create the traces processor.
func newTestEnvironment(
	t *testing.T,
	fp otelcomponent.TracesProcessor,
	onTracesConsumer func(t otelconsumer.Traces),
) *testEnvironment {

	t.Helper()

	reg := component.Registration{
		Name:    "testcomponent",
		Args:    fakeProcessorArgs{},
		Exports: otelcol.ConsumerExports{},
		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			// Create a factory which always returns our instance of fakeProcessor
			// defined above.
			factory := otelcomponent.NewProcessorFactory(
				"testcomponent",
				func() otelconfig.Processor {
					res, err := fakeProcessorArgs{}.Convert()
					// This callback is not guaranteed to run on the test
					// goroutine, so report with Errorf instead of FailNow.
					if err != nil {
						t.Errorf("converting default processor arguments: %v", err)
					}
					return res
				},
				otelcomponent.WithTracesProcessor(func(
					_ context.Context,
					_ otelcomponent.ProcessorCreateSettings,
					_ otelconfig.Processor,
					next otelconsumer.Traces,
				) (otelcomponent.TracesProcessor, error) {

					onTracesConsumer(next)
					return fp, nil
				}, otelcomponent.StabilityLevelUndefined),
			)

			return processor.New(opts, factory, args.(processor.Arguments))
		},
	}

	return &testEnvironment{
		t:          t,
		Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
	}
}

// Start runs the component with args in a background goroutine. A non-nil
// error from Controller.Run is reported as a test failure.
func (te *testEnvironment) Start(args component.Arguments) {
	// Create the context on the caller's goroutine so that it is set up before
	// Start returns, rather than at some later point when the background
	// goroutine happens to be scheduled.
	ctx := componenttest.TestContext(te.t)

	go func() {
		// Errorf (not require/FailNow) because we are not on the test goroutine.
		if err := te.Controller.Run(ctx, args); err != nil {
			te.t.Errorf("failed to run component: %v", err)
		}
	}()
}

// fakeProcessorArgs is a minimal processor.Arguments implementation that only
// carries the next consumers to forward data to.
type fakeProcessorArgs struct {
	Output *otelcol.ConsumerArguments
}

var _ processor.Arguments = fakeProcessorArgs{}

// Convert returns default processor settings for the "testcomponent"
// component ID. It never returns an error.
func (fa fakeProcessorArgs) Convert() (otelconfig.Processor, error) {
	settings := otelconfig.NewProcessorSettings(otelconfig.NewComponentID("testcomponent"))
	return &settings, nil
}

// Extensions returns nil; the fake processor uses no extensions.
func (fa fakeProcessorArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
	return nil
}

// Exporters returns nil; the fake processor uses no exporters.
func (fa fakeProcessorArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
	return nil
}

// NextConsumers returns the consumers configured in Output.
func (fa fakeProcessorArgs) NextConsumers() *otelcol.ConsumerArguments {
	return fa.Output
}

// fakeProcessor is a traces processor whose behavior is supplied through
// optional function fields. Any field left nil makes the corresponding method
// a no-op that returns its zero value.
type fakeProcessor struct {
	StartFunc         func(ctx context.Context, host otelcomponent.Host) error
	ShutdownFunc      func(ctx context.Context) error
	CapabilitiesFunc  func() otelconsumer.Capabilities
	ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error
}

var _ otelcomponent.TracesProcessor = (*fakeProcessor)(nil)

// Start delegates to StartFunc when set and otherwise returns nil.
func (fp *fakeProcessor) Start(ctx context.Context, host otelcomponent.Host) error {
	if fp.StartFunc == nil {
		return nil
	}
	return fp.StartFunc(ctx, host)
}

// Shutdown delegates to ShutdownFunc when set and otherwise returns nil.
func (fp *fakeProcessor) Shutdown(ctx context.Context) error {
	if fp.ShutdownFunc == nil {
		return nil
	}
	return fp.ShutdownFunc(ctx)
}

// Capabilities delegates to CapabilitiesFunc when set and otherwise returns
// the zero-value Capabilities.
func (fp *fakeProcessor) Capabilities() otelconsumer.Capabilities {
	if fp.CapabilitiesFunc == nil {
		return otelconsumer.Capabilities{}
	}
	return fp.CapabilitiesFunc()
}

// ConsumeTraces delegates to ConsumeTracesFunc when set and otherwise returns
// nil.
func (fp *fakeProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	if fp.ConsumeTracesFunc == nil {
		return nil
	}
	return fp.ConsumeTracesFunc(ctx, td)
}