Path: blob/main/component/otelcol/extension/extension.go
4096 views
// Package extension provides utilities to create a Flow component from1// OpenTelemetry Collector extensions.2//3// Other OpenTelemetry Collector extensions are better served as generic Flow4// components rather than being placed in the otelcol namespace.5package extension67import (8"context"9"os"1011"github.com/grafana/agent/component"12"github.com/grafana/agent/component/otelcol/internal/lazycollector"13"github.com/grafana/agent/component/otelcol/internal/scheduler"14"github.com/grafana/agent/pkg/build"15"github.com/grafana/agent/pkg/util/zapadapter"16"github.com/prometheus/client_golang/prometheus"17otelcomponent "go.opentelemetry.io/collector/component"18otelconfig "go.opentelemetry.io/collector/config"19sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"20"go.opentelemetry.io/otel/sdk/metric"2122_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates23)2425// Arguments is an extension of component.Arguments which contains necessary26// settings for OpenTelemetry Collector extensions.27type Arguments interface {28component.Arguments2930// Convert converts the Arguments into an OpenTelemetry Collector31// extension configuration.32Convert() (otelconfig.Extension, error)3334// Extensions returns the set of extensions that the configured component is35// allowed to use.36Extensions() map[otelconfig.ComponentID]otelcomponent.Extension3738// Exporters returns the set of exporters that are exposed to the configured39// component.40Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter41}4243// Extension is a Flow component shim which manages an OpenTelemetry Collector44// extension.45type Extension struct {46ctx context.Context47cancel context.CancelFunc4849opts component.Options50factory otelcomponent.ExtensionFactory5152sched *scheduler.Scheduler53collector *lazycollector.Collector54}5556var (57_ component.Component = (*Extension)(nil)58_ component.HealthComponent = (*Extension)(nil)59)6061// New creates a new Flow component which encapsulates an OpenTelemetry62// Collector extension. args must hold a value of the argument63// type registered with the Flow component.64func New(opts component.Options, f otelcomponent.ExtensionFactory, args Arguments) (*Extension, error) {65ctx, cancel := context.WithCancel(context.Background())6667// Create a lazy collector where metrics from the upstream component will be68// forwarded.69collector := lazycollector.New()70opts.Registerer.MustRegister(collector)7172r := &Extension{73ctx: ctx,74cancel: cancel,7576opts: opts,77factory: f,7879sched: scheduler.New(opts.Logger),80collector: collector,81}82if err := r.Update(args); err != nil {83return nil, err84}85return r, nil86}8788// Run starts the Extension component.89func (e *Extension) Run(ctx context.Context) error {90defer e.cancel()91return e.sched.Run(ctx)92}9394// Update implements component.Component. It will convert the Arguments into95// configuration for OpenTelemetry Collector extension96// configuration and manage the underlying OpenTelemetry Collector extension.97func (e *Extension) Update(args component.Arguments) error {98rargs := args.(Arguments)99100host := scheduler.NewHost(101e.opts.Logger,102scheduler.WithHostExtensions(rargs.Extensions()),103scheduler.WithHostExporters(rargs.Exporters()),104)105106reg := prometheus.NewRegistry()107e.collector.Set(reg)108109promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())110if err != nil {111return err112}113114settings := otelcomponent.ExtensionCreateSettings{115TelemetrySettings: otelcomponent.TelemetrySettings{116Logger: zapadapter.New(e.opts.Logger),117118TracerProvider: e.opts.Tracer,119MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),120},121122BuildInfo: otelcomponent.BuildInfo{123Command: os.Args[0],124Description: "Grafana Agent",125Version: build.Version,126},127}128129extensionConfig, err := rargs.Convert()130if err != nil {131return err132}133134// Create instances of the extension from our factory.135var components []otelcomponent.Component136137ext, err := e.factory.CreateExtension(e.ctx, settings, extensionConfig)138if err != nil {139return err140} else if ext != nil {141components = append(components, ext)142}143144// Schedule the components to run once our component is running.145e.sched.Schedule(host, components...)146return nil147}148149// CurrentHealth implements component.HealthComponent.150func (e *Extension) CurrentHealth() component.Health {151return e.sched.CurrentHealth()152}153154155