Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/extension/extension.go
4096 views
1
// Package extension provides utilities to create a Flow component from
2
// OpenTelemetry Collector extensions.
3
//
4
// Other OpenTelemetry Collector extensions are better served as generic Flow
5
// components rather than being placed in the otelcol namespace.
6
package extension
7
8
import (
9
"context"
10
"os"
11
12
"github.com/grafana/agent/component"
13
"github.com/grafana/agent/component/otelcol/internal/lazycollector"
14
"github.com/grafana/agent/component/otelcol/internal/scheduler"
15
"github.com/grafana/agent/pkg/build"
16
"github.com/grafana/agent/pkg/util/zapadapter"
17
"github.com/prometheus/client_golang/prometheus"
18
otelcomponent "go.opentelemetry.io/collector/component"
19
otelconfig "go.opentelemetry.io/collector/config"
20
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
21
"go.opentelemetry.io/otel/sdk/metric"
22
23
_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates
24
)
25
26
// Arguments is an extension of component.Arguments which contains necessary
27
// settings for OpenTelemetry Collector extensions.
28
type Arguments interface {
29
component.Arguments
30
31
// Convert converts the Arguments into an OpenTelemetry Collector
32
// extension configuration.
33
Convert() (otelconfig.Extension, error)
34
35
// Extensions returns the set of extensions that the configured component is
36
// allowed to use.
37
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension
38
39
// Exporters returns the set of exporters that are exposed to the configured
40
// component.
41
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
42
}
43
44
// Extension is a Flow component shim which manages an OpenTelemetry Collector
45
// extension.
46
type Extension struct {
47
ctx context.Context
48
cancel context.CancelFunc
49
50
opts component.Options
51
factory otelcomponent.ExtensionFactory
52
53
sched *scheduler.Scheduler
54
collector *lazycollector.Collector
55
}
56
57
var (
58
_ component.Component = (*Extension)(nil)
59
_ component.HealthComponent = (*Extension)(nil)
60
)
61
62
// New creates a new Flow component which encapsulates an OpenTelemetry
63
// Collector extension. args must hold a value of the argument
64
// type registered with the Flow component.
65
func New(opts component.Options, f otelcomponent.ExtensionFactory, args Arguments) (*Extension, error) {
66
ctx, cancel := context.WithCancel(context.Background())
67
68
// Create a lazy collector where metrics from the upstream component will be
69
// forwarded.
70
collector := lazycollector.New()
71
opts.Registerer.MustRegister(collector)
72
73
r := &Extension{
74
ctx: ctx,
75
cancel: cancel,
76
77
opts: opts,
78
factory: f,
79
80
sched: scheduler.New(opts.Logger),
81
collector: collector,
82
}
83
if err := r.Update(args); err != nil {
84
return nil, err
85
}
86
return r, nil
87
}
88
89
// Run starts the Extension component.
90
func (e *Extension) Run(ctx context.Context) error {
91
defer e.cancel()
92
return e.sched.Run(ctx)
93
}
94
95
// Update implements component.Component. It will convert the Arguments into
96
// configuration for OpenTelemetry Collector extension
97
// configuration and manage the underlying OpenTelemetry Collector extension.
98
func (e *Extension) Update(args component.Arguments) error {
99
rargs := args.(Arguments)
100
101
host := scheduler.NewHost(
102
e.opts.Logger,
103
scheduler.WithHostExtensions(rargs.Extensions()),
104
scheduler.WithHostExporters(rargs.Exporters()),
105
)
106
107
reg := prometheus.NewRegistry()
108
e.collector.Set(reg)
109
110
promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())
111
if err != nil {
112
return err
113
}
114
115
settings := otelcomponent.ExtensionCreateSettings{
116
TelemetrySettings: otelcomponent.TelemetrySettings{
117
Logger: zapadapter.New(e.opts.Logger),
118
119
TracerProvider: e.opts.Tracer,
120
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),
121
},
122
123
BuildInfo: otelcomponent.BuildInfo{
124
Command: os.Args[0],
125
Description: "Grafana Agent",
126
Version: build.Version,
127
},
128
}
129
130
extensionConfig, err := rargs.Convert()
131
if err != nil {
132
return err
133
}
134
135
// Create instances of the extension from our factory.
136
var components []otelcomponent.Component
137
138
ext, err := e.factory.CreateExtension(e.ctx, settings, extensionConfig)
139
if err != nil {
140
return err
141
} else if ext != nil {
142
components = append(components, ext)
143
}
144
145
// Schedule the components to run once our component is running.
146
e.sched.Schedule(host, components...)
147
return nil
148
}
149
150
// CurrentHealth implements component.HealthComponent.
151
func (e *Extension) CurrentHealth() component.Health {
152
return e.sched.CurrentHealth()
153
}
154
155