Path: blob/main/component/otelcol/exporter/exporter_test.go
4096 views
package exporter_test12import (3"context"4"errors"5"testing"6"time"78"github.com/grafana/agent/component"9"github.com/grafana/agent/component/otelcol"10"github.com/grafana/agent/component/otelcol/exporter"11"github.com/grafana/agent/pkg/flow/componenttest"12"github.com/grafana/agent/pkg/util"13"github.com/stretchr/testify/require"14otelcomponent "go.opentelemetry.io/collector/component"15otelconfig "go.opentelemetry.io/collector/config"16otelconsumer "go.opentelemetry.io/collector/consumer"17"go.opentelemetry.io/collector/pdata/ptrace"18)1920func TestExporter(t *testing.T) {21ctx := componenttest.TestContext(t)2223// Channel where received traces will be written to.24tracesCh := make(chan ptrace.Traces, 1)2526// Create an instance of a fake OpenTelemetry Collector exporter which our27// Flow component will wrap around.28innerExporter := &fakeExporter{29ConsumeTracesFunc: func(_ context.Context, td ptrace.Traces) error {30select {31case tracesCh <- td:32default:33}34return nil35},36}3738// Create and start our Flow component. We then wait for it to export a39// consumer that we can send data to.40te := newTestEnvironment(t, innerExporter)41te.Start()4243require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")44ce := te.Controller.Exports().(otelcol.ConsumerExports)4546// Create a test set of traces and send it to our consumer in the background.47// We then wait for our channel to receive the traces, indicating that48// everything was wired up correctly.49testTraces := createTestTraces()50go func() {51var err error5253for {54err = ce.Input.ConsumeTraces(ctx, testTraces)5556if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {57// Our component may not have been fully initialized yet. Wait a little58// bit before trying again.59time.Sleep(100 * time.Millisecond)60continue61}6263require.NoError(t, err)64break65}66}()6768select {69case <-time.After(1 * time.Second):70require.FailNow(t, "testcomponent did not receive traces")71case td := <-tracesCh:72require.Equal(t, testTraces, td)73}74}7576type testEnvironment struct {77t *testing.T7879Controller *componenttest.Controller80}8182func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment {83t.Helper()8485reg := component.Registration{86Name: "testcomponent",87Args: fakeExporterArgs{},88Exports: otelcol.ConsumerExports{},89Build: func(opts component.Options, args component.Arguments) (component.Component, error) {90// Create a factory which always returns our instance of fakeExporter91// defined above.92factory := otelcomponent.NewExporterFactory(93"testcomponent",94func() otelconfig.Exporter {95res, err := fakeExporterArgs{}.Convert()96require.NoError(t, err)97return res98},99otelcomponent.WithTracesExporter(func(ctx context.Context, ecs otelcomponent.ExporterCreateSettings, e otelconfig.Exporter) (otelcomponent.TracesExporter, error) {100return fe, nil101}, otelcomponent.StabilityLevelUndefined),102)103104return exporter.New(opts, factory, args.(exporter.Arguments))105},106}107108return &testEnvironment{109t: t,110Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),111}112}113114func (te *testEnvironment) Start() {115go func() {116ctx := componenttest.TestContext(te.t)117err := te.Controller.Run(ctx, fakeExporterArgs{})118require.NoError(te.t, err, "failed to run component")119}()120}121122type fakeExporterArgs struct {123}124125var _ exporter.Arguments = fakeExporterArgs{}126127func (fa fakeExporterArgs) Convert() (otelconfig.Exporter, error) {128settings := otelconfig.NewExporterSettings(otelconfig.NewComponentID("testcomponent"))129return &settings, nil130}131132func (fa fakeExporterArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {133return nil134}135136func (fa fakeExporterArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {137return nil138}139140type fakeExporter struct {141StartFunc func(ctx context.Context, host otelcomponent.Host) error142ShutdownFunc func(ctx context.Context) error143CapabilitiesFunc func() otelconsumer.Capabilities144ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error145}146147var _ otelcomponent.TracesExporter = (*fakeExporter)(nil)148149func (fe *fakeExporter) Start(ctx context.Context, host otelcomponent.Host) error {150if fe.StartFunc != nil {151return fe.StartFunc(ctx, host)152}153return nil154}155156func (fe *fakeExporter) Shutdown(ctx context.Context) error {157if fe.ShutdownFunc != nil {158return fe.ShutdownFunc(ctx)159}160return nil161}162163func (fe *fakeExporter) Capabilities() otelconsumer.Capabilities {164if fe.CapabilitiesFunc != nil {165return fe.CapabilitiesFunc()166}167return otelconsumer.Capabilities{}168}169170func (fe *fakeExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {171if fe.ConsumeTracesFunc != nil {172return fe.ConsumeTracesFunc(ctx, td)173}174return nil175}176177func createTestTraces() ptrace.Traces {178// Matches format from the protobuf definition:179// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto180var bb = `{181"resource_spans": [{182"scope_spans": [{183"spans": [{184"name": "TestSpan"185}]186}]187}]188}`189190decoder := &ptrace.JSONUnmarshaler{}191data, err := decoder.UnmarshalTraces([]byte(bb))192if err != nil {193panic(err)194}195return data196}197198199