Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/process/process_test.go
4096 views
1
package process
2
3
import (
4
"context"
5
"testing"
6
"time"
7
8
"github.com/grafana/agent/component"
9
"github.com/grafana/agent/component/common/loki"
10
"github.com/grafana/agent/component/loki/process/internal/stages"
11
"github.com/grafana/agent/pkg/river"
12
"github.com/grafana/agent/pkg/util"
13
"github.com/grafana/loki/pkg/logproto"
14
"github.com/prometheus/client_golang/prometheus"
15
"github.com/prometheus/common/model"
16
"github.com/stretchr/testify/require"
17
"go.uber.org/goleak"
18
)
19
20
func TestJSONLabelsStage(t *testing.T) {
21
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
22
23
// The following stages will attempt to parse input lines as JSON.
24
// The first stage _extract_ any fields found with the correct names:
25
// Since 'source' is empty, it implies that we want to parse the log line
26
// itself.
27
// log --> output
28
// stream --> stream
29
// time --> timestamp
30
// extra --> extra
31
//
32
// The second stage will parse the 'extra' field as JSON, and extract the
33
// 'user' field from the 'extra' field. If the expression value field is
34
// empty, it is inferred we want to use the same name as the key.
35
// user --> extra.user
36
//
37
// The third stage will set some labels from the extracted values above.
38
// Again, if the value is empty, it is inferred that we want to use the
39
// populate the label with extracted value of the same name.
40
stg := `stage.json {
41
expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" }
42
drop_malformed = true
43
}
44
stage.json {
45
expressions = { "user" = "" }
46
source = "extra"
47
}
48
stage.labels {
49
values = {
50
stream = "",
51
user = "",
52
ts = "timestamp",
53
}
54
}`
55
56
// Unmarshal the River relabel rules into a custom struct, as we don't have
57
// an easy way to refer to a loki.LogsReceiver value for the forward_to
58
// argument.
59
type cfg struct {
60
Stages []stages.StageConfig `river:"stage,enum"`
61
}
62
var stagesCfg cfg
63
err := river.Unmarshal([]byte(stg), &stagesCfg)
64
require.NoError(t, err)
65
66
ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
67
68
// Create and run the component, so that it can process and forwards logs.
69
opts := component.Options{
70
Logger: util.TestFlowLogger(t),
71
Registerer: prometheus.NewRegistry(),
72
OnStateChange: func(e component.Exports) {},
73
}
74
args := Arguments{
75
ForwardTo: []loki.LogsReceiver{ch1, ch2},
76
Stages: stagesCfg.Stages,
77
}
78
79
c, err := New(opts, args)
80
require.NoError(t, err)
81
ctx, cancel := context.WithCancel(context.Background())
82
defer cancel()
83
go c.Run(ctx)
84
85
// Send a log entry to the component's receiver.
86
ts := time.Now()
87
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
88
logEntry := loki.Entry{
89
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
90
Entry: logproto.Entry{
91
Timestamp: ts,
92
Line: logline,
93
},
94
}
95
96
c.receiver <- logEntry
97
98
wantLabelSet := model.LabelSet{
99
"filename": "/var/log/pods/agent/agent/1.log",
100
"foo": "bar",
101
"stream": "stderr",
102
"ts": "2019-04-30T02:12:41.8443515Z",
103
"user": "smith",
104
}
105
106
// The log entry should be received in both channels, with the processing
107
// stages correctly applied.
108
for i := 0; i < 2; i++ {
109
select {
110
case logEntry := <-ch1:
111
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
112
require.Equal(t, logline, logEntry.Line)
113
require.Equal(t, wantLabelSet, logEntry.Labels)
114
case logEntry := <-ch2:
115
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
116
require.Equal(t, logline, logEntry.Line)
117
require.Equal(t, wantLabelSet, logEntry.Labels)
118
case <-time.After(5 * time.Second):
119
require.FailNow(t, "failed waiting for log line")
120
}
121
}
122
}
123
124
func TestStaticLabelsLabelAllowLabelDrop(t *testing.T) {
125
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
126
127
// The following stages manipulate the label set of a log entry.
128
// The first stage will define a static set of labels (foo, bar, baz, qux)
129
// to add to the entry along the `filename` and `dev` labels.
130
// The second stage will drop the foo and bar labels.
131
// The third stage will keep only a subset of the remaining labels.
132
stg := `
133
stage.static_labels {
134
values = { "foo" = "fooval", "bar" = "barval", "baz" = "bazval", "qux" = "quxval" }
135
}
136
stage.label_drop {
137
values = [ "foo", "bar" ]
138
}
139
stage.label_keep {
140
values = [ "foo", "baz", "filename" ]
141
}`
142
143
// Unmarshal the River relabel rules into a custom struct, as we don't have
144
// an easy way to refer to a loki.LogsReceiver value for the forward_to
145
// argument.
146
type cfg struct {
147
Stages []stages.StageConfig `river:"stage,enum"`
148
}
149
var stagesCfg cfg
150
err := river.Unmarshal([]byte(stg), &stagesCfg)
151
require.NoError(t, err)
152
153
ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
154
155
// Create and run the component, so that it can process and forwards logs.
156
opts := component.Options{
157
Logger: util.TestFlowLogger(t),
158
Registerer: prometheus.NewRegistry(),
159
OnStateChange: func(e component.Exports) {},
160
}
161
args := Arguments{
162
ForwardTo: []loki.LogsReceiver{ch1, ch2},
163
Stages: stagesCfg.Stages,
164
}
165
166
c, err := New(opts, args)
167
require.NoError(t, err)
168
ctx, cancel := context.WithCancel(context.Background())
169
defer cancel()
170
go c.Run(ctx)
171
172
// Send a log entry to the component's receiver.
173
ts := time.Now()
174
logline := `{"log":"log message\n","stream":"stderr","time":"2022-01-09T08:37:45.8233626Z"}`
175
logEntry := loki.Entry{
176
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "env": "dev"},
177
Entry: logproto.Entry{
178
Timestamp: ts,
179
Line: logline,
180
},
181
}
182
183
c.receiver <- logEntry
184
185
wantLabelSet := model.LabelSet{
186
"filename": "/var/log/pods/agent/agent/1.log",
187
"baz": "bazval",
188
}
189
190
// The log entry should be received in both channels, with the processing
191
// stages correctly applied.
192
for i := 0; i < 2; i++ {
193
select {
194
case logEntry := <-ch1:
195
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
196
require.Equal(t, logline, logEntry.Line)
197
require.Equal(t, wantLabelSet, logEntry.Labels)
198
case logEntry := <-ch2:
199
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
200
require.Equal(t, logline, logEntry.Line)
201
require.Equal(t, wantLabelSet, logEntry.Labels)
202
case <-time.After(5 * time.Second):
203
require.FailNow(t, "failed waiting for log line")
204
}
205
}
206
}
207
208
func TestRegexTimestampOutput(t *testing.T) {
209
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
210
211
// The first stage will attempt to parse the input line using a regular
212
// expression with named capture groups. The three capture groups (time,
213
// stream and content) will be extracted in the shared map of values.
214
// Since 'source' is empty, it implies that we want to parse the log line
215
// itself.
216
//
217
// The second stage will parse the extracted `time` value as Unix epoch
218
// time and set it to the log entry timestamp.
219
//
220
// The third stage will set the `content` value as the message value.
221
//
222
// The fourth and final stage will set the `stream` value as the label.
223
stg := `
224
stage.regex {
225
expression = "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<content>.*)$"
226
}
227
stage.timestamp {
228
source = "time"
229
format = "RFC3339"
230
}
231
stage.output {
232
source = "content"
233
}
234
stage.labels {
235
values = { src = "stream" }
236
}`
237
238
// Unmarshal the River relabel rules into a custom struct, as we don't have
239
// an easy way to refer to a loki.LogsReceiver value for the forward_to
240
// argument.
241
type cfg struct {
242
Stages []stages.StageConfig `river:"stage,enum"`
243
}
244
var stagesCfg cfg
245
err := river.Unmarshal([]byte(stg), &stagesCfg)
246
require.NoError(t, err)
247
248
ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
249
250
// Create and run the component, so that it can process and forwards logs.
251
opts := component.Options{
252
Logger: util.TestFlowLogger(t),
253
Registerer: prometheus.NewRegistry(),
254
OnStateChange: func(e component.Exports) {},
255
}
256
args := Arguments{
257
ForwardTo: []loki.LogsReceiver{ch1, ch2},
258
Stages: stagesCfg.Stages,
259
}
260
261
c, err := New(opts, args)
262
require.NoError(t, err)
263
ctx, cancel := context.WithCancel(context.Background())
264
defer cancel()
265
go c.Run(ctx)
266
267
// Send a log entry to the component's receiver.
268
ts := time.Now()
269
logline := `2022-01-17T08:17:42-07:00 stderr somewhere, somehow, an error occurred`
270
logEntry := loki.Entry{
271
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
272
Entry: logproto.Entry{
273
Timestamp: ts,
274
Line: logline,
275
},
276
}
277
278
c.receiver <- logEntry
279
280
wantLabelSet := model.LabelSet{
281
"filename": "/var/log/pods/agent/agent/1.log",
282
"foo": "bar",
283
"src": "stderr",
284
}
285
wantTimestamp, err := time.Parse(time.RFC3339, "2022-01-17T08:17:42-07:00")
286
wantLogline := `somewhere, somehow, an error occurred`
287
require.NoError(t, err)
288
289
// The log entry should be received in both channels, with the processing
290
// stages correctly applied.
291
for i := 0; i < 2; i++ {
292
select {
293
case logEntry := <-ch1:
294
require.Equal(t, wantLogline, logEntry.Line)
295
require.Equal(t, wantTimestamp, logEntry.Timestamp)
296
require.Equal(t, wantLabelSet, logEntry.Labels)
297
case logEntry := <-ch2:
298
require.Equal(t, wantLogline, logEntry.Line)
299
require.Equal(t, wantTimestamp, logEntry.Timestamp)
300
require.Equal(t, wantLabelSet, logEntry.Labels)
301
case <-time.After(5 * time.Second):
302
require.FailNow(t, "failed waiting for log line")
303
}
304
}
305
}
306
307