Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/config.go
4094 views
1
package traces
2
3
import (
4
"encoding/base64"
5
"errors"
6
"fmt"
7
"net"
8
"os"
9
"sort"
10
"strings"
11
"time"
12
13
"github.com/mitchellh/mapstructure"
14
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter"
15
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
16
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
17
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension"
18
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
19
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor"
20
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
21
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver"
22
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
23
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver"
24
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
25
"github.com/prometheus/client_golang/prometheus"
26
prom_config "github.com/prometheus/common/config"
27
"go.opentelemetry.io/collector/component"
28
"go.opentelemetry.io/collector/config"
29
"go.opentelemetry.io/collector/config/configtest"
30
"go.opentelemetry.io/collector/confmap"
31
"go.opentelemetry.io/collector/exporter/otlpexporter"
32
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
33
"go.opentelemetry.io/collector/processor/batchprocessor"
34
"go.opentelemetry.io/collector/receiver/otlpreceiver"
35
"go.opentelemetry.io/collector/service/external/configunmarshaler"
36
"go.uber.org/multierr"
37
38
"github.com/grafana/agent/pkg/logs"
39
"github.com/grafana/agent/pkg/traces/automaticloggingprocessor"
40
"github.com/grafana/agent/pkg/traces/noopreceiver"
41
"github.com/grafana/agent/pkg/traces/promsdprocessor"
42
"github.com/grafana/agent/pkg/traces/pushreceiver"
43
"github.com/grafana/agent/pkg/traces/remotewriteexporter"
44
"github.com/grafana/agent/pkg/traces/servicegraphprocessor"
45
"github.com/grafana/agent/pkg/util"
46
)
47
48
const (
	// spanMetricsPipelineName is the name of the dedicated metrics pipeline
	// fed by the spanmetrics processor.
	spanMetricsPipelineName = "metrics/spanmetrics"

	// defaultDecisionWait is the default time to wait for a trace before making a sampling decision
	defaultDecisionWait = time.Second * 5

	// defaultNumTraces is the default number of traces kept on memory.
	defaultNumTraces = uint64(50000)

	// defaultLoadBalancingPort is the default port the agent uses for internal load balancing
	defaultLoadBalancingPort = "4318"
	// agent's load balancing options: resolver type keys accepted by resolver()
	dnsTagName    = "dns"
	staticTagName = "static"

	// sampling policies: policy type with no extra settings (see formatPolicies)
	alwaysSamplePolicy = "always_sample"

	// otlp receiver name prefix; receivers starting with this get
	// include_metadata defaulted to true (see ReceiverMap.UnmarshalYAML)
	otlpReceiverName = "otlp"
)
69
70
// Config controls the configuration of Traces trace pipelines.
type Config struct {
	// Configs is the list of individual trace pipeline configurations.
	Configs []InstanceConfig `yaml:"configs,omitempty"`

	// Unmarshaled is true when the Config was unmarshaled from YAML.
	// It lets consumers distinguish an explicitly-present traces section
	// from an absent one; never serialized back out.
	Unmarshaled bool `yaml:"-"`
}
77
78
// UnmarshalYAML implements yaml.Unmarshaler.
79
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
80
c.Unmarshaled = true
81
type plain Config
82
return unmarshal((*plain)(c))
83
}
84
85
// Validate ensures that the Config is valid.
86
func (c *Config) Validate(logsConfig *logs.Config) error {
87
names := make(map[string]struct{}, len(c.Configs))
88
for idx, c := range c.Configs {
89
if c.Name == "" {
90
return fmt.Errorf("traces config at index %d is missing a name", idx)
91
}
92
if _, exist := names[c.Name]; exist {
93
return fmt.Errorf("found multiple traces configs with name %s", c.Name)
94
}
95
names[c.Name] = struct{}{}
96
}
97
98
for _, inst := range c.Configs {
99
if inst.AutomaticLogging != nil {
100
if err := inst.AutomaticLogging.Validate(logsConfig); err != nil {
101
return fmt.Errorf("failed to validate automatic_logging for traces config %s: %w", inst.Name, err)
102
}
103
}
104
}
105
106
return nil
107
}
108
109
// InstanceConfig configures an individual Traces trace pipeline.
type InstanceConfig struct {
	// Name uniquely identifies this pipeline instance.
	Name string `yaml:"name"`

	// RemoteWrite defines one or multiple backends that can receive the pipeline's traffic.
	RemoteWrite []RemoteWriteConfig `yaml:"remote_write,omitempty"`

	// Receivers: https://github.com/open-telemetry/opentelemetry-collector/blob/7d7ae2eb34b5d387627875c498d7f43619f37ee3/receiver/README.md
	Receivers ReceiverMap `yaml:"receivers,omitempty"`

	// Batch: https://github.com/open-telemetry/opentelemetry-collector/blob/7d7ae2eb34b5d387627875c498d7f43619f37ee3/processor/batchprocessor/config.go#L24
	Batch map[string]interface{} `yaml:"batch,omitempty"`

	// Attributes: https://github.com/open-telemetry/opentelemetry-collector/blob/7d7ae2eb34b5d387627875c498d7f43619f37ee3/processor/attributesprocessor/config.go#L30
	Attributes map[string]interface{} `yaml:"attributes,omitempty"`

	// prom service discovery config (feeds the promsdprocessor)
	ScrapeConfigs   []interface{} `yaml:"scrape_configs,omitempty"`
	OperationType   string        `yaml:"prom_sd_operation_type,omitempty"`
	PodAssociations []string      `yaml:"prom_sd_pod_associations,omitempty"`

	// SpanMetricsProcessor: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/spanmetricsprocessor/README.md
	SpanMetrics *SpanMetricsConfig `yaml:"spanmetrics,omitempty"`

	// AutomaticLogging writes logs derived from spans (see automaticloggingprocessor).
	AutomaticLogging *automaticloggingprocessor.AutomaticLoggingConfig `yaml:"automatic_logging,omitempty"`

	// TailSampling defines a sampling strategy for the pipeline
	TailSampling *tailSamplingConfig `yaml:"tail_sampling,omitempty"`

	// LoadBalancing is used to distribute spans of the same trace to the same agent instance
	LoadBalancing *loadBalancingConfig `yaml:"load_balancing"`

	// ServiceGraphs enables the service graph processor for this pipeline.
	ServiceGraphs *serviceGraphsConfig `yaml:"service_graphs,omitempty"`
}
145
146
// ReceiverMap stores a set of receivers. Because receivers may be configured
// with an unknown set of sensitive information, ReceiverMap will marshal as
// YAML to the text "<secret>".
type ReceiverMap map[string]interface{}
151
// UnmarshalYAML implements yaml.Unmarshaler.
//
// After decoding, OTLP receivers (keys prefixed "otlp") get
// include_metadata defaulted to true on their http and grpc protocol
// sections, unless the user set it explicitly.
func (r *ReceiverMap) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// Decode through an alias type to avoid recursing into this method.
	type plain ReceiverMap
	if err := unmarshal((*plain)(r)); err != nil {
		return err
	}

	protocols := []string{protocolHTTP, protocolGRPC}
	// enable include_metadata by default if receiver is OTLP
	for k := range *r {
		if strings.HasPrefix(k, otlpReceiverName) {
			// for http and grpc receivers, include_metadata is set to true by default
			receiverCfg, ok := (*r)[k].(map[interface{}]interface{})
			if !ok {
				return fmt.Errorf("failed to parse OTLP receiver config: %s", k)
			}

			protocolsCfg, ok := receiverCfg["protocols"].(map[interface{}]interface{})
			if !ok {
				return fmt.Errorf("otlp receiver requires a \"protocols\" field which must be a YAML map: %s", k)
			}

			for _, p := range protocols {
				if cfg, ok := protocolsCfg[p]; ok {
					if cfg == nil {
						// Protocol enabled with an empty body: create its config
						// with include_metadata turned on.
						protocolsCfg[p] = map[interface{}]interface{}{"include_metadata": true}
					} else {
						// Protocol has settings: only default include_metadata
						// when the user did not set it themselves.
						if _, ok := cfg.(map[interface{}]interface{})["include_metadata"]; !ok {
							protocolsCfg[p].(map[interface{}]interface{})["include_metadata"] = true
						}
					}
				}
			}
		}
	}

	return nil
}
189
190
// MarshalYAML implements yaml.Marshaler.
191
func (r ReceiverMap) MarshalYAML() (interface{}, error) {
192
return "<secret>", nil
193
}
194
195
// Accepted values for RemoteWriteConfig.Compression and .Protocol.
const (
	compressionNone = "none"
	compressionGzip = "gzip"
	protocolGRPC    = "grpc"
	protocolHTTP    = "http"
)

// Accepted values for RemoteWriteConfig.Format.
const (
	formatOtlp   = "otlp"
	formatJaeger = "jaeger"
)
206
207
// DefaultRemoteWriteConfig holds the default settings for a PushConfig.
// Applied before YAML decoding in RemoteWriteConfig.UnmarshalYAML.
var DefaultRemoteWriteConfig = RemoteWriteConfig{
	Compression: compressionGzip,
	Protocol:    protocolGRPC,
	Format:      formatOtlp,
}
213
214
// TLSClientSetting configures the oauth2client extension TLS; compatible with configtls.TLSClientSetting
type TLSClientSetting struct {
	CAFile             string `yaml:"ca_file,omitempty"`
	CertFile           string `yaml:"cert_file,omitempty"`
	KeyFile            string `yaml:"key_file,omitempty"`
	MinVersion         string `yaml:"min_version,omitempty"`
	MaxVersion         string `yaml:"max_version,omitempty"`
	Insecure           bool   `yaml:"insecure"`
	InsecureSkipVerify bool   `yaml:"insecure_skip_verify"`
	ServerNameOverride string `yaml:"server_name_override,omitempty"`
}
225
226
// OAuth2Config configures the oauth2client extension for a remote_write exporter
// compatible with oauth2clientauthextension.Config
type OAuth2Config struct {
	ClientID     string           `yaml:"client_id"`
	ClientSecret string           `yaml:"client_secret"`
	TokenURL     string           `yaml:"token_url"`
	Scopes       []string         `yaml:"scopes,omitempty"`
	TLS          TLSClientSetting `yaml:"tls,omitempty"`
	Timeout      time.Duration    `yaml:"timeout,omitempty"`
}
236
237
// Agent uses standard YAML unmarshalling, while the oauth2clientauthextension relies on
238
// mapstructure without providing YAML labels. `toOtelConfig` marshals `Oauth2Config` to configuration type expected by
239
// the oauth2clientauthextension Extension Factory
240
func (c OAuth2Config) toOtelConfig() (*oauth2clientauthextension.Config, error) {
241
var result *oauth2clientauthextension.Config
242
decoderConfig := &mapstructure.DecoderConfig{
243
MatchName: func(s, t string) bool { return util.CamelToSnake(s) == t },
244
Result: &result,
245
WeaklyTypedInput: true,
246
DecodeHook: mapstructure.ComposeDecodeHookFunc(
247
mapstructure.StringToSliceHookFunc(","),
248
mapstructure.StringToTimeDurationHookFunc(),
249
),
250
}
251
decoder, _ := mapstructure.NewDecoder(decoderConfig)
252
if err := decoder.Decode(c); err != nil {
253
return nil, err
254
}
255
return result, nil
256
}
257
258
// RemoteWriteConfig controls the configuration of an exporter
type RemoteWriteConfig struct {
	Endpoint    string `yaml:"endpoint,omitempty"`
	Compression string `yaml:"compression,omitempty"` // "gzip" or "none"
	Protocol    string `yaml:"protocol,omitempty"`    // "grpc" or "http"
	Insecure    bool   `yaml:"insecure,omitempty"`
	Format      string `yaml:"format,omitempty"` // "otlp" or "jaeger"
	// Deprecated: set insecure_skip_verify inside tls_config instead; this
	// field is only honored when tls_config is absent (see exporter()).
	InsecureSkipVerify bool                   `yaml:"insecure_skip_verify,omitempty"`
	TLSConfig          *prom_config.TLSConfig `yaml:"tls_config,omitempty"`
	BasicAuth          *prom_config.BasicAuth `yaml:"basic_auth,omitempty"`
	Oauth2             *OAuth2Config          `yaml:"oauth2,omitempty"`
	Headers            map[string]string      `yaml:"headers,omitempty"`
	SendingQueue       map[string]interface{} `yaml:"sending_queue,omitempty"`    // https://github.com/open-telemetry/opentelemetry-collector/blob/7d7ae2eb34b5d387627875c498d7f43619f37ee3/exporter/exporterhelper/queued_retry.go#L30
	RetryOnFailure     map[string]interface{} `yaml:"retry_on_failure,omitempty"` // https://github.com/open-telemetry/opentelemetry-collector/blob/7d7ae2eb34b5d387627875c498d7f43619f37ee3/exporter/exporterhelper/queued_retry.go#L54
}
274
275
// UnmarshalYAML implements yaml.Unmarshaler.
276
func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
277
*c = DefaultRemoteWriteConfig
278
279
type plain RemoteWriteConfig
280
281
if err := unmarshal((*plain)(c)); err != nil {
282
return err
283
}
284
285
if c.Compression != compressionGzip && c.Compression != compressionNone {
286
return fmt.Errorf("unsupported compression '%s', expected 'gzip' or 'none'", c.Compression)
287
}
288
289
if c.Format != formatOtlp && c.Format != formatJaeger {
290
return fmt.Errorf("unsupported format '%s', expected 'otlp' or 'jaeger'", c.Format)
291
}
292
return nil
293
}
294
295
// SpanMetricsConfig controls the configuration of spanmetricsprocessor and the related metrics exporter.
// Exactly one of MetricsInstance or HandlerEndpoint must be set (enforced in otelConfig).
type SpanMetricsConfig struct {
	LatencyHistogramBuckets []time.Duration                  `yaml:"latency_histogram_buckets,omitempty"`
	Dimensions              []spanmetricsprocessor.Dimension `yaml:"dimensions,omitempty"`
	// Namespace if set, exports metrics under the provided value.
	Namespace string `yaml:"namespace,omitempty"`
	// ConstLabels are values that are applied for every exported metric.
	ConstLabels *prometheus.Labels `yaml:"const_labels,omitempty"`
	// MetricsInstance is the Agent's metrics instance that will be used to push metrics
	MetricsInstance string `yaml:"metrics_instance"`
	// HandlerEndpoint is the address where a prometheus exporter will be exposed
	HandlerEndpoint string `yaml:"handler_endpoint"`

	// DimensionsCacheSize defines the size of cache for storing Dimensions, which helps to avoid cache memory growing
	// indefinitely over the lifetime of the collector.
	DimensionsCacheSize int `yaml:"dimensions_cache_size"`
}
312
313
// tailSamplingConfig is the configuration for tail-based sampling
type tailSamplingConfig struct {
	// Policies are the strategies used for sampling. Multiple policies can be used in the same pipeline.
	// For more information, refer to https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/tailsamplingprocessor
	Policies []policy `yaml:"policies"`
	// DecisionWait defines the time to wait for a complete trace before making a decision
	// (defaults to defaultDecisionWait when zero).
	DecisionWait time.Duration `yaml:"decision_wait,omitempty"`
	// NumTraces is the number of traces kept on memory. Typically most of the data
	// of a trace is released after a sampling decision is taken.
	// Defaults to defaultNumTraces when zero.
	NumTraces uint64 `yaml:"num_traces,omitempty"`
	// ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor
	// per second. This helps with allocating data structures with closer to actual usage size.
	ExpectedNewTracesPerSec uint64 `yaml:"expected_new_traces_per_sec,omitempty"`
}
327
328
// policy is a single tail-sampling policy. Type-specific settings arrive as
// inline YAML keys and are collected into Policy (see formatPolicies).
type policy struct {
	Name string `yaml:"name,omitempty"`
	Type string `yaml:"type"`
	// Policy holds the remaining inline keys; the entry under Policy[Type]
	// carries the type-specific configuration.
	Policy map[string]interface{} `yaml:",inline"`
}
333
334
// loadBalancingConfig defines the configuration for load balancing spans between agent instances
// loadBalancingConfig is an OTel exporter's config with extra resolver config
type loadBalancingConfig struct {
	Exporter exporterConfig         `yaml:"exporter"`
	Resolver map[string]interface{} `yaml:"resolver"`
	// ReceiverPort is the port the instance will use to receive load balanced traces
	// (defaults to defaultLoadBalancingPort when empty).
	ReceiverPort string `yaml:"receiver_port"`
}
342
343
// exporterConfig defines the config for an otlp exporter for load balancing
type exporterConfig struct {
	Compression        string                 `yaml:"compression,omitempty"`
	Insecure           bool                   `yaml:"insecure,omitempty"`
	InsecureSkipVerify bool                   `yaml:"insecure_skip_verify,omitempty"`
	BasicAuth          *prom_config.BasicAuth `yaml:"basic_auth,omitempty"`
	Format             string                 `yaml:"format,omitempty"`
}
351
352
// serviceGraphsConfig configures the service graphs processor for a pipeline.
type serviceGraphsConfig struct {
	Enabled  bool          `yaml:"enabled,omitempty"`
	Wait     time.Duration `yaml:"wait,omitempty"`
	MaxItems int           `yaml:"max_items,omitempty"`
}
357
358
// exporter builds an OTel exporter from RemoteWriteConfig
359
func exporter(rwCfg RemoteWriteConfig) (map[string]interface{}, error) {
360
if len(rwCfg.Endpoint) == 0 {
361
return nil, errors.New("must have a configured a backend endpoint")
362
}
363
364
headers := map[string]string{}
365
if rwCfg.Headers != nil {
366
headers = rwCfg.Headers
367
}
368
369
if rwCfg.BasicAuth != nil && rwCfg.Oauth2 != nil {
370
return nil, fmt.Errorf("only one auth type may be configured per exporter (basic_auth or oauth2)")
371
}
372
373
if rwCfg.BasicAuth != nil {
374
password := string(rwCfg.BasicAuth.Password)
375
376
if len(rwCfg.BasicAuth.PasswordFile) > 0 {
377
buff, err := os.ReadFile(rwCfg.BasicAuth.PasswordFile)
378
if err != nil {
379
return nil, fmt.Errorf("unable to load password file %s: %w", rwCfg.BasicAuth.PasswordFile, err)
380
}
381
password = strings.TrimSpace(string(buff))
382
}
383
384
encodedAuth := base64.StdEncoding.EncodeToString([]byte(rwCfg.BasicAuth.Username + ":" + password))
385
headers["authorization"] = "Basic " + encodedAuth
386
}
387
388
compression := rwCfg.Compression
389
if compression == "" {
390
compression = compressionNone
391
}
392
393
// Default OTLP exporter config awaits an empty headers map. Other exporters
394
// (e.g. Jaeger) may expect a nil value instead
395
if len(headers) == 0 && rwCfg.Format == formatJaeger {
396
headers = nil
397
}
398
exporter := map[string]interface{}{
399
"endpoint": rwCfg.Endpoint,
400
"compression": compression,
401
"headers": headers,
402
"sending_queue": rwCfg.SendingQueue,
403
"retry_on_failure": rwCfg.RetryOnFailure,
404
}
405
406
tlsConfig := map[string]interface{}{
407
"insecure": rwCfg.Insecure,
408
}
409
if !rwCfg.Insecure {
410
// If there is a TLSConfig use it
411
if rwCfg.TLSConfig != nil {
412
tlsConfig["ca_file"] = rwCfg.TLSConfig.CAFile
413
tlsConfig["cert_file"] = rwCfg.TLSConfig.CertFile
414
tlsConfig["key_file"] = rwCfg.TLSConfig.KeyFile
415
tlsConfig["insecure_skip_verify"] = rwCfg.TLSConfig.InsecureSkipVerify
416
} else {
417
// If not, set whatever value is specified in the old config.
418
tlsConfig["insecure_skip_verify"] = rwCfg.InsecureSkipVerify
419
}
420
}
421
exporter["tls"] = tlsConfig
422
423
// Apply some sane defaults to the exporter. The
424
// sending_queue.retry_on_failure default is 300s which prevents any
425
// sending-related errors to not be logged for 5 minutes. We'll lower that
426
// to 60s.
427
if retryConfig := exporter["retry_on_failure"].(map[string]interface{}); retryConfig == nil {
428
exporter["retry_on_failure"] = map[string]interface{}{
429
"max_elapsed_time": "60s",
430
}
431
} else if retryConfig["max_elapsed_time"] == nil {
432
retryConfig["max_elapsed_time"] = "60s"
433
}
434
435
return exporter, nil
436
}
437
438
func getExporterName(index int, protocol string, format string) (string, error) {
439
switch format {
440
case formatOtlp:
441
switch protocol {
442
case protocolGRPC:
443
return fmt.Sprintf("otlp/%d", index), nil
444
case protocolHTTP:
445
return fmt.Sprintf("otlphttp/%d", index), nil
446
default:
447
return "", errors.New("unknown protocol, expected either 'http' or 'grpc'")
448
}
449
case formatJaeger:
450
switch protocol {
451
case protocolGRPC:
452
return fmt.Sprintf("jaeger/%d", index), nil
453
default:
454
return "", errors.New("unknown protocol, expected 'grpc'")
455
}
456
default:
457
return "", errors.New("unknown format, expected either 'otlp' or 'jaeger'")
458
}
459
}
460
461
// exporters builds one or multiple exporters from a remote_write block.
462
func (c *InstanceConfig) exporters() (map[string]interface{}, error) {
463
exporters := map[string]interface{}{}
464
for i, remoteWriteConfig := range c.RemoteWrite {
465
exporter, err := exporter(remoteWriteConfig)
466
if err != nil {
467
return nil, err
468
}
469
exporterName, err := getExporterName(i, remoteWriteConfig.Protocol, remoteWriteConfig.Format)
470
if err != nil {
471
return nil, err
472
}
473
if remoteWriteConfig.Oauth2 != nil {
474
exporter["auth"] = map[string]string{"authenticator": getAuthExtensionName(exporterName)}
475
}
476
exporters[exporterName] = exporter
477
}
478
return exporters, nil
479
}
480
481
// getAuthExtensionName derives the name of the oauth2client auth extension
// paired with the given exporter, e.g. "otlp/0" -> "oauth2client/otlp0".
// The slash in the exporter name is stripped so the result contains a single
// type/name separator.
func getAuthExtensionName(exporterName string) string {
	// strings.ReplaceAll is the idiomatic form of strings.Replace(..., -1).
	return fmt.Sprintf("oauth2client/%s", strings.ReplaceAll(exporterName, "/", ""))
}
484
485
// builds oauth2clientauth extensions required to support RemoteWriteConfigurations.
486
func (c *InstanceConfig) extensions() (map[string]interface{}, error) {
487
extensions := map[string]interface{}{}
488
for i, remoteWriteConfig := range c.RemoteWrite {
489
if remoteWriteConfig.Oauth2 == nil {
490
continue
491
}
492
exporterName, err := getExporterName(i, remoteWriteConfig.Protocol, remoteWriteConfig.Format)
493
if err != nil {
494
return nil, err
495
}
496
oauthConfig, err := remoteWriteConfig.Oauth2.toOtelConfig()
497
if err != nil {
498
return nil, err
499
}
500
extensions[getAuthExtensionName(exporterName)] = oauthConfig
501
}
502
return extensions, nil
503
}
504
505
func resolver(config map[string]interface{}) (map[string]interface{}, error) {
506
if len(config) == 0 {
507
return nil, fmt.Errorf("must configure one resolver (dns or static)")
508
}
509
resolverCfg := make(map[string]interface{})
510
for typ, cfg := range config {
511
switch typ {
512
case dnsTagName, staticTagName:
513
resolverCfg[typ] = cfg
514
default:
515
return nil, fmt.Errorf("unsupported resolver config type: %s", typ)
516
}
517
}
518
return resolverCfg, nil
519
}
520
521
// loadBalancingExporter builds the config for the internal load-balancing
// exporter, which routes spans of the same trace to the same agent instance
// using the configured resolver (dns or static).
func (c *InstanceConfig) loadBalancingExporter() (map[string]interface{}, error) {
	exporter, err := exporter(RemoteWriteConfig{
		// Endpoint is omitted in OTel load balancing exporter
		Endpoint:    "noop",
		Compression: c.LoadBalancing.Exporter.Compression,
		Insecure:    c.LoadBalancing.Exporter.Insecure,
		TLSConfig:   &prom_config.TLSConfig{InsecureSkipVerify: c.LoadBalancing.Exporter.InsecureSkipVerify},
		BasicAuth:   c.LoadBalancing.Exporter.BasicAuth,
		Format:      c.LoadBalancing.Exporter.Format,
		Headers:     map[string]string{},
	})
	if err != nil {
		return nil, err
	}
	resolverCfg, err := resolver(c.LoadBalancing.Resolver)
	if err != nil {
		return nil, err
	}
	// The load-balancing exporter wraps an otlp exporter config under
	// "protocol" and adds the resolver settings alongside it.
	return map[string]interface{}{
		"protocol": map[string]interface{}{
			"otlp": exporter,
		},
		"resolver": resolverCfg,
	}, nil
}
546
547
// formatPolicies creates sampling policies (i.e. rules) compatible with OTel's tail sampling processor
548
// https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.46.0/processor/tailsamplingprocessor
549
func formatPolicies(cfg []policy) ([]map[string]interface{}, error) {
550
policies := make([]map[string]interface{}, 0, len(cfg))
551
for i, policy := range cfg {
552
typ, name := policy.Type, policy.Name
553
if typ == "" {
554
return nil, fmt.Errorf("policy %d must have a type", i)
555
}
556
557
if name == "" {
558
name = fmt.Sprintf("%s/%d", typ, i)
559
}
560
561
switch typ {
562
case alwaysSamplePolicy:
563
policies = append(policies, map[string]interface{}{
564
"name": name,
565
"type": typ,
566
})
567
default:
568
policies = append(policies, map[string]interface{}{
569
"name": name,
570
"type": typ,
571
typ: policy.Policy[typ],
572
})
573
}
574
}
575
return policies, nil
576
}
577
578
// otelConfig assembles the full OpenTelemetry Collector configuration for
// this pipeline instance: receivers, processors, exporters, extensions and
// the service pipelines wiring them together, then unmarshals it through the
// collector's own factories so it is validated the same way the collector
// would.
func (c *InstanceConfig) otelConfig() (*config.Config, error) {
	otelMapStructure := map[string]interface{}{}

	if len(c.Receivers) == 0 {
		return nil, errors.New("must have at least one configured receiver")
	}

	// add a hacky push receiver for when an integration
	// wants to push traces directly, e.g. app agent receiver.
	// it can only accept traces programmatically from inside the agent
	c.Receivers[pushreceiver.TypeStr] = nil

	extensions, err := c.extensions()
	if err != nil {
		return nil, err
	}
	extensionsNames := make([]string, 0, len(extensions))
	for name := range extensions {
		extensionsNames = append(extensionsNames, name)
	}

	exporters, err := c.exporters()
	if err != nil {
		return nil, err
	}
	exportersNames := make([]string, 0, len(exporters))
	for name := range exporters {
		exportersNames = append(exportersNames, name)
	}

	// processors
	processors := map[string]interface{}{}
	processorNames := []string{}
	if c.ScrapeConfigs != nil {
		// Prometheus service-discovery processor; defaults to upsert when no
		// operation type is configured.
		opType := promsdprocessor.OperationTypeUpsert
		if c.OperationType != "" {
			opType = c.OperationType
		}
		processorNames = append(processorNames, promsdprocessor.TypeStr)
		processors[promsdprocessor.TypeStr] = map[string]interface{}{
			"scrape_configs":   c.ScrapeConfigs,
			"operation_type":   opType,
			"pod_associations": c.PodAssociations,
		}
	}

	if c.AutomaticLogging != nil {
		processorNames = append(processorNames, automaticloggingprocessor.TypeStr)
		processors[automaticloggingprocessor.TypeStr] = map[string]interface{}{
			"automatic_logging": c.AutomaticLogging,
		}
	}

	if c.Attributes != nil {
		processors["attributes"] = c.Attributes
		processorNames = append(processorNames, "attributes")
	}

	if c.Batch != nil {
		processors["batch"] = c.Batch
		processorNames = append(processorNames, "batch")
	}

	pipelines := make(map[string]interface{})
	if c.SpanMetrics != nil {
		// Configure the metrics exporter.
		namespace := "traces_spanmetrics"
		if len(c.SpanMetrics.Namespace) != 0 {
			namespace = fmt.Sprintf("%s_%s", c.SpanMetrics.Namespace, namespace)
		}

		// Exactly one of metrics_instance (remote-write exporter) or
		// handler_endpoint (prometheus scrape exporter) must be set.
		var exporterName string
		if len(c.SpanMetrics.MetricsInstance) != 0 && len(c.SpanMetrics.HandlerEndpoint) == 0 {
			exporterName = remotewriteexporter.TypeStr
			exporters[remotewriteexporter.TypeStr] = map[string]interface{}{
				"namespace":        namespace,
				"const_labels":     c.SpanMetrics.ConstLabels,
				"metrics_instance": c.SpanMetrics.MetricsInstance,
			}
		} else if len(c.SpanMetrics.MetricsInstance) == 0 && len(c.SpanMetrics.HandlerEndpoint) != 0 {
			exporterName = "prometheus"
			exporters[exporterName] = map[string]interface{}{
				"endpoint":     c.SpanMetrics.HandlerEndpoint,
				"namespace":    namespace,
				"const_labels": c.SpanMetrics.ConstLabels,
			}
		} else {
			return nil, fmt.Errorf("must specify a prometheus instance or a metrics handler endpoint to export the metrics")
		}

		processorNames = append(processorNames, "spanmetrics")
		spanMetrics := map[string]interface{}{
			"metrics_exporter":          exporterName,
			"latency_histogram_buckets": c.SpanMetrics.LatencyHistogramBuckets,
			"dimensions":                c.SpanMetrics.Dimensions,
		}
		if c.SpanMetrics.DimensionsCacheSize != 0 {
			spanMetrics["dimensions_cache_size"] = c.SpanMetrics.DimensionsCacheSize
		}
		processors["spanmetrics"] = spanMetrics

		pipelines[spanMetricsPipelineName] = map[string]interface{}{
			"receivers": []string{noopreceiver.TypeStr},
			"exporters": []string{exporterName},
		}
	}

	// receivers
	receiverNames := []string{}
	for name := range c.Receivers {
		receiverNames = append(receiverNames, name)
	}

	if c.TailSampling != nil {
		expectedNewTracesPerSec := c.TailSampling.ExpectedNewTracesPerSec

		numTraces := defaultNumTraces
		if c.TailSampling.NumTraces != 0 {
			numTraces = c.TailSampling.NumTraces
		}

		wait := defaultDecisionWait
		if c.TailSampling.DecisionWait != 0 {
			wait = c.TailSampling.DecisionWait
		}

		policies, err := formatPolicies(c.TailSampling.Policies)
		if err != nil {
			return nil, err
		}

		// tail_sampling should be executed before the batch processor
		// TODO(mario.rodriguez): put attributes processor before tail_sampling. Maybe we want to sample on mutated spans
		processorNames = append([]string{"tail_sampling"}, processorNames...)
		processors["tail_sampling"] = map[string]interface{}{
			"policies":                    policies,
			"decision_wait":               wait,
			"num_traces":                  numTraces,
			"expected_new_traces_per_sec": expectedNewTracesPerSec,
		}
	}

	if c.LoadBalancing != nil {
		internalExporter, err := c.loadBalancingExporter()
		if err != nil {
			return nil, err
		}
		exporters["loadbalancing"] = internalExporter

		// Receiver that accepts the load-balanced traffic from peer agents.
		receiverPort := defaultLoadBalancingPort
		if c.LoadBalancing.ReceiverPort != "" {
			receiverPort = c.LoadBalancing.ReceiverPort
		}
		c.Receivers["otlp/lb"] = map[string]interface{}{
			"protocols": map[string]interface{}{
				"grpc": map[string]interface{}{
					"endpoint": net.JoinHostPort("0.0.0.0", receiverPort),
				},
			},
		}
	}

	if c.ServiceGraphs != nil && c.ServiceGraphs.Enabled {
		processors[servicegraphprocessor.TypeStr] = map[string]interface{}{
			"wait":      c.ServiceGraphs.Wait,
			"max_items": c.ServiceGraphs.MaxItems,
		}
		processorNames = append(processorNames, servicegraphprocessor.TypeStr)
	}

	// Build Pipelines
	splitPipeline := c.LoadBalancing != nil
	orderedSplitProcessors := orderProcessors(processorNames, splitPipeline)
	if splitPipeline {
		// load balancing pipeline
		pipelines["traces/0"] = map[string]interface{}{
			"receivers":  receiverNames,
			"processors": orderedSplitProcessors[0],
			"exporters":  []string{"loadbalancing"},
		}
		// processing pipeline
		pipelines["traces/1"] = map[string]interface{}{
			"exporters":  exportersNames,
			"processors": orderedSplitProcessors[1],
			"receivers":  []string{"otlp/lb"},
		}
	} else {
		pipelines["traces"] = map[string]interface{}{
			"exporters":  exportersNames,
			"processors": orderedSplitProcessors[0],
			"receivers":  receiverNames,
		}
	}

	if c.SpanMetrics != nil {
		// Insert a noop receiver in the metrics pipeline.
		// Added to pass validation requiring at least one receiver in a pipeline.
		c.Receivers[noopreceiver.TypeStr] = nil
	}

	receiversMap := map[string]interface{}(c.Receivers)

	otelMapStructure["extensions"] = extensions
	otelMapStructure["exporters"] = exporters
	otelMapStructure["processors"] = processors
	otelMapStructure["receivers"] = receiversMap

	// pipelines
	serviceMap := map[string]interface{}{
		"pipelines": pipelines,
	}
	if len(extensionsNames) > 0 {
		serviceMap["extensions"] = extensionsNames
	}
	otelMapStructure["service"] = serviceMap

	factories, err := tracingFactories()
	if err != nil {
		return nil, fmt.Errorf("failed to create factories: %w", err)
	}

	if err := validateConfigFromFactories(factories); err != nil {
		return nil, fmt.Errorf("failed to validate factories: %w", err)
	}

	configMap := confmap.NewFromStringMap(otelMapStructure)
	otelCfg, err := configunmarshaler.Unmarshal(configMap, factories)
	if err != nil {
		return nil, fmt.Errorf("failed to load OTel config: %w", err)
	}

	return otelCfg, nil
}
811
812
// tracingFactories() only creates the needed factories. if we decide to add support for a new
// processor, exporter, receiver we need to add it here
func tracingFactories() (component.Factories, error) {
	extensions, err := component.MakeExtensionFactoryMap(
		oauth2clientauthextension.NewFactory(),
	)
	if err != nil {
		return component.Factories{}, err
	}

	receivers, err := component.MakeReceiverFactoryMap(
		jaegerreceiver.NewFactory(),
		zipkinreceiver.NewFactory(),
		otlpreceiver.NewFactory(),
		opencensusreceiver.NewFactory(),
		kafkareceiver.NewFactory(),
		noopreceiver.NewFactory(),
		pushreceiver.NewFactory(),
	)
	if err != nil {
		return component.Factories{}, err
	}

	exporters, err := component.MakeExporterFactoryMap(
		otlpexporter.NewFactory(),
		otlphttpexporter.NewFactory(),
		jaegerexporter.NewFactory(),
		loadbalancingexporter.NewFactory(),
		prometheusexporter.NewFactory(),
		remotewriteexporter.NewFactory(),
	)
	if err != nil {
		return component.Factories{}, err
	}

	processors, err := component.MakeProcessorFactoryMap(
		batchprocessor.NewFactory(),
		attributesprocessor.NewFactory(),
		promsdprocessor.NewFactory(),
		spanmetricsprocessor.NewFactory(),
		automaticloggingprocessor.NewFactory(),
		tailsamplingprocessor.NewFactory(),
		servicegraphprocessor.NewFactory(),
	)
	if err != nil {
		return component.Factories{}, err
	}

	return component.Factories{
		Extensions: extensions,
		Receivers:  receivers,
		Processors: processors,
		Exporters:  exporters,
	}, nil
}
867
868
// orders the passed processors into their preferred order in a tracing pipeline. pass
// true to splitPipelines if this function should split the input pipelines into two
// sets: before and after load balancing
func orderProcessors(processors []string, splitPipelines bool) [][]string {
	// Preferred execution rank of known processors; unknown names rank 0,
	// same as the original order map's zero value.
	rank := func(name string) int {
		switch name {
		case "attributes":
			return 0
		case "spanmetrics":
			return 1
		case "service_graphs":
			return 2
		case "tail_sampling":
			return 3
		case "automatic_logging":
			return 4
		case "batch":
			return 5
		}
		return 0
	}

	sort.Slice(processors, func(i, j int) bool {
		return rank(processors[i]) < rank(processors[j])
	})

	if !splitPipelines {
		return [][]string{processors}
	}

	// if we're splitting pipelines we have to look for the first processor that belongs in the second
	// stage and split on that. if nothing belongs in the second stage just leave them all in the first
	secondStage := map[string]bool{
		"batch":             true,
		"tail_sampling":     true,
		"automatic_logging": true,
		"service_graphs":    true,
	}
	splitAt := len(processors)
	for i, name := range processors {
		if secondStage[name] {
			splitAt = i
			break
		}
	}

	return [][]string{
		processors[:splitAt],
		processors[splitAt:],
	}
}
913
914
// Code taken from OTel's service/configcheck.go
// https://github.com/grafana/opentelemetry-collector/blob/0.40-grafana/service/configcheck.go#L26-L43
//
// validateConfigFromFactories checks that every factory's default config
// passes the collector's struct validation, accumulating all failures into a
// single multierr instead of stopping at the first one.
func validateConfigFromFactories(factories component.Factories) error {
	var errs error

	for _, factory := range factories.Receivers {
		errs = multierr.Append(errs, configtest.CheckConfigStruct(factory.CreateDefaultConfig()))
	}
	for _, factory := range factories.Processors {
		errs = multierr.Append(errs, configtest.CheckConfigStruct(factory.CreateDefaultConfig()))
	}
	for _, factory := range factories.Exporters {
		errs = multierr.Append(errs, configtest.CheckConfigStruct(factory.CreateDefaultConfig()))
	}
	for _, factory := range factories.Extensions {
		errs = multierr.Append(errs, configtest.CheckConfigStruct(factory.CreateDefaultConfig()))
	}

	return errs
}
934
935