Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/instance.go
4093 views
1
package traces
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
"time"
8
9
"github.com/prometheus/client_golang/prometheus"
10
"go.opencensus.io/stats/view"
11
"go.opentelemetry.io/collector/component"
12
"go.opentelemetry.io/collector/config"
13
"go.opentelemetry.io/collector/service/extensions"
14
"go.opentelemetry.io/collector/service/external/pipelines"
15
"go.opentelemetry.io/otel/metric"
16
"go.opentelemetry.io/otel/trace"
17
"go.uber.org/zap"
18
19
"github.com/grafana/agent/pkg/build"
20
"github.com/grafana/agent/pkg/logs"
21
"github.com/grafana/agent/pkg/metrics/instance"
22
"github.com/grafana/agent/pkg/traces/automaticloggingprocessor"
23
"github.com/grafana/agent/pkg/traces/contextkeys"
24
"github.com/grafana/agent/pkg/util"
25
)
26
27
// Instance wraps the OpenTelemetry collector to enable tracing pipelines
28
type Instance struct {
29
mut sync.Mutex
30
cfg InstanceConfig
31
logger *zap.Logger
32
metricViews []*view.View
33
34
extensions *extensions.Extensions
35
pipelines *pipelines.Pipelines
36
factories component.Factories
37
}
38
39
// Compile-time assertion that *Instance implements component.Host.
var _ component.Host = (*Instance)(nil)
40
41
// NewInstance creates and starts an instance of tracing pipelines.
42
func NewInstance(logsSubsystem *logs.Logs, reg prometheus.Registerer, cfg InstanceConfig, logger *zap.Logger, promInstanceManager instance.Manager) (*Instance, error) {
43
var err error
44
45
instance := &Instance{}
46
instance.logger = logger
47
instance.metricViews, err = newMetricViews(reg)
48
if err != nil {
49
return nil, fmt.Errorf("failed to create metric views: %w", err)
50
}
51
52
if err := instance.ApplyConfig(logsSubsystem, promInstanceManager, reg, cfg); err != nil {
53
return nil, err
54
}
55
return instance, nil
56
}
57
58
// ApplyConfig updates the configuration of the Instance.
59
func (i *Instance) ApplyConfig(logsSubsystem *logs.Logs, promInstanceManager instance.Manager, reg prometheus.Registerer, cfg InstanceConfig) error {
60
i.mut.Lock()
61
defer i.mut.Unlock()
62
63
if util.CompareYAML(cfg, i.cfg) {
64
// No config change
65
return nil
66
}
67
i.cfg = cfg
68
69
// Shut down any existing pipeline
70
i.stop()
71
72
err := i.buildAndStartPipeline(context.Background(), cfg, logsSubsystem, promInstanceManager, reg)
73
if err != nil {
74
return fmt.Errorf("failed to create pipeline: %w", err)
75
}
76
77
return nil
78
}
79
80
// Stop stops the OpenTelemetry collector subsystem
81
func (i *Instance) Stop() {
82
i.mut.Lock()
83
defer i.mut.Unlock()
84
85
i.stop()
86
view.Unregister(i.metricViews...)
87
}
88
89
func (i *Instance) stop() {
90
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
91
defer cancel()
92
93
if i.extensions != nil {
94
err := i.extensions.NotifyPipelineNotReady()
95
if err != nil {
96
i.logger.Error("failed to notify extension of pipeline shutdown", zap.Error(err))
97
}
98
}
99
100
dependencies := []struct {
101
name string
102
shutdown func() error
103
}{
104
{
105
name: "pipelines",
106
shutdown: func() error {
107
if i.pipelines == nil {
108
return nil
109
}
110
return i.pipelines.ShutdownAll(shutdownCtx)
111
},
112
},
113
{
114
name: "extensions",
115
shutdown: func() error {
116
if i.extensions == nil {
117
return nil
118
}
119
return i.extensions.Shutdown(shutdownCtx)
120
},
121
},
122
}
123
124
for _, dep := range dependencies {
125
i.logger.Info(fmt.Sprintf("shutting down %s", dep.name))
126
if err := dep.shutdown(); err != nil {
127
i.logger.Error(fmt.Sprintf("failed to shutdown %s", dep.name), zap.Error(err))
128
}
129
}
130
131
i.pipelines = nil
132
i.extensions = nil
133
}
134
135
func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig, logs *logs.Logs, instManager instance.Manager, reg prometheus.Registerer) error {
136
// create component factories
137
otelConfig, err := cfg.otelConfig()
138
if err != nil {
139
return fmt.Errorf("failed to load otelConfig from agent traces config: %w", err)
140
}
141
for _, rw := range cfg.RemoteWrite {
142
if rw.InsecureSkipVerify {
143
i.logger.Warn("Configuring TLS with insecure_skip_verify. Use tls_config.insecure_skip_verify instead")
144
}
145
if rw.TLSConfig != nil && rw.TLSConfig.ServerName != "" {
146
i.logger.Warn("Configuring unsupported tls_config.server_name")
147
}
148
}
149
150
if cfg.SpanMetrics != nil && len(cfg.SpanMetrics.MetricsInstance) != 0 {
151
ctx = context.WithValue(ctx, contextkeys.Metrics, instManager)
152
}
153
154
if cfg.LoadBalancing == nil && (cfg.TailSampling != nil || cfg.ServiceGraphs != nil) {
155
i.logger.Warn("Configuring tail_sampling and/or service_graphs without load_balance." +
156
"Load balancing is required for those features to properly work in multi agent deployments")
157
}
158
159
if cfg.AutomaticLogging != nil && cfg.AutomaticLogging.Backend != automaticloggingprocessor.BackendStdout {
160
ctx = context.WithValue(ctx, contextkeys.Logs, logs)
161
}
162
163
if cfg.ServiceGraphs != nil {
164
ctx = context.WithValue(ctx, contextkeys.PrometheusRegisterer, reg)
165
}
166
167
factories, err := tracingFactories()
168
if err != nil {
169
return fmt.Errorf("failed to load tracing factories: %w", err)
170
}
171
i.factories = factories
172
173
appinfo := component.BuildInfo{
174
Command: "agent",
175
Description: "agent",
176
Version: build.Version,
177
}
178
179
settings := component.TelemetrySettings{
180
Logger: i.logger,
181
TracerProvider: trace.NewNoopTracerProvider(),
182
MeterProvider: metric.NewNoopMeterProvider(),
183
}
184
185
// start extensions
186
i.extensions, err = extensions.New(ctx, extensions.Settings{
187
Telemetry: settings,
188
BuildInfo: appinfo,
189
190
Factories: factories.Extensions,
191
Configs: otelConfig.Extensions,
192
}, otelConfig.Service.Extensions)
193
if err != nil {
194
i.logger.Error(fmt.Sprintf("failed to build extensions: %s", err.Error()))
195
return fmt.Errorf("failed to create extensions builder: %w", err)
196
}
197
err = i.extensions.Start(ctx, i)
198
if err != nil {
199
i.logger.Error(fmt.Sprintf("failed to start extensions: %s", err.Error()))
200
return fmt.Errorf("failed to start extensions: %w", err)
201
}
202
203
i.pipelines, err = pipelines.Build(ctx, pipelines.Settings{
204
Telemetry: settings,
205
BuildInfo: appinfo,
206
207
ReceiverFactories: factories.Receivers,
208
ReceiverConfigs: otelConfig.Receivers,
209
ProcessorFactories: factories.Processors,
210
ProcessorConfigs: otelConfig.Processors,
211
ExporterFactories: factories.Exporters,
212
ExporterConfigs: otelConfig.Exporters,
213
214
PipelineConfigs: otelConfig.Pipelines,
215
})
216
if err != nil {
217
return fmt.Errorf("failed to create pipelines: %w", err)
218
}
219
if err := i.pipelines.StartAll(ctx, i); err != nil {
220
i.logger.Error(fmt.Sprintf("failed to start pipelines: %s", err.Error()))
221
return fmt.Errorf("failed to start pipelines: %w", err)
222
}
223
224
return i.extensions.NotifyPipelineReady()
225
}
226
227
// ReportFatalError implements component.Host
228
func (i *Instance) ReportFatalError(err error) {
229
i.logger.Error("fatal error reported", zap.Error(err))
230
}
231
232
// GetFactory implements component.Host
233
func (i *Instance) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
234
switch kind {
235
case component.KindReceiver:
236
return i.factories.Receivers[componentType]
237
default:
238
return nil
239
}
240
}
241
242
// GetExtensions implements component.Host
243
func (i *Instance) GetExtensions() map[config.ComponentID]component.Extension {
244
return i.extensions.GetExtensions()
245
}
246
247
// GetExporters implements component.Host
248
func (i *Instance) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
249
// SpanMetricsProcessor needs to get the configured exporters.
250
return i.pipelines.GetExporters()
251
}
252
253