Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/exporter/exporter.go
4096 views
1
// Package exporter exposes utilities to create a Flow component from
2
// OpenTelemetry Collector exporters.
3
package exporter
4
5
import (
6
"context"
7
"errors"
8
"os"
9
10
"github.com/grafana/agent/component"
11
"github.com/grafana/agent/component/otelcol"
12
"github.com/grafana/agent/component/otelcol/internal/lazycollector"
13
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
14
"github.com/grafana/agent/component/otelcol/internal/scheduler"
15
"github.com/grafana/agent/pkg/build"
16
"github.com/grafana/agent/pkg/util/zapadapter"
17
"github.com/prometheus/client_golang/prometheus"
18
otelcomponent "go.opentelemetry.io/collector/component"
19
otelconfig "go.opentelemetry.io/collector/config"
20
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
21
"go.opentelemetry.io/otel/sdk/metric"
22
23
_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates
24
)
25
26
// Arguments is an extension of component.Arguments which contains necessary
27
// settings for OpenTelemetry Collector exporters.
28
type Arguments interface {
29
component.Arguments
30
31
// Convert converts the Arguments into an OpenTelemetry Collector exporter
32
// configuration.
33
Convert() (otelconfig.Exporter, error)
34
35
// Extensions returns the set of extensions that the configured component is
36
// allowed to use.
37
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension
38
39
// Exporters returns the set of exporters that are exposed to the configured
40
// component.
41
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
42
}
43
44
// Exporter is a Flow component shim which manages an OpenTelemetry Collector
45
// exporter component.
46
type Exporter struct {
47
ctx context.Context
48
cancel context.CancelFunc
49
50
opts component.Options
51
factory otelcomponent.ExporterFactory
52
consumer *lazyconsumer.Consumer
53
54
sched *scheduler.Scheduler
55
collector *lazycollector.Collector
56
}
57
58
var (
59
_ component.Component = (*Exporter)(nil)
60
_ component.HealthComponent = (*Exporter)(nil)
61
)
62
63
// New creates a new Flow component which encapsulates an OpenTelemetry
64
// Collector exporter. args must hold a value of the argument type registered
65
// with the Flow component.
66
//
67
// The registered component must be registered to export the
68
// otelcol.ConsumerExports type, otherwise New will panic.
69
func New(opts component.Options, f otelcomponent.ExporterFactory, args Arguments) (*Exporter, error) {
70
ctx, cancel := context.WithCancel(context.Background())
71
72
consumer := lazyconsumer.New(ctx)
73
74
// Create a lazy collector where metrics from the upstream component will be
75
// forwarded.
76
collector := lazycollector.New()
77
opts.Registerer.MustRegister(collector)
78
79
// Immediately set our state with our consumer. The exports will never change
80
// throughout the lifetime of our component.
81
//
82
// This will panic if the wrapping component is not registered to export
83
// otelcol.ConsumerExports.
84
opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})
85
86
e := &Exporter{
87
ctx: ctx,
88
cancel: cancel,
89
90
opts: opts,
91
factory: f,
92
consumer: consumer,
93
94
sched: scheduler.New(opts.Logger),
95
collector: collector,
96
}
97
if err := e.Update(args); err != nil {
98
return nil, err
99
}
100
return e, nil
101
}
102
103
// Run starts the Exporter component.
104
func (e *Exporter) Run(ctx context.Context) error {
105
defer e.cancel()
106
return e.sched.Run(ctx)
107
}
108
109
// Update implements component.Component. It will convert the Arguments into
110
// configuration for OpenTelemetry Collector exporter configuration and manage
111
// the underlying OpenTelemetry Collector exporter.
112
func (e *Exporter) Update(args component.Arguments) error {
113
eargs := args.(Arguments)
114
115
host := scheduler.NewHost(
116
e.opts.Logger,
117
scheduler.WithHostExtensions(eargs.Extensions()),
118
scheduler.WithHostExporters(eargs.Exporters()),
119
)
120
121
reg := prometheus.NewRegistry()
122
e.collector.Set(reg)
123
124
promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())
125
if err != nil {
126
return err
127
}
128
129
settings := otelcomponent.ExporterCreateSettings{
130
TelemetrySettings: otelcomponent.TelemetrySettings{
131
Logger: zapadapter.New(e.opts.Logger),
132
133
TracerProvider: e.opts.Tracer,
134
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),
135
},
136
137
BuildInfo: otelcomponent.BuildInfo{
138
Command: os.Args[0],
139
Description: "Grafana Agent",
140
Version: build.Version,
141
},
142
}
143
144
exporterConfig, err := eargs.Convert()
145
if err != nil {
146
return err
147
}
148
149
// Create instances of the exporter from our factory for each of our
150
// supported telemetry signals.
151
var components []otelcomponent.Component
152
153
tracesExporter, err := e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)
154
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
155
return err
156
} else if tracesExporter != nil {
157
components = append(components, tracesExporter)
158
}
159
160
metricsExporter, err := e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)
161
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
162
return err
163
} else if metricsExporter != nil {
164
components = append(components, metricsExporter)
165
}
166
167
logsExporter, err := e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)
168
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
169
return err
170
} else if logsExporter != nil {
171
components = append(components, logsExporter)
172
}
173
174
// Schedule the components to run once our component is running.
175
e.sched.Schedule(host, components...)
176
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
177
return nil
178
}
179
180
// CurrentHealth implements component.HealthComponent.
181
func (e *Exporter) CurrentHealth() component.Health {
182
return e.sched.CurrentHealth()
183
}
184
185