Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/internal/fanoutconsumer/traces.go
4096 views
1
package fanoutconsumer
2
3
// This file is a near copy of
4
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/traces.go
5
//
6
// A copy was made because the upstream package is internal. If it is ever made
7
// public, our copy can be removed.
8
9
import (
10
"context"
11
12
"github.com/grafana/agent/component/otelcol"
13
otelconsumer "go.opentelemetry.io/collector/consumer"
14
"go.opentelemetry.io/collector/pdata/ptrace"
15
"go.uber.org/multierr"
16
)
17
18
// Traces creates a new fanout consumer for traces.
19
func Traces(in []otelcol.Consumer) otelconsumer.Traces {
20
if len(in) == 0 {
21
return &tracesFanout{}
22
} else if len(in) == 1 {
23
return in[0]
24
}
25
26
var passthrough, clone []otelconsumer.Traces
27
28
// Iterate through all the consumers besides the last.
29
for i := 0; i < len(in)-1; i++ {
30
consumer := in[i]
31
32
if consumer.Capabilities().MutatesData {
33
clone = append(clone, consumer)
34
} else {
35
passthrough = append(passthrough, consumer)
36
}
37
}
38
39
last := in[len(in)-1]
40
41
// The final consumer can be given to the passthrough list regardless of
42
// whether it mutates as long as there's no other read-only consumers.
43
if len(passthrough) == 0 || !last.Capabilities().MutatesData {
44
passthrough = append(passthrough, last)
45
} else {
46
clone = append(clone, last)
47
}
48
49
return &tracesFanout{
50
passthrough: passthrough,
51
clone: clone,
52
}
53
}
54
55
type tracesFanout struct {
56
passthrough []otelconsumer.Traces // Consumers where data can be passed through directly
57
clone []otelconsumer.Traces // Consumes which require cloning data
58
}
59
60
// Capabilities reports the capabilities of the fanout consumer. MutatesData
// is always false.
func (f *tracesFanout) Capabilities() otelconsumer.Capabilities {
	var caps otelconsumer.Capabilities
	caps.MutatesData = false
	return caps
}
63
64
// ConsumeTraces exports the pmetric.Traces to all consumers wrapped by the current one.
65
func (f *tracesFanout) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
66
var errs error
67
68
// Initially pass to clone exporter to avoid the case where the optimization
69
// of sending the incoming data to a mutating consumer is used that may
70
// change the incoming data before cloning.
71
for _, f := range f.clone {
72
newTraces := ptrace.NewTraces()
73
td.CopyTo(newTraces)
74
errs = multierr.Append(errs, f.ConsumeTraces(ctx, newTraces))
75
}
76
for _, f := range f.passthrough {
77
errs = multierr.Append(errs, f.ConsumeTraces(ctx, td))
78
}
79
80
return errs
81
}
82
83