Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/internal/lazyconsumer/lazyconsumer.go
4096 views
1
// Package lazyconsumer implements a lazy OpenTelemetry Collector consumer
2
// which can lazily forward request to another consumer implementation.
3
package lazyconsumer
4
5
import (
6
"context"
7
"sync"
8
9
otelcomponent "go.opentelemetry.io/collector/component"
10
otelconsumer "go.opentelemetry.io/collector/consumer"
11
"go.opentelemetry.io/collector/pdata/plog"
12
"go.opentelemetry.io/collector/pdata/pmetric"
13
"go.opentelemetry.io/collector/pdata/ptrace"
14
)
15
16
// Consumer is a lazily-loaded consumer.
17
type Consumer struct {
18
ctx context.Context
19
20
mut sync.RWMutex
21
metricsConsumer otelconsumer.Metrics
22
logsConsumer otelconsumer.Logs
23
tracesConsumer otelconsumer.Traces
24
}
25
26
var (
27
_ otelconsumer.Traces = (*Consumer)(nil)
28
_ otelconsumer.Metrics = (*Consumer)(nil)
29
_ otelconsumer.Logs = (*Consumer)(nil)
30
)
31
32
// New creates a new Consumer. The provided ctx is used to determine when the
33
// Consumer should stop accepting data; if the ctx is closed, no further data
34
// will be accepted.
35
func New(ctx context.Context) *Consumer {
36
return &Consumer{ctx: ctx}
37
}
38
39
// Capabilities implements otelconsumer.baseConsumer.
40
func (c *Consumer) Capabilities() otelconsumer.Capabilities {
41
return otelconsumer.Capabilities{
42
// MutatesData is always set to false; the lazy consumer will check the
43
// underlying consumer's capabilities prior to forwarding data and will
44
// pass a copy if the underlying consumer mutates data.
45
MutatesData: false,
46
}
47
}
48
49
// ConsumeTraces implements otelconsumer.Traces.
50
func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
51
if c.ctx.Err() != nil {
52
return c.ctx.Err()
53
}
54
55
c.mut.RLock()
56
defer c.mut.RUnlock()
57
58
if c.tracesConsumer == nil {
59
return otelcomponent.ErrDataTypeIsNotSupported
60
}
61
62
if c.tracesConsumer.Capabilities().MutatesData {
63
newTraces := ptrace.NewTraces()
64
td.CopyTo(newTraces)
65
td = newTraces
66
}
67
return c.tracesConsumer.ConsumeTraces(ctx, td)
68
}
69
70
// ConsumeMetrics implements otelconsumer.Metrics.
71
func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
72
if c.ctx.Err() != nil {
73
return c.ctx.Err()
74
}
75
76
c.mut.RLock()
77
defer c.mut.RUnlock()
78
79
if c.metricsConsumer == nil {
80
return otelcomponent.ErrDataTypeIsNotSupported
81
}
82
83
if c.metricsConsumer.Capabilities().MutatesData {
84
newMetrics := pmetric.NewMetrics()
85
md.CopyTo(newMetrics)
86
md = newMetrics
87
}
88
return c.metricsConsumer.ConsumeMetrics(ctx, md)
89
}
90
91
// ConsumeLogs implements otelconsumer.Logs.
92
func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
93
if c.ctx.Err() != nil {
94
return c.ctx.Err()
95
}
96
97
c.mut.RLock()
98
defer c.mut.RUnlock()
99
100
if c.logsConsumer == nil {
101
return otelcomponent.ErrDataTypeIsNotSupported
102
}
103
104
if c.logsConsumer.Capabilities().MutatesData {
105
newLogs := plog.NewLogs()
106
ld.CopyTo(newLogs)
107
ld = newLogs
108
}
109
return c.logsConsumer.ConsumeLogs(ctx, ld)
110
}
111
112
// SetConsumers updates the internal consumers that Consumer will forward data
113
// to. It is valid for any combination of m, l, and t to be nil.
114
func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) {
115
c.mut.Lock()
116
defer c.mut.Unlock()
117
118
c.metricsConsumer = m
119
c.logsConsumer = l
120
c.tracesConsumer = t
121
}
122
123