Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/promsdprocessor/prom_sd_processor.go
4094 views
1
package promsdprocessor
2
3
import (
	"context"
	"errors"
	"fmt"
	"net"
	"strings"
	"sync"

	util "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
	semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
)
26
27
type promServiceDiscoProcessor struct {
28
nextConsumer consumer.Traces
29
discoveryMgr *discovery.Manager
30
discoveryMgrStop context.CancelFunc
31
discoveryMgrCtx context.Context
32
33
relabelConfigs map[string][]*relabel.Config
34
hostLabels map[string]model.LabelSet
35
mtx sync.Mutex
36
37
operationType string
38
podAssociations []string
39
40
logger log.Logger
41
}
42
43
func newTraceProcessor(nextConsumer consumer.Traces, operationType string, podAssociations []string, scrapeConfigs []*config.ScrapeConfig) (component.TracesProcessor, error) {
44
ctx, cancel := context.WithCancel(context.Background())
45
46
logger := log.With(util.Logger, "component", "traces service disco")
47
mgr := discovery.NewManager(ctx, logger, discovery.Name("traces service disco"))
48
49
relabelConfigs := map[string][]*relabel.Config{}
50
managerConfig := map[string]discovery.Configs{}
51
for _, v := range scrapeConfigs {
52
managerConfig[v.JobName] = v.ServiceDiscoveryConfigs
53
relabelConfigs[v.JobName] = v.RelabelConfigs
54
}
55
56
err := mgr.ApplyConfig(managerConfig)
57
if err != nil {
58
cancel()
59
return nil, err
60
}
61
62
switch operationType {
63
case OperationTypeUpsert, OperationTypeInsert, OperationTypeUpdate:
64
case "": // Use Upsert by default
65
operationType = OperationTypeUpsert
66
default:
67
cancel()
68
return nil, fmt.Errorf("unknown operation type %s", operationType)
69
}
70
71
for _, podAssociation := range podAssociations {
72
switch podAssociation {
73
case podAssociationIPLabel, podAssociationOTelIPLabel, podAssociationk8sIPLabel, podAssociationHostnameLabel, podAssociationConnectionIP:
74
default:
75
cancel()
76
return nil, fmt.Errorf("unknown pod association %s", podAssociation)
77
}
78
}
79
80
if len(podAssociations) == 0 {
81
podAssociations = []string{podAssociationIPLabel, podAssociationOTelIPLabel, podAssociationk8sIPLabel, podAssociationHostnameLabel, podAssociationConnectionIP}
82
}
83
84
if nextConsumer == nil {
85
cancel()
86
return nil, component.ErrNilNextConsumer
87
}
88
return &promServiceDiscoProcessor{
89
nextConsumer: nextConsumer,
90
discoveryMgr: mgr,
91
discoveryMgrStop: cancel,
92
discoveryMgrCtx: ctx,
93
relabelConfigs: relabelConfigs,
94
hostLabels: make(map[string]model.LabelSet),
95
logger: logger,
96
operationType: operationType,
97
podAssociations: podAssociations,
98
}, nil
99
}
100
101
// ConsumeTraces enriches the resource attributes of every ResourceSpans entry
// in td with discovered labels, then hands td to the next consumer and returns
// that consumer's error.
func (p *promServiceDiscoProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	resourceSpans := td.ResourceSpans()
	for idx, total := 0, resourceSpans.Len(); idx < total; idx++ {
		resourceAttrs := resourceSpans.At(idx).Resource().Attributes()
		p.processAttributes(ctx, resourceAttrs)
	}
	return p.nextConsumer.ConsumeTraces(ctx, td)
}
111
112
// stringAttributeFromMap returns the string stored under key in attrs. It
// returns "" when the key is absent or its value is not a string.
func stringAttributeFromMap(attrs pcommon.Map, key string) string {
	value, found := attrs.Get(key)
	if !found || value.Type() != pcommon.ValueTypeStr {
		return ""
	}
	return value.Str()
}
120
121
// getConnectionIP returns the host part of the client address carried in ctx.
// It returns "" when ctx carries no address.
//
// If the address contains ':' but cannot be split into host and port (for
// example an IPv6 address without a port), the full address string is
// returned unchanged and the failure is logged at debug level.
func (p *promServiceDiscoProcessor) getConnectionIP(ctx context.Context) string {
	c := client.FromContext(ctx)
	if c.Addr == nil {
		return ""
	}

	host := c.Addr.String()
	if !strings.Contains(host, ":") {
		return host
	}

	splitHost, _, err := net.SplitHostPort(host)
	if err != nil {
		// It's normal for this to fail for IPv6 address strings that don't actually include a port.
		level.Debug(p.logger).Log("msg", "unable to split connection host and port", "host", host, "err", err)
		return host
	}
	return splitHost
}
141
142
func (p *promServiceDiscoProcessor) getPodIP(ctx context.Context, attrs pcommon.Map) string {
143
for _, podAssociation := range p.podAssociations {
144
switch podAssociation {
145
case podAssociationIPLabel, podAssociationOTelIPLabel, podAssociationk8sIPLabel:
146
ip := stringAttributeFromMap(attrs, podAssociation)
147
if ip != "" {
148
return ip
149
}
150
case podAssociationHostnameLabel:
151
hostname := stringAttributeFromMap(attrs, semconv.AttributeHostName)
152
if net.ParseIP(hostname) != nil {
153
return hostname
154
}
155
case podAssociationConnectionIP:
156
ip := p.getConnectionIP(ctx)
157
if ip != "" {
158
return ip
159
}
160
}
161
}
162
return ""
163
}
164
165
func (p *promServiceDiscoProcessor) processAttributes(ctx context.Context, attrs pcommon.Map) {
166
ip := p.getPodIP(ctx, attrs)
167
// have to have an ip for labels lookup
168
if ip == "" {
169
level.Debug(p.logger).Log("msg", "unable to find ip in span attributes, skipping attribute addition")
170
return
171
}
172
173
p.mtx.Lock()
174
defer p.mtx.Unlock()
175
176
labels, ok := p.hostLabels[ip]
177
if !ok {
178
level.Debug(p.logger).Log("msg", "unable to find matching hostLabels", "ip", ip)
179
return
180
}
181
182
for k, v := range labels {
183
switch p.operationType {
184
case OperationTypeUpsert:
185
attrs.PutStr(string(k), string(v))
186
case OperationTypeInsert:
187
if _, ok := attrs.Get(string(k)); !ok {
188
attrs.PutStr(string(k), string(v))
189
}
190
case OperationTypeUpdate:
191
if toVal, ok := attrs.Get(string(k)); ok {
192
toVal.SetStr(string(v))
193
}
194
}
195
}
196
}
197
198
// Capabilities reports that this processor mutates the data it receives,
// because it writes discovered labels into resource attributes.
func (p *promServiceDiscoProcessor) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	caps.MutatesData = true
	return caps
}
201
202
// Start is invoked during service startup.
203
func (p *promServiceDiscoProcessor) Start(_ context.Context, _ component.Host) error {
204
go p.watchServiceDiscovery()
205
206
go func() {
207
err := p.discoveryMgr.Run()
208
if err != nil && err != context.Canceled {
209
level.Error(p.logger).Log("msg", "failed to start prom svc disco. relabeling disabled", "err", err)
210
}
211
}()
212
213
return nil
214
}
215
216
// Shutdown is invoked during service shutdown.
217
func (p *promServiceDiscoProcessor) Shutdown(context.Context) error {
218
if p.discoveryMgrStop != nil {
219
p.discoveryMgrStop()
220
}
221
return nil
222
}
223
224
func (p *promServiceDiscoProcessor) watchServiceDiscovery() {
225
for {
226
// p.discoveryMgr.SyncCh() is never closed so we need to watch the context as well to properly exit this goroutine
227
select {
228
case targetGroups := <-p.discoveryMgr.SyncCh():
229
hostLabels := make(map[string]model.LabelSet)
230
level.Debug(p.logger).Log("msg", "syncing target groups", "count", len(targetGroups))
231
for jobName, groups := range targetGroups {
232
p.syncGroups(jobName, groups, hostLabels)
233
}
234
p.mtx.Lock()
235
p.hostLabels = hostLabels
236
p.mtx.Unlock()
237
case <-p.discoveryMgrCtx.Done():
238
return
239
}
240
}
241
}
242
243
// syncGroups passes every target group discovered for jobName to syncTargets,
// which accumulates results into hostLabels.
func (p *promServiceDiscoProcessor) syncGroups(jobName string, groups []*targetgroup.Group, hostLabels map[string]model.LabelSet) {
	level.Debug(p.logger).Log("msg", "syncing target group", "jobName", jobName)
	for idx := range groups {
		p.syncTargets(jobName, groups[idx], hostLabels)
	}
}
249
250
// syncTargets applies jobName's relabel rules to every target in group. For
// each kept target it stores the resulting labels in hostLabels, keyed by the
// host part of the target's __address__ label.
//
// Labels whose names start with "__" are removed before storing. The
// following targets are skipped:
//   - targets dropped by relabeling;
//   - targets without an address;
//   - targets whose address contains ':' but cannot be split into host and port.
//
// When several targets resolve to the same host, the last one processed wins.
func (p *promServiceDiscoProcessor) syncTargets(jobName string, group *targetgroup.Group, hostLabels map[string]model.LabelSet) {
	level.Debug(p.logger).Log("msg", "syncing targets", "count", len(group.Targets))

	// NOTE(review): a nil slice is also what a job configured without any
	// relabel_configs has, so such jobs are skipped here as well, not only
	// unknown jobs. Confirm this is intended.
	relabelConfig := p.relabelConfigs[jobName]
	if relabelConfig == nil {
		level.Warn(p.logger).Log("msg", "relabel config not found for job. skipping labeling", "jobName", jobName)
		return
	}

	for _, t := range group.Targets {
		// Merge returns a new set, and it is only read below, so no clone is needed.
		discoveredLabels := group.Labels.Merge(t)

		level.Debug(p.logger).Log("discoveredLabels", discoveredLabels)
		labelMap := make(map[string]string, len(discoveredLabels))
		for k, v := range discoveredLabels {
			labelMap[string(k)] = string(v)
		}
		processedLabels, keep := relabel.Process(labels.FromMap(labelMap), relabelConfig...)
		level.Debug(p.logger).Log("processedLabels", processedLabels)
		if !keep {
			continue
		}

		// Named targetLabels (not labels) to avoid shadowing the labels package used above.
		processedMap := processedLabels.Map()
		targetLabels := make(model.LabelSet, len(processedMap))
		for k, v := range processedMap {
			targetLabels[model.LabelName(k)] = model.LabelValue(v)
		}

		address, ok := targetLabels[model.AddressLabel]
		if !ok {
			level.Warn(p.logger).Log("msg", "ignoring target, unable to find address", "labels", targetLabels.String())
			continue
		}

		host := string(address)
		if strings.Contains(host, ":") {
			var err error
			host, _, err = net.SplitHostPort(host)
			if err != nil {
				level.Warn(p.logger).Log("msg", "unable to split host port", "address", address, "err", err)
				continue
			}
		}

		// Drop "__"-prefixed labels (including __address__) so only the
		// remaining labels are later copied onto span attributes.
		for k := range targetLabels {
			if strings.HasPrefix(string(k), "__") {
				delete(targetLabels, k)
			}
		}

		level.Debug(p.logger).Log("msg", "adding host to hostLabels", "host", host)
		hostLabels[host] = targetLabels
	}
}
304
305