Path: blob/main/component/otelcol/internal/fanoutconsumer/metrics.go
4096 views
package fanoutconsumer12// This file is a near copy of3// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/metrics.go4//5// A copy was made because the upstream package is internal. If it is ever made6// public, our copy can be removed.78import (9"context"1011"github.com/grafana/agent/component/otelcol"12otelconsumer "go.opentelemetry.io/collector/consumer"13"go.opentelemetry.io/collector/pdata/pmetric"14"go.uber.org/multierr"15)1617// Metrics creates a new fanout consumer for metrics.18func Metrics(in []otelcol.Consumer) otelconsumer.Metrics {19if len(in) == 0 {20return &metricsFanout{}21} else if len(in) == 1 {22return in[0]23}2425var passthrough, clone []otelconsumer.Metrics2627// Iterate through all the consumers besides the last.28for i := 0; i < len(in)-1; i++ {29consumer := in[i]3031if consumer.Capabilities().MutatesData {32clone = append(clone, consumer)33} else {34passthrough = append(passthrough, consumer)35}36}3738last := in[len(in)-1]3940// The final consumer can be given to the passthrough list regardless of41// whether it mutates as long as there's no other read-only consumers.42if len(passthrough) == 0 || !last.Capabilities().MutatesData {43passthrough = append(passthrough, last)44} else {45clone = append(clone, last)46}4748return &metricsFanout{49passthrough: passthrough,50clone: clone,51}52}5354type metricsFanout struct {55passthrough []otelconsumer.Metrics // Consumers where data can be passed through directly56clone []otelconsumer.Metrics // Consumes which require cloning data57}5859func (f *metricsFanout) Capabilities() otelconsumer.Capabilities {60return otelconsumer.Capabilities{MutatesData: false}61}6263// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.64func (f *metricsFanout) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {65var errs error6667// Initially pass to clone exporter to avoid the case where the optimization68// of sending the incoming data to a mutating consumer is used that may69// change the incoming data before cloning.70for _, f := range f.clone {71newMetrics := pmetric.NewMetrics()72md.CopyTo(newMetrics)73errs = multierr.Append(errs, f.ConsumeMetrics(ctx, newMetrics))74}75for _, f := range f.passthrough {76errs = multierr.Append(errs, f.ConsumeMetrics(ctx, md))77}7879return errs80}818283