Path: blob/main/component/loki/source/journal/internal/target/journaltarget_test.go
4096 views
//go:build linux && cgo && promtail_journal_enabled12package target34// This code is copied from Promtail with minor edits. The target package is used to5// configure and run the targets that can read journal entries and forward them6// to other loki components.78import (9"io"10"os"11"strings"12"testing"13"time"1415"github.com/grafana/agent/component/common/loki/client/fake"1617"github.com/coreos/go-systemd/sdjournal"18"github.com/go-kit/log"19"github.com/grafana/agent/component/common/loki/positions"20"github.com/prometheus/client_golang/prometheus"21"github.com/prometheus/client_golang/prometheus/testutil"22"github.com/prometheus/prometheus/model/relabel"23"github.com/stretchr/testify/assert"24"github.com/stretchr/testify/require"25"gopkg.in/yaml.v2"2627"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"28"github.com/grafana/loki/clients/pkg/promtail/targets/testutils"29)3031type mockJournalReader struct {32config sdjournal.JournalReaderConfig33t *testing.T34}3536func newMockJournalReader(c sdjournal.JournalReaderConfig) (journalReader, error) {37return &mockJournalReader{config: c}, nil38}3940func (r *mockJournalReader) Close() error {41return nil42}4344func (r *mockJournalReader) Follow(until <-chan time.Time, writer io.Writer) error {45<-until46return nil47}4849func newMockJournalEntry(entry *sdjournal.JournalEntry) journalEntryFunc {50return func(c sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error) {51return entry, nil52}53}5455func (r *mockJournalReader) Write(fields map[string]string) {56allFields := make(map[string]string, len(fields))57for k, v := range fields {58allFields[k] = v59}6061ts := uint64(time.Now().UnixNano())6263_, err := r.config.Formatter(&sdjournal.JournalEntry{64Fields: allFields,65MonotonicTimestamp: ts,66RealtimeTimestamp: ts,67})68assert.NoError(r.t, err)69}7071func TestJournalTarget(t *testing.T) {72w := log.NewSyncWriter(os.Stderr)73logger := log.NewLogfmtLogger(w)7475testutils.InitRandom()76dirName := "/tmp/" + testutils.RandName()77positionsFileName := dirName + "/positions.yml"7879// Set the sync period to a really long value, to guarantee the sync timer80// never runs, this way we know everything saved was done through channel81// notifications when target.stop() was called.82ps, err := positions.New(logger, positions.Config{83SyncPeriod: 10 * time.Second,84PositionsFile: positionsFileName,85})86if err != nil {87t.Fatal(err)88}8990client := fake.NewClient(func() {})9192relabelCfg := `93- source_labels: ['__journal_code_file']94regex: 'journaltarget_test\.go'95action: 'keep'96- source_labels: ['__journal_code_file']97target_label: 'code_file'`9899var relabels []*relabel.Config100err = yaml.Unmarshal([]byte(relabelCfg), &relabels)101require.NoError(t, err)102103registry := prometheus.NewRegistry()104jt, err := journalTargetWithReader(NewMetrics(registry), logger, client, ps, "test", relabels,105&scrapeconfig.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))106require.NoError(t, err)107108r := jt.r.(*mockJournalReader)109r.t = t110111for i := 0; i < 10; i++ {112r.Write(map[string]string{113"MESSAGE": "ping",114"CODE_FILE": "journaltarget_test.go",115})116assert.NoError(t, err)117}118require.NoError(t, jt.Stop())119client.Stop()120121expectedMetrics := `# HELP loki_source_journal_target_lines_total Total number of successful journal lines read122# TYPE loki_source_journal_target_lines_total counter123loki_source_journal_target_lines_total 10124`125126if err := testutil.GatherAndCompare(registry,127strings.NewReader(expectedMetrics)); err != nil {128t.Fatalf("mismatch metrics: %v", err)129}130assert.Len(t, client.Received(), 10)131}132133func TestJournalTargetParsingErrors(t *testing.T) {134w := log.NewSyncWriter(os.Stderr)135logger := log.NewLogfmtLogger(w)136137testutils.InitRandom()138dirName := "/tmp/" + testutils.RandName()139positionsFileName := dirName + "/positions.yml"140141// Set the sync period to a really long value, to guarantee the sync timer142// never runs, this way we know everything saved was done through channel143// notifications when target.stop() was called.144ps, err := positions.New(logger, positions.Config{145SyncPeriod: 10 * time.Second,146PositionsFile: positionsFileName,147})148if err != nil {149t.Fatal(err)150}151152client := fake.NewClient(func() {})153154// We specify no relabel rules, so that we end up with an empty labelset155var relabels []*relabel.Config156157registry := prometheus.NewRegistry()158jt, err := journalTargetWithReader(NewMetrics(registry), logger, client, ps, "test", relabels,159&scrapeconfig.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))160require.NoError(t, err)161162r := jt.r.(*mockJournalReader)163r.t = t164165// No labels but correct message166for i := 0; i < 10; i++ {167r.Write(map[string]string{168"MESSAGE": "ping",169"CODE_FILE": "journaltarget_test.go",170})171assert.NoError(t, err)172}173174// No labels and no message175for i := 0; i < 10; i++ {176r.Write(map[string]string{177"CODE_FILE": "journaltarget_test.go",178})179assert.NoError(t, err)180}181require.NoError(t, jt.Stop())182client.Stop()183184expectedMetrics := `# HELP loki_source_journal_target_lines_total Total number of successful journal lines read185# TYPE loki_source_journal_target_lines_total counter186loki_source_journal_target_lines_total 0187# HELP loki_source_journal_target_parsing_errors_total Total number of parsing errors while reading journal messages188# TYPE loki_source_journal_target_parsing_errors_total counter189loki_source_journal_target_parsing_errors_total{error="empty_labels"} 10190loki_source_journal_target_parsing_errors_total{error="no_message"} 10191`192193if err := testutil.GatherAndCompare(registry,194strings.NewReader(expectedMetrics)); err != nil {195t.Fatalf("mismatch metrics: %v", err)196}197198assert.Len(t, client.Received(), 0)199}200201func TestJournalTarget_JSON(t *testing.T) {202w := log.NewSyncWriter(os.Stderr)203logger := log.NewLogfmtLogger(w)204205testutils.InitRandom()206dirName := "/tmp/" + testutils.RandName()207positionsFileName := dirName + "/positions.yml"208209// Set the sync period to a really long value, to guarantee the sync timer210// never runs, this way we know everything saved was done through channel211// notifications when target.stop() was called.212ps, err := positions.New(logger, positions.Config{213SyncPeriod: 10 * time.Second,214PositionsFile: positionsFileName,215})216if err != nil {217t.Fatal(err)218}219220client := fake.NewClient(func() {})221222relabelCfg := `223- source_labels: ['__journal_code_file']224regex: 'journaltarget_test\.go'225action: 'keep'226- source_labels: ['__journal_code_file']227target_label: 'code_file'`228229var relabels []*relabel.Config230err = yaml.Unmarshal([]byte(relabelCfg), &relabels)231require.NoError(t, err)232233cfg := &scrapeconfig.JournalTargetConfig{JSON: true}234235jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", relabels,236cfg, newMockJournalReader, newMockJournalEntry(nil))237require.NoError(t, err)238239r := jt.r.(*mockJournalReader)240r.t = t241242for i := 0; i < 10; i++ {243r.Write(map[string]string{244"MESSAGE": "ping",245"CODE_FILE": "journaltarget_test.go",246"OTHER_FIELD": "foobar",247})248assert.NoError(t, err)249250}251expectMsg := `{"CODE_FILE":"journaltarget_test.go","MESSAGE":"ping","OTHER_FIELD":"foobar"}`252require.NoError(t, jt.Stop())253client.Stop()254255assert.Len(t, client.Received(), 10)256for i := 0; i < 10; i++ {257require.Equal(t, expectMsg, client.Received()[i].Line)258}259}260261func TestJournalTarget_Since(t *testing.T) {262w := log.NewSyncWriter(os.Stderr)263logger := log.NewLogfmtLogger(w)264265testutils.InitRandom()266dirName := "/tmp/" + testutils.RandName()267positionsFileName := dirName + "/positions.yml"268269// Set the sync period to a really long value, to guarantee the sync timer270// never runs, this way we know everything saved was done through channel271// notifications when target.stop() was called.272ps, err := positions.New(logger, positions.Config{273SyncPeriod: 10 * time.Second,274PositionsFile: positionsFileName,275})276if err != nil {277t.Fatal(err)278}279280client := fake.NewClient(func() {})281282cfg := scrapeconfig.JournalTargetConfig{283MaxAge: "4h",284}285286jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,287&cfg, newMockJournalReader, newMockJournalEntry(nil))288require.NoError(t, err)289290r := jt.r.(*mockJournalReader)291require.Equal(t, r.config.Since, -1*time.Hour*4)292client.Stop()293}294295func TestJournalTarget_Cursor_TooOld(t *testing.T) {296w := log.NewSyncWriter(os.Stderr)297logger := log.NewLogfmtLogger(w)298299testutils.InitRandom()300dirName := "/tmp/" + testutils.RandName()301positionsFileName := dirName + "/positions.yml"302303// Set the sync period to a really long value, to guarantee the sync timer304// never runs, this way we know everything saved was done through channel305// notifications when target.stop() was called.306ps, err := positions.New(logger, positions.Config{307SyncPeriod: 10 * time.Second,308PositionsFile: positionsFileName,309})310if err != nil {311t.Fatal(err)312}313ps.PutString("journal-test", "", "foobar")314315client := fake.NewClient(func() {})316317cfg := scrapeconfig.JournalTargetConfig{}318319entryTs := time.Date(1980, time.July, 3, 12, 0, 0, 0, time.UTC)320journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{321Cursor: "foobar",322Fields: nil,323RealtimeTimestamp: uint64(entryTs.UnixNano()),324})325326jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,327&cfg, newMockJournalReader, journalEntry)328require.NoError(t, err)329330r := jt.r.(*mockJournalReader)331require.Equal(t, r.config.Since, -1*time.Hour*7)332client.Stop()333}334335func TestJournalTarget_Cursor_NotTooOld(t *testing.T) {336w := log.NewSyncWriter(os.Stderr)337logger := log.NewLogfmtLogger(w)338339testutils.InitRandom()340dirName := "/tmp/" + testutils.RandName()341positionsFileName := dirName + "/positions.yml"342343// Set the sync period to a really long value, to guarantee the sync timer344// never runs, this way we know everything saved was done through channel345// notifications when target.stop() was called.346ps, err := positions.New(logger, positions.Config{347SyncPeriod: 10 * time.Second,348PositionsFile: positionsFileName,349})350if err != nil {351t.Fatal(err)352}353ps.PutString(positions.CursorKey("test"), "", "foobar")354355client := fake.NewClient(func() {})356357cfg := scrapeconfig.JournalTargetConfig{}358359entryTs := time.Now().Add(-time.Hour)360journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{361Cursor: "foobar",362Fields: nil,363RealtimeTimestamp: uint64(entryTs.UnixNano() / int64(time.Microsecond)),364})365366jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,367&cfg, newMockJournalReader, journalEntry)368require.NoError(t, err)369370r := jt.r.(*mockJournalReader)371require.Equal(t, r.config.Since, time.Duration(0))372require.Equal(t, r.config.Cursor, "foobar")373client.Stop()374}375376func Test_MakeJournalFields(t *testing.T) {377entryFields := map[string]string{378"CODE_FILE": "journaltarget_test.go",379"OTHER_FIELD": "foobar",380"PRIORITY": "6",381}382receivedFields := makeJournalFields(entryFields)383expectedFields := map[string]string{384"__journal_code_file": "journaltarget_test.go",385"__journal_other_field": "foobar",386"__journal_priority": "6",387"__journal_priority_keyword": "info",388}389assert.Equal(t, expectedFields, receivedFields)390}391392func TestJournalTarget_Matches(t *testing.T) {393w := log.NewSyncWriter(os.Stderr)394logger := log.NewLogfmtLogger(w)395396testutils.InitRandom()397dirName := "/tmp/" + testutils.RandName()398positionsFileName := dirName + "/positions.yml"399400// Set the sync period to a really long value, to guarantee the sync timer401// never runs, this way we know everything saved was done through channel402// notifications when target.stop() was called.403ps, err := positions.New(logger, positions.Config{404SyncPeriod: 10 * time.Second,405PositionsFile: positionsFileName,406})407if err != nil {408t.Fatal(err)409}410411client := fake.NewClient(func() {})412413cfg := scrapeconfig.JournalTargetConfig{414Matches: "UNIT=foo.service PRIORITY=1",415}416417jt, err := journalTargetWithReader(NewMetrics(prometheus.NewRegistry()), logger, client, ps, "test", nil,418&cfg, newMockJournalReader, newMockJournalEntry(nil))419require.NoError(t, err)420421r := jt.r.(*mockJournalReader)422matches := []sdjournal.Match{{Field: "UNIT", Value: "foo.service"}, {Field: "PRIORITY", Value: "1"}}423require.Equal(t, r.config.Matches, matches)424client.Stop()425}426427428