Path: blob/main/component/otelcol/internal/fanoutconsumer/logs.go
4096 views
package fanoutconsumer12// This file is a near copy of3// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/logs.go4//5// A copy was made because the upstream package is internal. If it is ever made6// public, our copy can be removed.78import (9"context"1011"github.com/grafana/agent/component/otelcol"12otelconsumer "go.opentelemetry.io/collector/consumer"13"go.opentelemetry.io/collector/pdata/plog"14"go.uber.org/multierr"15)1617// Logs creates a new fanout consumer for logs.18func Logs(in []otelcol.Consumer) otelconsumer.Logs {19if len(in) == 0 {20return &logsFanout{}21} else if len(in) == 1 {22return in[0]23}2425var passthrough, clone []otelconsumer.Logs2627// Iterate through all the consumers besides the last.28for i := 0; i < len(in)-1; i++ {29consumer := in[i]3031if consumer.Capabilities().MutatesData {32clone = append(clone, consumer)33} else {34passthrough = append(passthrough, consumer)35}36}3738last := in[len(in)-1]3940// The final consumer can be given to the passthrough list regardless of41// whether it mutates as long as there's no other read-only consumers.42if len(passthrough) == 0 || !last.Capabilities().MutatesData {43passthrough = append(passthrough, last)44} else {45clone = append(clone, last)46}4748return &logsFanout{49passthrough: passthrough,50clone: clone,51}52}5354type logsFanout struct {55passthrough []otelconsumer.Logs // Consumers where data can be passed through directly56clone []otelconsumer.Logs // Consumes which require cloning data57}5859func (f *logsFanout) Capabilities() otelconsumer.Capabilities {60return otelconsumer.Capabilities{MutatesData: false}61}6263// ConsumeLogs exports the pmetric.Logs to all consumers wrapped by the current one.64func (f *logsFanout) ConsumeLogs(ctx context.Context, ld plog.Logs) error {65var errs error6667// Initially pass to clone exporter to avoid the case where the optimization68// of sending the incoming data to a mutating consumer is used that may69// change the incoming data before cloning.70for _, f := range f.clone {71newLogs := plog.NewLogs()72ld.CopyTo(newLogs)73errs = multierr.Append(errs, f.ConsumeLogs(ctx, newLogs))74}75for _, f := range f.passthrough {76errs = multierr.Append(errs, f.ConsumeLogs(ctx, ld))77}7879return errs80}818283