Path: blob/main/component/otelcol/processor/processor.go
4096 views
// Package processor exposes utilities to create a Flow component from1// OpenTelemetry Collector processors.2package processor34import (5"context"6"errors"7"os"89"github.com/grafana/agent/component"10"github.com/grafana/agent/component/otelcol"11"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"12"github.com/grafana/agent/component/otelcol/internal/lazycollector"13"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"14"github.com/grafana/agent/component/otelcol/internal/scheduler"15"github.com/grafana/agent/pkg/build"16"github.com/grafana/agent/pkg/util/zapadapter"17"github.com/prometheus/client_golang/prometheus"18otelcomponent "go.opentelemetry.io/collector/component"19otelconfig "go.opentelemetry.io/collector/config"20sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"21"go.opentelemetry.io/otel/sdk/metric"2223_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates24)2526// Arguments is an extension of component.Arguments which contains necessary27// settings for OpenTelemetry Collector processors.28type Arguments interface {29component.Arguments3031// Convert converts the Arguments into an OpenTelemetry Collector processor32// configuration.33Convert() (otelconfig.Processor, error)3435// Extensions returns the set of extensions that the configured component is36// allowed to use.37Extensions() map[otelconfig.ComponentID]otelcomponent.Extension3839// Exporters returns the set of exporters that are exposed to the configured40// component.41Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter4243// NextConsumers returns the set of consumers to send data to.44NextConsumers() *otelcol.ConsumerArguments45}4647// Processor is a Flow component shim which manages an OpenTelemetry Collector48// processor component.49type Processor struct {50ctx context.Context51cancel context.CancelFunc5253opts component.Options54factory otelcomponent.ProcessorFactory55consumer *lazyconsumer.Consumer5657sched *scheduler.Scheduler58collector *lazycollector.Collector59}6061var (62_ component.Component = (*Processor)(nil)63_ component.HealthComponent = (*Processor)(nil)64)6566// New creates a new Flow component which encapsulates an OpenTelemetry67// Collector processor. args must hold a value of the argument type registered68// with the Flow component.69//70// The registered component must be registered to export the71// otelcol.ConsumerExports type, otherwise New will panic.72func New(opts component.Options, f otelcomponent.ProcessorFactory, args Arguments) (*Processor, error) {73ctx, cancel := context.WithCancel(context.Background())7475consumer := lazyconsumer.New(ctx)7677// Create a lazy collector where metrics from the upstream component will be78// forwarded.79collector := lazycollector.New()80opts.Registerer.MustRegister(collector)8182// Immediately set our state with our consumer. The exports will never change83// throughout the lifetime of our component.84//85// This will panic if the wrapping component is not registered to export86// otelcol.ConsumerExports.87opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})8889p := &Processor{90ctx: ctx,91cancel: cancel,9293opts: opts,94factory: f,95consumer: consumer,9697sched: scheduler.New(opts.Logger),98collector: collector,99}100if err := p.Update(args); err != nil {101return nil, err102}103return p, nil104}105106// Run starts the Processor component.107func (p *Processor) Run(ctx context.Context) error {108defer p.cancel()109return p.sched.Run(ctx)110}111112// Update implements component.Component. It will convert the Arguments into113// configuration for OpenTelemetry Collector processor configuration and manage114// the underlying OpenTelemetry Collector processor.115func (p *Processor) Update(args component.Arguments) error {116pargs := args.(Arguments)117118host := scheduler.NewHost(119p.opts.Logger,120scheduler.WithHostExtensions(pargs.Extensions()),121scheduler.WithHostExporters(pargs.Exporters()),122)123124reg := prometheus.NewRegistry()125p.collector.Set(reg)126127promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())128if err != nil {129return err130}131132settings := otelcomponent.ProcessorCreateSettings{133TelemetrySettings: otelcomponent.TelemetrySettings{134Logger: zapadapter.New(p.opts.Logger),135136TracerProvider: p.opts.Tracer,137MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),138},139140BuildInfo: otelcomponent.BuildInfo{141Command: os.Args[0],142Description: "Grafana Agent",143Version: build.Version,144},145}146147processorConfig, err := pargs.Convert()148if err != nil {149return err150}151152var (153next = pargs.NextConsumers()154nextTraces = fanoutconsumer.Traces(next.Traces)155nextMetrics = fanoutconsumer.Metrics(next.Metrics)156nextLogs = fanoutconsumer.Logs(next.Logs)157)158159// Create instances of the processor from our factory for each of our160// supported telemetry signals.161var components []otelcomponent.Component162163tracesProcessor, err := p.factory.CreateTracesProcessor(p.ctx, settings, processorConfig, nextTraces)164if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {165return err166} else if tracesProcessor != nil {167components = append(components, tracesProcessor)168}169170metricsProcessor, err := p.factory.CreateMetricsProcessor(p.ctx, settings, processorConfig, nextMetrics)171if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {172return err173} else if metricsProcessor != nil {174components = append(components, metricsProcessor)175}176177logsProcessor, err := p.factory.CreateLogsProcessor(p.ctx, settings, processorConfig, nextLogs)178if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {179return err180} else if logsProcessor != nil {181components = append(components, logsProcessor)182}183184// Schedule the components to run once our component is running.185p.sched.Schedule(host, components...)186p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)187return nil188}189190// CurrentHealth implements component.HealthComponent.191func (p *Processor) CurrentHealth() component.Health {192return p.sched.CurrentHealth()193}194195196