Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/receiver.go
4096 views
1
// Package receiver utilities to create a Flow component from OpenTelemetry
2
// Collector receivers.
3
package receiver
4
5
import (
6
"context"
7
"errors"
8
"os"
9
10
"github.com/grafana/agent/component"
11
"github.com/grafana/agent/component/otelcol"
12
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
13
"github.com/grafana/agent/component/otelcol/internal/lazycollector"
14
"github.com/grafana/agent/component/otelcol/internal/scheduler"
15
"github.com/grafana/agent/pkg/build"
16
"github.com/grafana/agent/pkg/util/zapadapter"
17
"github.com/prometheus/client_golang/prometheus"
18
otelcomponent "go.opentelemetry.io/collector/component"
19
otelconfig "go.opentelemetry.io/collector/config"
20
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
21
"go.opentelemetry.io/otel/sdk/metric"
22
23
_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates
24
)
25
26
// Arguments is an extension of component.Arguments which contains necessary
27
// settings for OpenTelemetry Collector receivers.
28
type Arguments interface {
29
component.Arguments
30
31
// Convert converts the Arguments into an OpenTelemetry Collector receiver
32
// configuration.
33
Convert() (otelconfig.Receiver, error)
34
35
// Extensions returns the set of extensions that the configured component is
36
// allowed to use.
37
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension
38
39
// Exporters returns the set of exporters that are exposed to the configured
40
// component.
41
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
42
43
// NextConsumers returns the set of consumers to send data to.
44
NextConsumers() *otelcol.ConsumerArguments
45
}
46
47
// Receiver is a Flow component shim which manages an OpenTelemetry Collector
48
// receiver component.
49
type Receiver struct {
50
ctx context.Context
51
cancel context.CancelFunc
52
53
opts component.Options
54
factory otelcomponent.ReceiverFactory
55
56
sched *scheduler.Scheduler
57
collector *lazycollector.Collector
58
}
59
60
var (
61
_ component.Component = (*Receiver)(nil)
62
_ component.HealthComponent = (*Receiver)(nil)
63
)
64
65
// New creates a new Flow component which encapsulates an OpenTelemetry
66
// Collector receiver. args must hold a value of the argument type registered
67
// with the Flow component.
68
//
69
// If the registered Flow component registers exported fields, it is the
70
// responsibility of the caller to export values when needed; the Receiver
71
// component never exports any values.
72
func New(opts component.Options, f otelcomponent.ReceiverFactory, args Arguments) (*Receiver, error) {
73
ctx, cancel := context.WithCancel(context.Background())
74
75
// Create a lazy collector where metrics from the upstream component will be
76
// forwarded.
77
collector := lazycollector.New()
78
opts.Registerer.MustRegister(collector)
79
80
r := &Receiver{
81
ctx: ctx,
82
cancel: cancel,
83
84
opts: opts,
85
factory: f,
86
87
sched: scheduler.New(opts.Logger),
88
collector: collector,
89
}
90
if err := r.Update(args); err != nil {
91
return nil, err
92
}
93
return r, nil
94
}
95
96
// Run starts the Receiver component.
97
func (r *Receiver) Run(ctx context.Context) error {
98
defer r.cancel()
99
return r.sched.Run(ctx)
100
}
101
102
// Update implements component.Component. It will convert the Arguments into
103
// configuration for OpenTelemetry Collector receiver configuration and manage
104
// the underlying OpenTelemetry Collector receiver.
105
func (r *Receiver) Update(args component.Arguments) error {
106
rargs := args.(Arguments)
107
108
host := scheduler.NewHost(
109
r.opts.Logger,
110
scheduler.WithHostExtensions(rargs.Extensions()),
111
scheduler.WithHostExporters(rargs.Exporters()),
112
)
113
114
reg := prometheus.NewRegistry()
115
r.collector.Set(reg)
116
117
promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())
118
if err != nil {
119
return err
120
}
121
122
settings := otelcomponent.ReceiverCreateSettings{
123
TelemetrySettings: otelcomponent.TelemetrySettings{
124
Logger: zapadapter.New(r.opts.Logger),
125
126
TracerProvider: r.opts.Tracer,
127
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),
128
},
129
130
BuildInfo: otelcomponent.BuildInfo{
131
Command: os.Args[0],
132
Description: "Grafana Agent",
133
Version: build.Version,
134
},
135
}
136
137
receiverConfig, err := rargs.Convert()
138
if err != nil {
139
return err
140
}
141
142
var (
143
next = rargs.NextConsumers()
144
nextTraces = fanoutconsumer.Traces(next.Traces)
145
nextMetrics = fanoutconsumer.Metrics(next.Metrics)
146
nextLogs = fanoutconsumer.Logs(next.Logs)
147
)
148
149
// Create instances of the receiver from our factory for each of our
150
// supported telemetry signals.
151
var components []otelcomponent.Component
152
153
tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces)
154
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
155
return err
156
} else if tracesReceiver != nil {
157
components = append(components, tracesReceiver)
158
}
159
160
metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics)
161
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
162
return err
163
} else if metricsReceiver != nil {
164
components = append(components, metricsReceiver)
165
}
166
167
logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs)
168
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
169
return err
170
} else if logsReceiver != nil {
171
components = append(components, logsReceiver)
172
}
173
174
// Schedule the components to run once our component is running.
175
r.sched.Schedule(host, components...)
176
return nil
177
}
178
179
// CurrentHealth implements component.HealthComponent.
180
func (r *Receiver) CurrentHealth() component.Health {
181
return r.sched.CurrentHealth()
182
}
183
184