Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/promsdprocessor/prom_sd_processor_test.go
4094 views
1
package promsdprocessor
2
3
import (
4
"context"
5
"net"
6
"testing"
7
8
"github.com/go-kit/log"
9
"github.com/prometheus/common/model"
10
"github.com/prometheus/prometheus/discovery/targetgroup"
11
"github.com/prometheus/prometheus/model/relabel"
12
"github.com/stretchr/testify/assert"
13
"github.com/stretchr/testify/require"
14
"go.opentelemetry.io/collector/client"
15
"go.opentelemetry.io/collector/consumer/consumertest"
16
"go.opentelemetry.io/collector/pdata/pcommon"
17
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
18
)
19
20
func TestSyncGroups(t *testing.T) {
21
tests := []struct {
22
name string
23
jobToSync string
24
relabelCfgs map[string][]*relabel.Config
25
targets []model.LabelSet
26
expected map[string]model.LabelSet
27
}{
28
{
29
name: "empty",
30
jobToSync: "",
31
relabelCfgs: map[string][]*relabel.Config{},
32
targets: []model.LabelSet{},
33
expected: map[string]model.LabelSet{},
34
},
35
{
36
name: "no relabeling",
37
jobToSync: "job",
38
relabelCfgs: map[string][]*relabel.Config{
39
"job": {},
40
},
41
targets: []model.LabelSet{
42
{
43
"__address__": "127.0.0.1",
44
},
45
},
46
expected: map[string]model.LabelSet{
47
"127.0.0.1": {},
48
},
49
},
50
{
51
name: "strip port",
52
jobToSync: "job",
53
relabelCfgs: map[string][]*relabel.Config{
54
"job": {},
55
},
56
targets: []model.LabelSet{
57
{
58
"__address__": "127.0.0.1:8888",
59
"label": "val",
60
},
61
},
62
expected: map[string]model.LabelSet{
63
"127.0.0.1": {
64
"label": "val",
65
},
66
},
67
},
68
{
69
name: "passthrough",
70
jobToSync: "job",
71
relabelCfgs: map[string][]*relabel.Config{
72
"job": {},
73
},
74
targets: []model.LabelSet{
75
{
76
"__address__": "127.0.0.1",
77
"label": "val",
78
},
79
},
80
expected: map[string]model.LabelSet{
81
"127.0.0.1": {
82
"label": "val",
83
},
84
},
85
},
86
{
87
name: "ignore metadata",
88
jobToSync: "job",
89
relabelCfgs: map[string][]*relabel.Config{
90
"job": {},
91
},
92
targets: []model.LabelSet{
93
{
94
"__address__": "127.0.0.1",
95
"__ignore": "ignore",
96
},
97
},
98
expected: map[string]model.LabelSet{
99
"127.0.0.1": {},
100
},
101
},
102
}
103
104
for _, tc := range tests {
105
t.Run(tc.name, func(t *testing.T) {
106
groups := []*targetgroup.Group{
107
{
108
Targets: tc.targets,
109
},
110
}
111
112
p := &promServiceDiscoProcessor{
113
logger: log.NewNopLogger(),
114
relabelConfigs: tc.relabelCfgs,
115
}
116
117
hostLabels := make(map[string]model.LabelSet)
118
p.syncGroups(tc.jobToSync, groups, hostLabels)
119
120
assert.Equal(t, tc.expected, hostLabels)
121
})
122
}
123
}
124
125
func TestOperationType(t *testing.T) {
126
const (
127
attrKey = "key"
128
attrIP = "1.1.1.1"
129
)
130
testCases := []struct {
131
name string
132
operationType string
133
attributeExists bool
134
newValue string
135
expectedValue string
136
}{
137
{
138
name: "Upsert updates the attribute already exists",
139
operationType: OperationTypeUpsert,
140
attributeExists: true,
141
newValue: "new-value",
142
expectedValue: "new-value",
143
},
144
{
145
name: "Update updates the attribute already exists",
146
operationType: OperationTypeUpdate,
147
attributeExists: true,
148
newValue: "new-value",
149
expectedValue: "new-value",
150
},
151
{
152
name: "Insert does not update the attribute if it's already present",
153
operationType: OperationTypeInsert,
154
attributeExists: true,
155
newValue: "new-value",
156
expectedValue: "old-value",
157
},
158
{
159
name: "Upsert updates the attribute if it isn't present",
160
operationType: OperationTypeUpsert,
161
attributeExists: false,
162
newValue: "new-value",
163
expectedValue: "new-value",
164
},
165
{
166
name: "Update updates the attribute already exists",
167
operationType: OperationTypeUpdate,
168
attributeExists: false,
169
newValue: "new-value",
170
expectedValue: "",
171
},
172
{
173
name: "Insert updates the attribute if it isn't present",
174
operationType: OperationTypeInsert,
175
attributeExists: false,
176
newValue: "new-value",
177
expectedValue: "new-value",
178
},
179
}
180
181
for _, tc := range testCases {
182
t.Run(tc.name, func(t *testing.T) {
183
mockProcessor := new(consumertest.TracesSink)
184
p, err := newTraceProcessor(mockProcessor, tc.operationType, nil, nil)
185
require.NoError(t, err)
186
187
attrMap := pcommon.NewMap()
188
if tc.attributeExists {
189
attrMap.PutStr(attrKey, "old-value")
190
}
191
attrMap.PutStr(semconv.AttributeNetHostIP, attrIP)
192
193
hostLabels := map[string]model.LabelSet{
194
attrIP: {
195
attrKey: model.LabelValue(tc.newValue),
196
},
197
}
198
p.(*promServiceDiscoProcessor).hostLabels = hostLabels
199
p.(*promServiceDiscoProcessor).processAttributes(context.TODO(), attrMap)
200
201
actualAttrValue, _ := attrMap.Get(attrKey)
202
assert.Equal(t, tc.expectedValue, actualAttrValue.Str())
203
})
204
}
205
}
206
207
func TestPodAssociation(t *testing.T) {
208
const ipStr = "1.1.1.1"
209
210
testCases := []struct {
211
name string
212
podAssociations []string
213
ctxFn func(t *testing.T) context.Context
214
attrMapFn func(t *testing.T) pcommon.Map
215
expectedIP string
216
}{
217
{
218
name: "connection IP (HTTP & gRPC)",
219
ctxFn: func(t *testing.T) context.Context {
220
info := client.Info{
221
Addr: &net.IPAddr{
222
IP: net.ParseIP(net.ParseIP(ipStr).String()),
223
},
224
}
225
return client.NewContext(context.Background(), info)
226
},
227
attrMapFn: func(*testing.T) pcommon.Map { return pcommon.NewMap() },
228
expectedIP: ipStr,
229
},
230
{
231
name: "connection IP that includes a port number",
232
ctxFn: func(t *testing.T) context.Context {
233
info := client.Info{
234
Addr: &net.TCPAddr{
235
IP: net.ParseIP(net.ParseIP(ipStr).String()),
236
Port: 1234,
237
},
238
}
239
return client.NewContext(context.Background(), info)
240
},
241
attrMapFn: func(*testing.T) pcommon.Map { return pcommon.NewMap() },
242
expectedIP: ipStr,
243
},
244
{
245
name: "connection IP is empty",
246
podAssociations: []string{podAssociationConnectionIP},
247
ctxFn: func(t *testing.T) context.Context {
248
c := client.FromContext(context.Background())
249
return client.NewContext(context.Background(), c)
250
},
251
attrMapFn: func(*testing.T) pcommon.Map { return pcommon.NewMap() },
252
expectedIP: "",
253
},
254
{
255
name: "ip attribute",
256
ctxFn: func(t *testing.T) context.Context { return context.Background() },
257
attrMapFn: func(*testing.T) pcommon.Map {
258
attrMap := pcommon.NewMap()
259
attrMap.PutStr("ip", ipStr)
260
return attrMap
261
},
262
expectedIP: ipStr,
263
},
264
{
265
name: "net.host.ip attribute",
266
ctxFn: func(t *testing.T) context.Context { return context.Background() },
267
attrMapFn: func(*testing.T) pcommon.Map {
268
attrMap := pcommon.NewMap()
269
attrMap.PutStr(semconv.AttributeNetHostIP, ipStr)
270
return attrMap
271
},
272
expectedIP: ipStr,
273
},
274
{
275
name: "k8s ip attribute",
276
ctxFn: func(t *testing.T) context.Context { return context.Background() },
277
attrMapFn: func(*testing.T) pcommon.Map {
278
attrMap := pcommon.NewMap()
279
attrMap.PutStr("k8s.pod.ip", ipStr)
280
return attrMap
281
},
282
expectedIP: ipStr,
283
},
284
{
285
name: "ip from hostname",
286
ctxFn: func(t *testing.T) context.Context { return context.Background() },
287
attrMapFn: func(*testing.T) pcommon.Map {
288
attrMap := pcommon.NewMap()
289
attrMap.PutStr(semconv.AttributeHostName, ipStr)
290
return attrMap
291
},
292
expectedIP: ipStr,
293
},
294
{
295
name: "uses attr before context (default associations)",
296
ctxFn: func(t *testing.T) context.Context {
297
info := client.Info{
298
Addr: &net.IPAddr{
299
IP: net.ParseIP("2.2.2.2"),
300
},
301
}
302
return client.NewContext(context.Background(), info)
303
},
304
attrMapFn: func(*testing.T) pcommon.Map {
305
attrMap := pcommon.NewMap()
306
attrMap.PutStr(semconv.AttributeNetHostIP, ipStr)
307
return attrMap
308
},
309
expectedIP: ipStr,
310
},
311
{
312
name: "uses attr before hostname (default associations)",
313
ctxFn: func(t *testing.T) context.Context { return context.Background() },
314
attrMapFn: func(*testing.T) pcommon.Map {
315
attrMap := pcommon.NewMap()
316
attrMap.PutStr(semconv.AttributeNetHostIP, ipStr)
317
attrMap.PutStr(semconv.AttributeHostName, "3.3.3.3")
318
return attrMap
319
},
320
expectedIP: ipStr,
321
},
322
{
323
name: "ip attribute but not as pod association",
324
podAssociations: []string{podAssociationk8sIPLabel},
325
ctxFn: func(t *testing.T) context.Context { return context.Background() },
326
attrMapFn: func(*testing.T) pcommon.Map {
327
attrMap := pcommon.NewMap()
328
attrMap.PutStr("ip", ipStr)
329
return attrMap
330
},
331
expectedIP: "",
332
},
333
{
334
name: "uses hostname before attribute (reverse order from default)",
335
podAssociations: []string{podAssociationHostnameLabel, podAssociationOTelIPLabel},
336
ctxFn: func(t *testing.T) context.Context { return context.Background() },
337
attrMapFn: func(*testing.T) pcommon.Map {
338
attrMap := pcommon.NewMap()
339
attrMap.PutStr(semconv.AttributeNetHostIP, "3.3.3.3")
340
attrMap.PutStr(semconv.AttributeHostName, ipStr)
341
return attrMap
342
},
343
expectedIP: ipStr,
344
},
345
}
346
347
for _, tc := range testCases {
348
t.Run(tc.name, func(t *testing.T) {
349
mockProcessor := new(consumertest.TracesSink)
350
p, err := newTraceProcessor(mockProcessor, "", tc.podAssociations, nil)
351
require.NoError(t, err)
352
353
ip := p.(*promServiceDiscoProcessor).getPodIP(tc.ctxFn(t), tc.attrMapFn(t))
354
assert.Equal(t, tc.expectedIP, ip)
355
})
356
}
357
}
358
359