Path: blob/main/component/otelcol/exporter/exporter.go
4096 views
// Package exporter exposes utilities to create a Flow component from1// OpenTelemetry Collector exporters.2package exporter34import (5"context"6"errors"7"os"89"github.com/grafana/agent/component"10"github.com/grafana/agent/component/otelcol"11"github.com/grafana/agent/component/otelcol/internal/lazycollector"12"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"13"github.com/grafana/agent/component/otelcol/internal/scheduler"14"github.com/grafana/agent/pkg/build"15"github.com/grafana/agent/pkg/util/zapadapter"16"github.com/prometheus/client_golang/prometheus"17otelcomponent "go.opentelemetry.io/collector/component"18otelconfig "go.opentelemetry.io/collector/config"19sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"20"go.opentelemetry.io/otel/sdk/metric"2122_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates23)2425// Arguments is an extension of component.Arguments which contains necessary26// settings for OpenTelemetry Collector exporters.27type Arguments interface {28component.Arguments2930// Convert converts the Arguments into an OpenTelemetry Collector exporter31// configuration.32Convert() (otelconfig.Exporter, error)3334// Extensions returns the set of extensions that the configured component is35// allowed to use.36Extensions() map[otelconfig.ComponentID]otelcomponent.Extension3738// Exporters returns the set of exporters that are exposed to the configured39// component.40Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter41}4243// Exporter is a Flow component shim which manages an OpenTelemetry Collector44// exporter component.45type Exporter struct {46ctx context.Context47cancel context.CancelFunc4849opts component.Options50factory otelcomponent.ExporterFactory51consumer *lazyconsumer.Consumer5253sched *scheduler.Scheduler54collector *lazycollector.Collector55}5657var (58_ component.Component = (*Exporter)(nil)59_ component.HealthComponent = (*Exporter)(nil)60)6162// New creates a new Flow component which encapsulates an OpenTelemetry63// Collector exporter. args must hold a value of the argument type registered64// with the Flow component.65//66// The registered component must be registered to export the67// otelcol.ConsumerExports type, otherwise New will panic.68func New(opts component.Options, f otelcomponent.ExporterFactory, args Arguments) (*Exporter, error) {69ctx, cancel := context.WithCancel(context.Background())7071consumer := lazyconsumer.New(ctx)7273// Create a lazy collector where metrics from the upstream component will be74// forwarded.75collector := lazycollector.New()76opts.Registerer.MustRegister(collector)7778// Immediately set our state with our consumer. The exports will never change79// throughout the lifetime of our component.80//81// This will panic if the wrapping component is not registered to export82// otelcol.ConsumerExports.83opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})8485e := &Exporter{86ctx: ctx,87cancel: cancel,8889opts: opts,90factory: f,91consumer: consumer,9293sched: scheduler.New(opts.Logger),94collector: collector,95}96if err := e.Update(args); err != nil {97return nil, err98}99return e, nil100}101102// Run starts the Exporter component.103func (e *Exporter) Run(ctx context.Context) error {104defer e.cancel()105return e.sched.Run(ctx)106}107108// Update implements component.Component. It will convert the Arguments into109// configuration for OpenTelemetry Collector exporter configuration and manage110// the underlying OpenTelemetry Collector exporter.111func (e *Exporter) Update(args component.Arguments) error {112eargs := args.(Arguments)113114host := scheduler.NewHost(115e.opts.Logger,116scheduler.WithHostExtensions(eargs.Extensions()),117scheduler.WithHostExporters(eargs.Exporters()),118)119120reg := prometheus.NewRegistry()121e.collector.Set(reg)122123promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())124if err != nil {125return err126}127128settings := otelcomponent.ExporterCreateSettings{129TelemetrySettings: otelcomponent.TelemetrySettings{130Logger: zapadapter.New(e.opts.Logger),131132TracerProvider: e.opts.Tracer,133MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),134},135136BuildInfo: otelcomponent.BuildInfo{137Command: os.Args[0],138Description: "Grafana Agent",139Version: build.Version,140},141}142143exporterConfig, err := eargs.Convert()144if err != nil {145return err146}147148// Create instances of the exporter from our factory for each of our149// supported telemetry signals.150var components []otelcomponent.Component151152tracesExporter, err := e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)153if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {154return err155} else if tracesExporter != nil {156components = append(components, tracesExporter)157}158159metricsExporter, err := e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)160if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {161return err162} else if metricsExporter != nil {163components = append(components, metricsExporter)164}165166logsExporter, err := e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)167if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {168return err169} else if logsExporter != nil {170components = append(components, logsExporter)171}172173// Schedule the components to run once our component is running.174e.sched.Schedule(host, components...)175e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)176return nil177}178179// CurrentHealth implements component.HealthComponent.180func (e *Exporter) CurrentHealth() component.Health {181return e.sched.CurrentHealth()182}183184185