Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/journal/internal/target/journaltarget_test.go
4096 views
1
//go:build linux && cgo && promtail_journal_enabled
2
3
package target
4
5
// This code is copied from Promtail with minor edits. The target package is used to
6
// configure and run the targets that can read journal entries and forward them
7
// to other loki components.
8
9
import (
10
"io"
11
"os"
12
"strings"
13
"testing"
14
"time"
15
16
"github.com/grafana/agent/component/common/loki/client/fake"
17
18
"github.com/coreos/go-systemd/sdjournal"
19
"github.com/go-kit/log"
20
"github.com/grafana/agent/component/common/loki/positions"
21
"github.com/prometheus/client_golang/prometheus"
22
"github.com/prometheus/client_golang/prometheus/testutil"
23
"github.com/prometheus/prometheus/model/relabel"
24
"github.com/stretchr/testify/assert"
25
"github.com/stretchr/testify/require"
26
"gopkg.in/yaml.v2"
27
28
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
29
"github.com/grafana/loki/clients/pkg/promtail/targets/testutils"
30
)
31
32
type mockJournalReader struct {
33
config sdjournal.JournalReaderConfig
34
t *testing.T
35
}
36
37
func newMockJournalReader(c sdjournal.JournalReaderConfig) (journalReader, error) {
38
return &mockJournalReader{config: c}, nil
39
}
40
41
func (r *mockJournalReader) Close() error {
42
return nil
43
}
44
45
func (r *mockJournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
46
<-until
47
return nil
48
}
49
50
func newMockJournalEntry(entry *sdjournal.JournalEntry) journalEntryFunc {
51
return func(c sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error) {
52
return entry, nil
53
}
54
}
55
56
func (r *mockJournalReader) Write(fields map[string]string) {
57
allFields := make(map[string]string, len(fields))
58
for k, v := range fields {
59
allFields[k] = v
60
}
61
62
ts := uint64(time.Now().UnixNano())
63
64
_, err := r.config.Formatter(&sdjournal.JournalEntry{
65
Fields: allFields,
66
MonotonicTimestamp: ts,
67
RealtimeTimestamp: ts,
68
})
69
assert.NoError(r.t, err)
70
}
71
72
func TestJournalTarget(t *testing.T) {
73
w := log.NewSyncWriter(os.Stderr)
74
logger := log.NewLogfmtLogger(w)
75
76
testutils.InitRandom()
77
dirName := "/tmp/" + testutils.RandName()
78
positionsFileName := dirName + "/positions.yml"
79
80
// Set the sync period to a really long value, to guarantee the sync timer
81
// never runs, this way we know everything saved was done through channel
82
// notifications when target.stop() was called.
83
ps, err := positions.New(logger, positions.Config{
84
SyncPeriod: 10 * time.Second,
85
PositionsFile: positionsFileName,
86
})
87
if err != nil {
88
t.Fatal(err)
89
}
90
91
client := fake.NewClient(func() {})
92
93
relabelCfg := `
94
- source_labels: ['__journal_code_file']
95
regex: 'journaltarget_test\.go'
96
action: 'keep'
97
- source_labels: ['__journal_code_file']
98
target_label: 'code_file'`
99
100
var relabels []*relabel.Config
101
err = yaml.Unmarshal([]byte(relabelCfg), &relabels)
102
require.NoError(t, err)
103
104
registry := prometheus.NewRegistry()
105
jt, err := journalTargetWithReader(NewMetrics(registry), logger, client, ps, "test", relabels,
106
&scrapeconfig.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))
107
require.NoError(t, err)
108
109
r := jt.r.(*mockJournalReader)
110
r.t = t
111
112
for i := 0; i < 10; i++ {
113
r.Write(map[string]string{
114
"MESSAGE": "ping",
115
"CODE_FILE": "journaltarget_test.go",
116
})
117
assert.NoError(t, err)
118
}
119
require.NoError(t, jt.Stop())
120
client.Stop()
121
122
expectedMetrics := `# HELP loki_source_journal_target_lines_total Total number of successful journal lines read
123
# TYPE loki_source_journal_target_lines_total counter
124
loki_source_journal_target_lines_total 10
125
`
126
127
if err := testutil.GatherAndCompare(registry,
128
strings.NewReader(expectedMetrics)); err != nil {
129
t.Fatalf("mismatch metrics: %v", err)
130
}
131
assert.Len(t, client.Received(), 10)
132
}
133
134
func TestJournalTargetParsingErrors(t *testing.T) {
135
w := log.NewSyncWriter(os.Stderr)
136
logger := log.NewLogfmtLogger(w)
137
138
testutils.InitRandom()
139
dirName := "/tmp/" + testutils.RandName()
140
positionsFileName := dirName + "/positions.yml"
141
142
// Set the sync period to a really long value, to guarantee the sync timer
143
// never runs, this way we know everything saved was done through channel
144
// notifications when target.stop() was called.
145
ps, err := positions.New(logger, positions.Config{
146
SyncPeriod: 10 * time.Second,
147
PositionsFile: positionsFileName,
148
})
149
if err != nil {
150
t.Fatal(err)
151
}
152
153
client := fake.NewClient(func() {})
154
155
// We specify no relabel rules, so that we end up with an empty labelset
156
var relabels []*relabel.Config
157
158
registry := prometheus.NewRegistry()
159
jt, err := journalTargetWithReader(NewMetrics(registry), logger, client, ps, "test", relabels,
160
&scrapeconfig.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))
161
require.NoError(t, err)
162
163
r := jt.r.(*mockJournalReader)
164
r.t = t
165
166
// No labels but correct message
167
for i := 0; i < 10; i++ {
168
r.Write(map[string]string{
169
"MESSAGE": "ping",
170
"CODE_FILE": "journaltarget_test.go",
171
})
172
assert.NoError(t, err)
173
}
174
175
// No labels and no message
176
for i := 0; i < 10; i++ {
177
r.Write(map[string]string{
178
"CODE_FILE": "journaltarget_test.go",
179
})
180
assert.NoError(t, err)
181
}
182
require.NoError(t, jt.Stop())
183
client.Stop()
184
185
expectedMetrics := `# HELP loki_source_journal_target_lines_total Total number of successful journal lines read
186
# TYPE loki_source_journal_target_lines_total counter
187
loki_source_journal_target_lines_total 0
188
# HELP loki_source_journal_target_parsing_errors_total Total number of parsing errors while reading journal messages
189
# TYPE loki_source_journal_target_parsing_errors_total counter
190
loki_source_journal_target_parsing_errors_total{error="empty_labels"} 10
191
loki_source_journal_target_parsing_errors_total{error="no_message"} 10
192
`
193
194
if err := testutil.GatherAndCompare(registry,
195
strings.NewReader(expectedMetrics)); err != nil {
196
t.Fatalf("mismatch metrics: %v", err)
197
}
198
199
assert.Len(t, client.Received(), 0)
200
}
201
202
func TestJournalTarget_JSON(t *testing.T) {
203
w := log.NewSyncWriter(os.Stderr)
204
logger := log.NewLogfmtLogger(w)
205
206
testutils.InitRandom()
207
dirName := "/tmp/" + testutils.RandName()
208
positionsFileName := dirName + "/positions.yml"
209
210
// Set the sync period to a really long value, to guarantee the sync timer
211
// never runs, this way we know everything saved was done through channel
212
// notifications when target.stop() was called.
213
ps, err := positions.New(logger, positions.Config{
214
SyncPeriod: 10 * time.Second,
215
PositionsFile: positionsFileName,
216
})
217
if err != nil {
218
t.Fatal(err)
219
}
220
221
client := fake.NewClient(func() {})
222
223
relabelCfg := `
224
- source_labels: ['__journal_code_file']
225
regex: 'journaltarget_test\.go'
226
action: 'keep'
227
- source_labels: ['__journal_code_file']
228
target_label: 'code_file'`
229
230
var relabels []*relabel.Config
231
err = yaml.Unmarshal([]byte(relabelCfg), &relabels)
232
require.NoError(t, err)
233
234
cfg := &scrapeconfig.JournalTargetConfig{JSON: true}
235
236
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", relabels,
237
cfg, newMockJournalReader, newMockJournalEntry(nil))
238
require.NoError(t, err)
239
240
r := jt.r.(*mockJournalReader)
241
r.t = t
242
243
for i := 0; i < 10; i++ {
244
r.Write(map[string]string{
245
"MESSAGE": "ping",
246
"CODE_FILE": "journaltarget_test.go",
247
"OTHER_FIELD": "foobar",
248
})
249
assert.NoError(t, err)
250
251
}
252
expectMsg := `{"CODE_FILE":"journaltarget_test.go","MESSAGE":"ping","OTHER_FIELD":"foobar"}`
253
require.NoError(t, jt.Stop())
254
client.Stop()
255
256
assert.Len(t, client.Received(), 10)
257
for i := 0; i < 10; i++ {
258
require.Equal(t, expectMsg, client.Received()[i].Line)
259
}
260
}
261
262
func TestJournalTarget_Since(t *testing.T) {
263
w := log.NewSyncWriter(os.Stderr)
264
logger := log.NewLogfmtLogger(w)
265
266
testutils.InitRandom()
267
dirName := "/tmp/" + testutils.RandName()
268
positionsFileName := dirName + "/positions.yml"
269
270
// Set the sync period to a really long value, to guarantee the sync timer
271
// never runs, this way we know everything saved was done through channel
272
// notifications when target.stop() was called.
273
ps, err := positions.New(logger, positions.Config{
274
SyncPeriod: 10 * time.Second,
275
PositionsFile: positionsFileName,
276
})
277
if err != nil {
278
t.Fatal(err)
279
}
280
281
client := fake.NewClient(func() {})
282
283
cfg := scrapeconfig.JournalTargetConfig{
284
MaxAge: "4h",
285
}
286
287
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,
288
&cfg, newMockJournalReader, newMockJournalEntry(nil))
289
require.NoError(t, err)
290
291
r := jt.r.(*mockJournalReader)
292
require.Equal(t, r.config.Since, -1*time.Hour*4)
293
client.Stop()
294
}
295
296
func TestJournalTarget_Cursor_TooOld(t *testing.T) {
297
w := log.NewSyncWriter(os.Stderr)
298
logger := log.NewLogfmtLogger(w)
299
300
testutils.InitRandom()
301
dirName := "/tmp/" + testutils.RandName()
302
positionsFileName := dirName + "/positions.yml"
303
304
// Set the sync period to a really long value, to guarantee the sync timer
305
// never runs, this way we know everything saved was done through channel
306
// notifications when target.stop() was called.
307
ps, err := positions.New(logger, positions.Config{
308
SyncPeriod: 10 * time.Second,
309
PositionsFile: positionsFileName,
310
})
311
if err != nil {
312
t.Fatal(err)
313
}
314
ps.PutString("journal-test", "", "foobar")
315
316
client := fake.NewClient(func() {})
317
318
cfg := scrapeconfig.JournalTargetConfig{}
319
320
entryTs := time.Date(1980, time.July, 3, 12, 0, 0, 0, time.UTC)
321
journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{
322
Cursor: "foobar",
323
Fields: nil,
324
RealtimeTimestamp: uint64(entryTs.UnixNano()),
325
})
326
327
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,
328
&cfg, newMockJournalReader, journalEntry)
329
require.NoError(t, err)
330
331
r := jt.r.(*mockJournalReader)
332
require.Equal(t, r.config.Since, -1*time.Hour*7)
333
client.Stop()
334
}
335
336
func TestJournalTarget_Cursor_NotTooOld(t *testing.T) {
337
w := log.NewSyncWriter(os.Stderr)
338
logger := log.NewLogfmtLogger(w)
339
340
testutils.InitRandom()
341
dirName := "/tmp/" + testutils.RandName()
342
positionsFileName := dirName + "/positions.yml"
343
344
// Set the sync period to a really long value, to guarantee the sync timer
345
// never runs, this way we know everything saved was done through channel
346
// notifications when target.stop() was called.
347
ps, err := positions.New(logger, positions.Config{
348
SyncPeriod: 10 * time.Second,
349
PositionsFile: positionsFileName,
350
})
351
if err != nil {
352
t.Fatal(err)
353
}
354
ps.PutString(positions.CursorKey("test"), "", "foobar")
355
356
client := fake.NewClient(func() {})
357
358
cfg := scrapeconfig.JournalTargetConfig{}
359
360
entryTs := time.Now().Add(-time.Hour)
361
journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{
362
Cursor: "foobar",
363
Fields: nil,
364
RealtimeTimestamp: uint64(entryTs.UnixNano() / int64(time.Microsecond)),
365
})
366
367
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,
368
&cfg, newMockJournalReader, journalEntry)
369
require.NoError(t, err)
370
371
r := jt.r.(*mockJournalReader)
372
require.Equal(t, r.config.Since, time.Duration(0))
373
require.Equal(t, r.config.Cursor, "foobar")
374
client.Stop()
375
}
376
377
func Test_MakeJournalFields(t *testing.T) {
378
entryFields := map[string]string{
379
"CODE_FILE": "journaltarget_test.go",
380
"OTHER_FIELD": "foobar",
381
"PRIORITY": "6",
382
}
383
receivedFields := makeJournalFields(entryFields)
384
expectedFields := map[string]string{
385
"__journal_code_file": "journaltarget_test.go",
386
"__journal_other_field": "foobar",
387
"__journal_priority": "6",
388
"__journal_priority_keyword": "info",
389
}
390
assert.Equal(t, expectedFields, receivedFields)
391
}
392
393
func TestJournalTarget_Matches(t *testing.T) {
394
w := log.NewSyncWriter(os.Stderr)
395
logger := log.NewLogfmtLogger(w)
396
397
testutils.InitRandom()
398
dirName := "/tmp/" + testutils.RandName()
399
positionsFileName := dirName + "/positions.yml"
400
401
// Set the sync period to a really long value, to guarantee the sync timer
402
// never runs, this way we know everything saved was done through channel
403
// notifications when target.stop() was called.
404
ps, err := positions.New(logger, positions.Config{
405
SyncPeriod: 10 * time.Second,
406
PositionsFile: positionsFileName,
407
})
408
if err != nil {
409
t.Fatal(err)
410
}
411
412
client := fake.NewClient(func() {})
413
414
cfg := scrapeconfig.JournalTargetConfig{
415
Matches: "UNIT=foo.service PRIORITY=1",
416
}
417
418
jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,
419
&cfg, newMockJournalReader, newMockJournalEntry(nil))
420
require.NoError(t, err)
421
422
r := jt.r.(*mockJournalReader)
423
matches := []sdjournal.Match{{Field: "UNIT", Value: "foo.service"}, {Field: "PRIORITY", Value: "1"}}
424
require.Equal(t, r.config.Matches, matches)
425
client.Stop()
426
}
427
428