Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/processor/processor.go
4096 views
1
// Package processor exposes utilities to create a Flow component from
2
// OpenTelemetry Collector processors.
3
package processor
4
5
import (
6
"context"
7
"errors"
8
"os"
9
10
"github.com/grafana/agent/component"
11
"github.com/grafana/agent/component/otelcol"
12
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
13
"github.com/grafana/agent/component/otelcol/internal/lazycollector"
14
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
15
"github.com/grafana/agent/component/otelcol/internal/scheduler"
16
"github.com/grafana/agent/pkg/build"
17
"github.com/grafana/agent/pkg/util/zapadapter"
18
"github.com/prometheus/client_golang/prometheus"
19
otelcomponent "go.opentelemetry.io/collector/component"
20
otelconfig "go.opentelemetry.io/collector/config"
21
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
22
"go.opentelemetry.io/otel/sdk/metric"
23
24
_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates
25
)
26
27
// Arguments is an extension of component.Arguments which contains necessary
28
// settings for OpenTelemetry Collector processors.
29
type Arguments interface {
30
component.Arguments
31
32
// Convert converts the Arguments into an OpenTelemetry Collector processor
33
// configuration.
34
Convert() (otelconfig.Processor, error)
35
36
// Extensions returns the set of extensions that the configured component is
37
// allowed to use.
38
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension
39
40
// Exporters returns the set of exporters that are exposed to the configured
41
// component.
42
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
43
44
// NextConsumers returns the set of consumers to send data to.
45
NextConsumers() *otelcol.ConsumerArguments
46
}
47
48
// Processor is a Flow component shim which manages an OpenTelemetry Collector
49
// processor component.
50
type Processor struct {
51
ctx context.Context
52
cancel context.CancelFunc
53
54
opts component.Options
55
factory otelcomponent.ProcessorFactory
56
consumer *lazyconsumer.Consumer
57
58
sched *scheduler.Scheduler
59
collector *lazycollector.Collector
60
}
61
62
var (
63
_ component.Component = (*Processor)(nil)
64
_ component.HealthComponent = (*Processor)(nil)
65
)
66
67
// New creates a new Flow component which encapsulates an OpenTelemetry
68
// Collector processor. args must hold a value of the argument type registered
69
// with the Flow component.
70
//
71
// The registered component must be registered to export the
72
// otelcol.ConsumerExports type, otherwise New will panic.
73
func New(opts component.Options, f otelcomponent.ProcessorFactory, args Arguments) (*Processor, error) {
74
ctx, cancel := context.WithCancel(context.Background())
75
76
consumer := lazyconsumer.New(ctx)
77
78
// Create a lazy collector where metrics from the upstream component will be
79
// forwarded.
80
collector := lazycollector.New()
81
opts.Registerer.MustRegister(collector)
82
83
// Immediately set our state with our consumer. The exports will never change
84
// throughout the lifetime of our component.
85
//
86
// This will panic if the wrapping component is not registered to export
87
// otelcol.ConsumerExports.
88
opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})
89
90
p := &Processor{
91
ctx: ctx,
92
cancel: cancel,
93
94
opts: opts,
95
factory: f,
96
consumer: consumer,
97
98
sched: scheduler.New(opts.Logger),
99
collector: collector,
100
}
101
if err := p.Update(args); err != nil {
102
return nil, err
103
}
104
return p, nil
105
}
106
107
// Run starts the Processor component.
108
func (p *Processor) Run(ctx context.Context) error {
109
defer p.cancel()
110
return p.sched.Run(ctx)
111
}
112
113
// Update implements component.Component. It will convert the Arguments into
114
// configuration for OpenTelemetry Collector processor configuration and manage
115
// the underlying OpenTelemetry Collector processor.
116
func (p *Processor) Update(args component.Arguments) error {
117
pargs := args.(Arguments)
118
119
host := scheduler.NewHost(
120
p.opts.Logger,
121
scheduler.WithHostExtensions(pargs.Extensions()),
122
scheduler.WithHostExporters(pargs.Exporters()),
123
)
124
125
reg := prometheus.NewRegistry()
126
p.collector.Set(reg)
127
128
promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())
129
if err != nil {
130
return err
131
}
132
133
settings := otelcomponent.ProcessorCreateSettings{
134
TelemetrySettings: otelcomponent.TelemetrySettings{
135
Logger: zapadapter.New(p.opts.Logger),
136
137
TracerProvider: p.opts.Tracer,
138
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),
139
},
140
141
BuildInfo: otelcomponent.BuildInfo{
142
Command: os.Args[0],
143
Description: "Grafana Agent",
144
Version: build.Version,
145
},
146
}
147
148
processorConfig, err := pargs.Convert()
149
if err != nil {
150
return err
151
}
152
153
var (
154
next = pargs.NextConsumers()
155
nextTraces = fanoutconsumer.Traces(next.Traces)
156
nextMetrics = fanoutconsumer.Metrics(next.Metrics)
157
nextLogs = fanoutconsumer.Logs(next.Logs)
158
)
159
160
// Create instances of the processor from our factory for each of our
161
// supported telemetry signals.
162
var components []otelcomponent.Component
163
164
tracesProcessor, err := p.factory.CreateTracesProcessor(p.ctx, settings, processorConfig, nextTraces)
165
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
166
return err
167
} else if tracesProcessor != nil {
168
components = append(components, tracesProcessor)
169
}
170
171
metricsProcessor, err := p.factory.CreateMetricsProcessor(p.ctx, settings, processorConfig, nextMetrics)
172
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
173
return err
174
} else if metricsProcessor != nil {
175
components = append(components, metricsProcessor)
176
}
177
178
logsProcessor, err := p.factory.CreateLogsProcessor(p.ctx, settings, processorConfig, nextLogs)
179
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
180
return err
181
} else if logsProcessor != nil {
182
components = append(components, logsProcessor)
183
}
184
185
// Schedule the components to run once our component is running.
186
p.sched.Schedule(host, components...)
187
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
188
return nil
189
}
190
191
// CurrentHealth implements component.HealthComponent.
192
func (p *Processor) CurrentHealth() component.Health {
193
return p.sched.CurrentHealth()
194
}
195
196