Path: blob/main/pkg/traces/internal/traceutils/server.go
4096 views
package traceutils12import (3"context"4"fmt"5"math/rand"6"strings"7"testing"8"time"910"github.com/grafana/agent/pkg/util"11"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/mocks"12"github.com/stretchr/testify/assert"13"go.opentelemetry.io/collector/component"14"go.opentelemetry.io/collector/config"15"go.opentelemetry.io/collector/confmap"16"go.opentelemetry.io/collector/consumer"17"go.opentelemetry.io/collector/pdata/ptrace"18"go.opentelemetry.io/collector/receiver/otlpreceiver"19"go.opentelemetry.io/collector/service/external/configunmarshaler"20"go.opentelemetry.io/collector/service/external/pipelines"21"go.opentelemetry.io/otel/metric"22"go.opentelemetry.io/otel/trace"23"go.uber.org/zap"24"gopkg.in/yaml.v3"25)2627// Server is a Tracing testing server that invokes a function every time a span28// is received.29type Server struct {30pipelines *pipelines.Pipelines31}3233// NewTestServer creates a new Server for testing, where received traces will34// call the callback function. The returned string is the address where traces35// can be sent using OTLP.36func NewTestServer(t *testing.T, callback func(ptrace.Traces)) string {37t.Helper()3839srv, listenAddr, err := NewServerWithRandomPort(callback)40if err != nil {41t.Fatalf("failed to create OTLP server: %s", err)42}43t.Cleanup(func() {44err := srv.Stop()45assert.NoError(t, err)46})4748return listenAddr49}5051// NewServerWithRandomPort calls NewServer with a random port >49152 and52// <65535. It will try up to five times before failing.53func NewServerWithRandomPort(callback func(ptrace.Traces)) (srv *Server, addr string, err error) {54var lastError error5556for i := 0; i < 5; i++ {57port := rand.Intn(65535-49152) + 4915258listenAddr := fmt.Sprintf("127.0.0.1:%d", port)5960srv, err = NewServer(listenAddr, callback)61if err != nil {62lastError = err63continue64}6566return srv, listenAddr, nil67}6869return nil, "", fmt.Errorf("failed 5 times to create a server. last error: %w", lastError)70}7172// NewServer creates an OTLP-accepting server that calls a function when a73// trace is received. This is primarily useful for testing.74func NewServer(addr string, callback func(ptrace.Traces)) (*Server, error) {75conf := util.Untab(fmt.Sprintf(`76processors:77func_processor:78receivers:79otlp:80protocols:81grpc:82endpoint: %s83exporters:84noop:85service:86pipelines:87traces:88receivers: [otlp]89processors: [func_processor]90exporters: [noop]91`, addr))9293var cfg map[string]interface{}94if err := yaml.NewDecoder(strings.NewReader(conf)).Decode(&cfg); err != nil {95panic("could not decode config: " + err.Error())96}9798extensionsFactory, err := component.MakeExtensionFactoryMap()99if err != nil {100return nil, fmt.Errorf("failed to make extension factory map: %w", err)101}102103receiversFactory, err := component.MakeReceiverFactoryMap(otlpreceiver.NewFactory())104if err != nil {105return nil, fmt.Errorf("failed to make receiver factory map: %w", err)106}107108exportersFactory, err := component.MakeExporterFactoryMap(newNoopExporterFactory())109if err != nil {110return nil, fmt.Errorf("failed to make exporter factory map: %w", err)111}112113processorsFactory, err := component.MakeProcessorFactoryMap(114newFuncProcessorFactory(callback),115)116if err != nil {117return nil, fmt.Errorf("failed to make processor factory map: %w", err)118}119120factories := component.Factories{121Extensions: extensionsFactory,122Receivers: receiversFactory,123Processors: processorsFactory,124Exporters: exportersFactory,125}126127configMap := confmap.NewFromStringMap(cfg)128otelCfg, err := configunmarshaler.Unmarshal(configMap, factories)129if err != nil {130return nil, fmt.Errorf("failed to make otel config: %w", err)131}132133var (134logger = zap.NewNop()135startInfo component.BuildInfo136)137138settings := component.TelemetrySettings{139Logger: logger,140TracerProvider: trace.NewNoopTracerProvider(),141MeterProvider: metric.NewNoopMeterProvider(),142}143144pipelines, err := pipelines.Build(context.Background(), pipelines.Settings{145Telemetry: settings,146BuildInfo: startInfo,147148ReceiverFactories: factories.Receivers,149ReceiverConfigs: otelCfg.Receivers,150ProcessorFactories: factories.Processors,151ProcessorConfigs: otelCfg.Processors,152ExporterFactories: factories.Exporters,153ExporterConfigs: otelCfg.Exporters,154155PipelineConfigs: otelCfg.Pipelines,156})157if err != nil {158return nil, fmt.Errorf("failed to build pipelines: %w", err)159}160161h := &mocks.Host{}162h.On("GetExtensions").Return(nil)163if err := pipelines.StartAll(context.Background(), h); err != nil {164return nil, fmt.Errorf("failed to start receivers: %w", err)165}166167return &Server{168pipelines: pipelines,169}, nil170}171172// Stop stops the testing server.173func (s *Server) Stop() error {174shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)175defer cancel()176177return s.pipelines.ShutdownAll(shutdownCtx)178}179180func newFuncProcessorFactory(callback func(ptrace.Traces)) component.ProcessorFactory {181return component.NewProcessorFactory(182"func_processor",183func() config.Processor {184processorSettings := config.NewProcessorSettings(config.NewComponentIDWithName("func_processor", "func_processor"))185return &processorSettings186},187component.WithTracesProcessor(func(188_ context.Context,189_ component.ProcessorCreateSettings,190_ config.Processor,191next consumer.Traces,192) (component.TracesProcessor, error) {193194return &funcProcessor{195Callback: callback,196Next: next,197}, nil198}, component.StabilityLevelUndefined),199)200}201202type funcProcessor struct {203Callback func(ptrace.Traces)204Next consumer.Traces205}206207func (p *funcProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {208if p.Callback != nil {209p.Callback(td)210}211return p.Next.ConsumeTraces(ctx, td)212}213214func (p *funcProcessor) Capabilities() consumer.Capabilities {215return consumer.Capabilities{MutatesData: true}216}217218func (p *funcProcessor) Start(context.Context, component.Host) error { return nil }219func (p *funcProcessor) Shutdown(context.Context) error { return nil }220221func newNoopExporterFactory() component.ExporterFactory {222return component.NewExporterFactory(223"noop",224func() config.Exporter {225exporterSettings := config.NewExporterSettings(config.NewComponentIDWithName("noop", "noop"))226return &exporterSettings227},228component.WithTracesExporter(func(229context.Context,230component.ExporterCreateSettings,231config.Exporter) (232component.TracesExporter,233error) {234235return &noopExporter{}, nil236}, component.StabilityLevelUndefined),237)238}239240type noopExporter struct{}241242func (n noopExporter) Start(context.Context, component.Host) error { return nil }243244func (n noopExporter) Shutdown(context.Context) error { return nil }245246func (n noopExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{} }247248func (n noopExporter) ConsumeTraces(context.Context, ptrace.Traces) error { return nil }249250251