Path: blob/main/component/otelcol/receiver/receiver.go
4096 views
// Package receiver utilities to create a Flow component from OpenTelemetry1// Collector receivers.2package receiver34import (5"context"6"errors"7"os"89"github.com/grafana/agent/component"10"github.com/grafana/agent/component/otelcol"11"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"12"github.com/grafana/agent/component/otelcol/internal/lazycollector"13"github.com/grafana/agent/component/otelcol/internal/scheduler"14"github.com/grafana/agent/pkg/build"15"github.com/grafana/agent/pkg/util/zapadapter"16"github.com/prometheus/client_golang/prometheus"17otelcomponent "go.opentelemetry.io/collector/component"18otelconfig "go.opentelemetry.io/collector/config"19sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"20"go.opentelemetry.io/otel/sdk/metric"2122_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates23)2425// Arguments is an extension of component.Arguments which contains necessary26// settings for OpenTelemetry Collector receivers.27type Arguments interface {28component.Arguments2930// Convert converts the Arguments into an OpenTelemetry Collector receiver31// configuration.32Convert() (otelconfig.Receiver, error)3334// Extensions returns the set of extensions that the configured component is35// allowed to use.36Extensions() map[otelconfig.ComponentID]otelcomponent.Extension3738// Exporters returns the set of exporters that are exposed to the configured39// component.40Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter4142// NextConsumers returns the set of consumers to send data to.43NextConsumers() *otelcol.ConsumerArguments44}4546// Receiver is a Flow component shim which manages an OpenTelemetry Collector47// receiver component.48type Receiver struct {49ctx context.Context50cancel context.CancelFunc5152opts component.Options53factory otelcomponent.ReceiverFactory5455sched *scheduler.Scheduler56collector *lazycollector.Collector57}5859var (60_ component.Component = (*Receiver)(nil)61_ component.HealthComponent = (*Receiver)(nil)62)6364// New creates a new Flow component which encapsulates an OpenTelemetry65// Collector receiver. args must hold a value of the argument type registered66// with the Flow component.67//68// If the registered Flow component registers exported fields, it is the69// responsibility of the caller to export values when needed; the Receiver70// component never exports any values.71func New(opts component.Options, f otelcomponent.ReceiverFactory, args Arguments) (*Receiver, error) {72ctx, cancel := context.WithCancel(context.Background())7374// Create a lazy collector where metrics from the upstream component will be75// forwarded.76collector := lazycollector.New()77opts.Registerer.MustRegister(collector)7879r := &Receiver{80ctx: ctx,81cancel: cancel,8283opts: opts,84factory: f,8586sched: scheduler.New(opts.Logger),87collector: collector,88}89if err := r.Update(args); err != nil {90return nil, err91}92return r, nil93}9495// Run starts the Receiver component.96func (r *Receiver) Run(ctx context.Context) error {97defer r.cancel()98return r.sched.Run(ctx)99}100101// Update implements component.Component. It will convert the Arguments into102// configuration for OpenTelemetry Collector receiver configuration and manage103// the underlying OpenTelemetry Collector receiver.104func (r *Receiver) Update(args component.Arguments) error {105rargs := args.(Arguments)106107host := scheduler.NewHost(108r.opts.Logger,109scheduler.WithHostExtensions(rargs.Extensions()),110scheduler.WithHostExporters(rargs.Exporters()),111)112113reg := prometheus.NewRegistry()114r.collector.Set(reg)115116promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())117if err != nil {118return err119}120121settings := otelcomponent.ReceiverCreateSettings{122TelemetrySettings: otelcomponent.TelemetrySettings{123Logger: zapadapter.New(r.opts.Logger),124125TracerProvider: r.opts.Tracer,126MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),127},128129BuildInfo: otelcomponent.BuildInfo{130Command: os.Args[0],131Description: "Grafana Agent",132Version: build.Version,133},134}135136receiverConfig, err := rargs.Convert()137if err != nil {138return err139}140141var (142next = rargs.NextConsumers()143nextTraces = fanoutconsumer.Traces(next.Traces)144nextMetrics = fanoutconsumer.Metrics(next.Metrics)145nextLogs = fanoutconsumer.Logs(next.Logs)146)147148// Create instances of the receiver from our factory for each of our149// supported telemetry signals.150var components []otelcomponent.Component151152tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces)153if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {154return err155} else if tracesReceiver != nil {156components = append(components, tracesReceiver)157}158159metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics)160if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {161return err162} else if metricsReceiver != nil {163components = append(components, metricsReceiver)164}165166logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs)167if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {168return err169} else if logsReceiver != nil {170components = append(components, logsReceiver)171}172173// Schedule the components to run once our component is running.174r.sched.Schedule(host, components...)175return nil176}177178// CurrentHealth implements component.HealthComponent.179func (r *Receiver) CurrentHealth() component.Health {180return r.sched.CurrentHealth()181}182183184