Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/relabel/relabel_test.go
4096 views
1
package relabel
2
3
import (
4
"context"
5
"fmt"
6
"testing"
7
"time"
8
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/common/loki"
11
flow_relabel "github.com/grafana/agent/component/common/relabel"
12
"github.com/grafana/agent/pkg/flow/componenttest"
13
"github.com/grafana/agent/pkg/river"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/grafana/loki/pkg/logproto"
16
"github.com/prometheus/client_golang/prometheus"
17
"github.com/prometheus/common/model"
18
"github.com/prometheus/prometheus/model/relabel"
19
20
"github.com/stretchr/testify/require"
21
)
22
23
// Rename the kubernetes_(.*) labels without the suffix and remove them,
24
// then set the `environment` label to the value of the namespace.
25
var rc = `rule {
26
regex = "kubernetes_(.*)"
27
replacement = "$1"
28
action = "labelmap"
29
}
30
rule {
31
regex = "kubernetes_(.*)"
32
action = "labeldrop"
33
}
34
rule {
35
source_labels = ["namespace"]
36
target_label = "environment"
37
action = "replace"
38
}`
39
40
func TestRelabeling(t *testing.T) {
41
// Unmarshal the River relabel rules into a custom struct, as we don't have
42
// an easy way to refer to a loki.LogsReceiver value for the forward_to
43
// argument.
44
type cfg struct {
45
Rcs []*flow_relabel.Config `river:"rule,block,optional"`
46
}
47
var relabelConfigs cfg
48
err := river.Unmarshal([]byte(rc), &relabelConfigs)
49
require.NoError(t, err)
50
51
ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
52
53
// Create and run the component, so that it relabels and forwards logs.
54
opts := component.Options{
55
Logger: util.TestFlowLogger(t),
56
Registerer: prometheus.NewRegistry(),
57
OnStateChange: func(e component.Exports) {},
58
}
59
args := Arguments{
60
ForwardTo: []loki.LogsReceiver{ch1, ch2},
61
RelabelConfigs: relabelConfigs.Rcs,
62
MaxCacheSize: 10,
63
}
64
65
c, err := New(opts, args)
66
require.NoError(t, err)
67
go c.Run(context.Background())
68
69
// Send a log entry to the component's receiver.
70
logEntry := loki.Entry{
71
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "kubernetes_namespace": "dev", "kubernetes_pod_name": "agent", "foo": "bar"},
72
Entry: logproto.Entry{
73
Timestamp: time.Now(),
74
Line: "very important log",
75
},
76
}
77
78
c.receiver <- logEntry
79
80
wantLabelSet := model.LabelSet{
81
"filename": "/var/log/pods/agent/agent/1.log",
82
"namespace": "dev",
83
"pod_name": "agent",
84
"environment": "dev",
85
"foo": "bar",
86
}
87
88
// The log entry should be received in both channels, with the relabeling
89
// rules correctly applied.
90
for i := 0; i < 2; i++ {
91
select {
92
case logEntry := <-ch1:
93
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
94
require.Equal(t, "very important log", logEntry.Line)
95
require.Equal(t, wantLabelSet, logEntry.Labels)
96
case logEntry := <-ch2:
97
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
98
require.Equal(t, "very important log", logEntry.Line)
99
require.Equal(t, wantLabelSet, logEntry.Labels)
100
case <-time.After(5 * time.Second):
101
require.FailNow(t, "failed waiting for log line")
102
}
103
}
104
}
105
106
func BenchmarkRelabelComponent(b *testing.B) {
107
type cfg struct {
108
Rcs []*flow_relabel.Config `river:"rule,block,optional"`
109
}
110
var relabelConfigs cfg
111
_ = river.Unmarshal([]byte(rc), &relabelConfigs)
112
ch1 := make(loki.LogsReceiver)
113
114
// Create and run the component, so that it relabels and forwards logs.
115
opts := component.Options{
116
Logger: util.TestFlowLogger(b),
117
Registerer: prometheus.NewRegistry(),
118
OnStateChange: func(e component.Exports) {},
119
}
120
args := Arguments{
121
ForwardTo: []loki.LogsReceiver{ch1},
122
RelabelConfigs: relabelConfigs.Rcs,
123
MaxCacheSize: 500_000,
124
}
125
126
c, _ := New(opts, args)
127
ctx, cancel := context.WithCancel(context.Background())
128
go c.Run(ctx)
129
130
var entry loki.Entry
131
go func() {
132
for e := range ch1 {
133
entry = e
134
}
135
}()
136
137
now := time.Now()
138
for i := 0; i < b.N; i++ {
139
c.receiver <- loki.Entry{
140
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/%d.log", "kubernetes_namespace": "dev", "kubernetes_pod_name": model.LabelValue(fmt.Sprintf("agent-%d", i)), "foo": "bar"},
141
Entry: logproto.Entry{
142
Timestamp: now,
143
Line: "very important log",
144
},
145
}
146
}
147
148
_ = entry
149
cancel()
150
}
151
152
func TestCache(t *testing.T) {
153
type cfg struct {
154
Rcs []*flow_relabel.Config `river:"rule,block,optional"`
155
}
156
var relabelConfigs cfg
157
err := river.Unmarshal([]byte(rc), &relabelConfigs)
158
require.NoError(t, err)
159
160
ch1 := make(loki.LogsReceiver)
161
162
// Create and run the component, so that it relabels and forwards logs.
163
opts := component.Options{
164
Logger: util.TestFlowLogger(t),
165
Registerer: prometheus.NewRegistry(),
166
OnStateChange: func(e component.Exports) {},
167
}
168
args := Arguments{
169
ForwardTo: []loki.LogsReceiver{ch1},
170
RelabelConfigs: []*flow_relabel.Config{
171
{
172
SourceLabels: []string{"name", "A"},
173
Regex: flow_relabel.Regexp(relabel.MustNewRegexp("(.+)")),
174
175
Action: "replace",
176
TargetLabel: "env",
177
Replacement: "staging",
178
}},
179
MaxCacheSize: 4,
180
}
181
182
c, err := New(opts, args)
183
require.NoError(t, err)
184
go c.Run(context.Background())
185
186
go func() {
187
for e := range ch1 {
188
require.Equal(t, "very important log", e.Line)
189
}
190
}()
191
192
e := getEntry()
193
194
lsets := []model.LabelSet{
195
{"name": "foo"},
196
{"name": "bar"},
197
{"name": "baz"},
198
{"name": "qux"},
199
{"name": "xyz"},
200
}
201
rlsets := []model.LabelSet{
202
{"env": "staging", "name": "foo"},
203
{"env": "staging", "name": "bar"},
204
{"env": "staging", "name": "baz"},
205
{"env": "staging", "name": "qux"},
206
{"env": "staging", "name": "xyz"},
207
}
208
// Send three entries with different label sets along the receiver.
209
e.Labels = lsets[0]
210
c.receiver <- e
211
e.Labels = lsets[1]
212
c.receiver <- e
213
e.Labels = lsets[2]
214
c.receiver <- e
215
216
time.Sleep(100 * time.Millisecond)
217
// Let's look into the cache's structure now!
218
// The cache should have stored each label set by its fingerprint.
219
for i := 0; i < 3; i++ {
220
val, ok := c.cache.Get(lsets[i].Fingerprint())
221
require.True(t, ok)
222
cached, ok := val.([]cacheItem)
223
require.True(t, ok)
224
225
// Each cache value should be a 1-item slice, with the correct initial
226
// and relabeled values applied to it.
227
require.Len(t, cached, 1)
228
require.Equal(t, cached[0].original, lsets[i])
229
require.Equal(t, cached[0].relabeled, rlsets[i])
230
}
231
232
// Let's send over an entry we've seen before.
233
// We should've hit the cached path, with no changes to the cache's length
234
// or the underlying stored value.
235
e.Labels = lsets[0]
236
c.receiver <- e
237
require.Equal(t, c.cache.Len(), 3)
238
val, _ := c.cache.Get(lsets[0].Fingerprint())
239
cachedVal := val.([]cacheItem)
240
require.Len(t, cachedVal, 1)
241
require.Equal(t, cachedVal[0].original, lsets[0])
242
require.Equal(t, cachedVal[0].relabeled, rlsets[0])
243
244
// Now, let's try to hit a collision.
245
// These LabelSets are known to collide (string: 8746e5b6c5f0fb60)
246
// https://github.com/pstibrany/fnv-1a-64bit-collisions
247
ls1 := model.LabelSet{"A": "K6sjsNNczPl"}
248
ls2 := model.LabelSet{"A": "cswpLMIZpwt"}
249
envls := model.LabelSet{"env": "staging"}
250
require.Equal(t, ls1.Fingerprint(), ls2.Fingerprint(), "expected labelset fingerprints to collide; have we changed the hashing algorithm?")
251
252
e.Labels = ls1
253
c.receiver <- e
254
255
e.Labels = ls2
256
c.receiver <- e
257
258
time.Sleep(100 * time.Millisecond)
259
// Both of these should be under a single, new cache key which will contain
260
// both entries.
261
require.Equal(t, c.cache.Len(), 4)
262
val, ok := c.cache.Get(ls1.Fingerprint())
263
require.True(t, ok)
264
cachedVal = val.([]cacheItem)
265
require.Len(t, cachedVal, 2)
266
267
require.Equal(t, cachedVal[0].original, ls1)
268
require.Equal(t, cachedVal[1].original, ls2)
269
require.Equal(t, cachedVal[0].relabeled, ls1.Merge(envls))
270
require.Equal(t, cachedVal[1].relabeled, ls2.Merge(envls))
271
272
// Finally, send two more entries, which should fill up the cache and evict
273
// the Least Recently Used items (lsets[1], and lsets[2]).
274
e.Labels = lsets[3]
275
c.receiver <- e
276
e.Labels = lsets[4]
277
c.receiver <- e
278
279
require.Equal(t, c.cache.Len(), 4)
280
wantKeys := []model.Fingerprint{lsets[0].Fingerprint(), ls1.Fingerprint(), lsets[3].Fingerprint(), lsets[4].Fingerprint()}
281
for i, k := range c.cache.Keys() { // Returns the cache keys in LRU order.
282
f, ok := k.(model.Fingerprint)
283
require.True(t, ok)
284
require.Equal(t, f, wantKeys[i])
285
}
286
}
287
288
func TestRuleGetter(t *testing.T) {
289
// Set up the component Arguments.
290
originalCfg := `rule {
291
action = "keep"
292
source_labels = ["__name__"]
293
regex = "up"
294
}
295
forward_to = []`
296
var args Arguments
297
require.NoError(t, river.Unmarshal([]byte(originalCfg), &args))
298
299
// Set up and start the component.
300
tc, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.relabel")
301
require.NoError(t, err)
302
go func() {
303
err = tc.Run(componenttest.TestContext(t), args)
304
require.NoError(t, err)
305
}()
306
require.NoError(t, tc.WaitExports(time.Second))
307
308
// Use the getter to retrieve the original relabeling rules.
309
exports := tc.Exports().(Exports)
310
gotOriginal := exports.Rules
311
312
// Update the component with new relabeling rules and retrieve them.
313
updatedCfg := `rule {
314
action = "drop"
315
source_labels = ["__name__"]
316
regex = "up"
317
}
318
forward_to = []`
319
require.NoError(t, river.Unmarshal([]byte(updatedCfg), &args))
320
321
require.NoError(t, tc.Update(args))
322
exports = tc.Exports().(Exports)
323
gotUpdated := exports.Rules
324
325
require.NotEqual(t, gotOriginal, gotUpdated)
326
require.Len(t, gotOriginal, 1)
327
require.Len(t, gotUpdated, 1)
328
329
require.Equal(t, gotOriginal[0].Action, flow_relabel.Keep)
330
require.Equal(t, gotUpdated[0].Action, flow_relabel.Drop)
331
require.Equal(t, gotUpdated[0].SourceLabels, gotOriginal[0].SourceLabels)
332
require.Equal(t, gotUpdated[0].Regex, gotOriginal[0].Regex)
333
}
334
335
func getEntry() loki.Entry {
336
return loki.Entry{
337
Labels: model.LabelSet{},
338
Entry: logproto.Entry{
339
Timestamp: time.Now(),
340
Line: "very important log",
341
},
342
}
343
}
344
345