Path: blob/main/component/otelcol/internal/fanoutconsumer/traces.go
4096 views
package fanoutconsumer12// This file is a near copy of3// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/traces.go4//5// A copy was made because the upstream package is internal. If it is ever made6// public, our copy can be removed.78import (9"context"1011"github.com/grafana/agent/component/otelcol"12otelconsumer "go.opentelemetry.io/collector/consumer"13"go.opentelemetry.io/collector/pdata/ptrace"14"go.uber.org/multierr"15)1617// Traces creates a new fanout consumer for traces.18func Traces(in []otelcol.Consumer) otelconsumer.Traces {19if len(in) == 0 {20return &tracesFanout{}21} else if len(in) == 1 {22return in[0]23}2425var passthrough, clone []otelconsumer.Traces2627// Iterate through all the consumers besides the last.28for i := 0; i < len(in)-1; i++ {29consumer := in[i]3031if consumer.Capabilities().MutatesData {32clone = append(clone, consumer)33} else {34passthrough = append(passthrough, consumer)35}36}3738last := in[len(in)-1]3940// The final consumer can be given to the passthrough list regardless of41// whether it mutates as long as there's no other read-only consumers.42if len(passthrough) == 0 || !last.Capabilities().MutatesData {43passthrough = append(passthrough, last)44} else {45clone = append(clone, last)46}4748return &tracesFanout{49passthrough: passthrough,50clone: clone,51}52}5354type tracesFanout struct {55passthrough []otelconsumer.Traces // Consumers where data can be passed through directly56clone []otelconsumer.Traces // Consumes which require cloning data57}5859func (f *tracesFanout) Capabilities() otelconsumer.Capabilities {60return otelconsumer.Capabilities{MutatesData: false}61}6263// ConsumeTraces exports the pmetric.Traces to all consumers wrapped by the current one.64func (f *tracesFanout) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {65var errs error6667// Initially pass to clone exporter to avoid the case where the optimization68// of sending the incoming data to a mutating consumer is used that may69// change the incoming data before cloning.70for _, f := range f.clone {71newTraces := ptrace.NewTraces()72td.CopyTo(newTraces)73errs = multierr.Append(errs, f.ConsumeTraces(ctx, newTraces))74}75for _, f := range f.passthrough {76errs = multierr.Append(errs, f.ConsumeTraces(ctx, td))77}7879return errs80}818283