Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/promsdprocessor/factory.go
4094 views
1
package promsdprocessor
2
3
import (
4
"context"
5
"fmt"
6
7
prom_config "github.com/prometheus/prometheus/config"
8
"go.opentelemetry.io/collector/component"
9
"go.opentelemetry.io/collector/config"
10
"go.opentelemetry.io/collector/consumer"
11
"gopkg.in/yaml.v2"
12
)
13
14
// TypeStr is the unique identifier for the Prometheus SD processor.
15
const TypeStr = "prom_sd_processor"
16
17
const (
18
// OperationTypeInsert inserts a new k/v if it isn't already present
19
OperationTypeInsert = "insert"
20
// OperationTypeUpdate only modifies an existing k/v
21
OperationTypeUpdate = "update"
22
// OperationTypeUpsert does both of above
23
OperationTypeUpsert = "upsert"
24
25
podAssociationIPLabel = "ip"
26
podAssociationOTelIPLabel = "net.host.ip"
27
podAssociationk8sIPLabel = "k8s.pod.ip"
28
podAssociationHostnameLabel = "hostname"
29
podAssociationConnectionIP = "connection"
30
)
31
32
// Config holds the configuration for the Prometheus SD processor.
33
type Config struct {
34
config.ProcessorSettings `mapstructure:",squash"`
35
ScrapeConfigs []interface{} `mapstructure:"scrape_configs"`
36
OperationType string `mapstructure:"operation_type"`
37
PodAssociations []string `mapstructure:"pod_associations"`
38
}
39
40
// NewFactory returns a new factory for the Attributes processor.
41
func NewFactory() component.ProcessorFactory {
42
return component.NewProcessorFactory(
43
TypeStr,
44
createDefaultConfig,
45
component.WithTracesProcessor(createTraceProcessor, component.StabilityLevelUndefined),
46
)
47
}
48
49
func createDefaultConfig() config.Processor {
50
return &Config{
51
ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(TypeStr, TypeStr)),
52
}
53
}
54
55
func createTraceProcessor(
56
_ context.Context,
57
cp component.ProcessorCreateSettings,
58
cfg config.Processor,
59
nextConsumer consumer.Traces,
60
) (component.TracesProcessor, error) {
61
62
oCfg := cfg.(*Config)
63
out, err := yaml.Marshal(oCfg.ScrapeConfigs)
64
if err != nil {
65
return nil, fmt.Errorf("unable to marshal scrapeConfigs interface{} to yaml: %w", err)
66
}
67
68
scrapeConfigs := make([]*prom_config.ScrapeConfig, 0)
69
err = yaml.Unmarshal(out, &scrapeConfigs)
70
if err != nil {
71
return nil, fmt.Errorf("unable to unmarshal bytes to []*config.ScrapeConfig: %w", err)
72
}
73
74
return newTraceProcessor(nextConsumer, oCfg.OperationType, oCfg.PodAssociations, scrapeConfigs)
75
}
76
77