Path: blob/main/component/loki/source/journal/internal/target/journaltarget.go
4096 views
//go:build linux && cgo && promtail_journal_enabled12package target34// This code is copied from Promtail with minor edits. The target package is used to5// configure and run the targets that can read journal entries and forward them6// to other loki components.78import (9"fmt"10"io"11"strings"12"syscall"13"time"1415"github.com/grafana/agent/component/common/loki"16"github.com/grafana/agent/component/common/loki/positions"1718"github.com/coreos/go-systemd/sdjournal"19"github.com/go-kit/log"20"github.com/go-kit/log/level"21jsoniter "github.com/json-iterator/go"22"github.com/pkg/errors"23"github.com/prometheus/common/model"24"github.com/prometheus/prometheus/model/labels"25"github.com/prometheus/prometheus/model/relabel"2627"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"28"github.com/grafana/loki/clients/pkg/promtail/targets/target"2930"github.com/grafana/loki/pkg/logproto"31)3233const (34// journalEmptyStr is represented as a single-character space because35// returning an empty string from sdjournal.JournalReaderConfig's36// Formatter causes an immediate EOF and induces performance issues37// with how that is handled in sdjournal.38journalEmptyStr = " "3940// journalDefaultMaxAgeTime represents the default earliest entry that41// will be read by the journal reader if there is no saved position42// newer than the "max_age" time.43journalDefaultMaxAgeTime = time.Hour * 744)4546const (47noMessageError = "no_message"48emptyLabelsError = "empty_labels"49)5051type journalReader interface {52io.Closer53Follow(until <-chan time.Time, writer io.Writer) error54}5556// Abstracted functions for interacting with the journal, used for mocking in tests:57type (58journalReaderFunc func(sdjournal.JournalReaderConfig) (journalReader, error)59journalEntryFunc func(cfg sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error)60)6162// Default implementations of abstracted functions:63var defaultJournalReaderFunc = func(c sdjournal.JournalReaderConfig) (journalReader, error) {64return sdjournal.NewJournalReader(c)65}6667var defaultJournalEntryFunc = func(c sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error) {68var (69journal *sdjournal.Journal70err error71)7273if c.Path != "" {74journal, err = sdjournal.NewJournalFromDir(c.Path)75} else {76journal, err = sdjournal.NewJournal()77}7879if err != nil {80return nil, err81} else if err := journal.SeekCursor(cursor); err != nil {82return nil, err83}8485// Just seeking the cursor won't give us the entry. We should call Next() or Previous()86// to get the closest following or the closest preceding entry. We have chosen here to call Next(),87// reason being, if we call Previous() we would re read an already read entry.88// More info here https://www.freedesktop.org/software/systemd/man/sd_journal_seek_cursor.html#89_, err = journal.Next()90if err != nil {91return nil, err92}9394return journal.GetEntry()95}9697// JournalTarget tails systemd journal entries.98// nolint99type JournalTarget struct {100metrics *Metrics101logger log.Logger102handler loki.EntryHandler103positions positions.Positions104positionPath string105relabelConfig []*relabel.Config106config *scrapeconfig.JournalTargetConfig107labels model.LabelSet108109r journalReader110until chan time.Time111}112113// NewJournalTarget configures a new JournalTarget.114func NewJournalTarget(115metrics *Metrics,116logger log.Logger,117handler loki.EntryHandler,118positions positions.Positions,119jobName string,120relabelConfig []*relabel.Config,121targetConfig *scrapeconfig.JournalTargetConfig,122) (*JournalTarget, error) {123124return journalTargetWithReader(125metrics,126logger,127handler,128positions,129jobName,130relabelConfig,131targetConfig,132defaultJournalReaderFunc,133defaultJournalEntryFunc,134)135}136137func journalTargetWithReader(138metrics *Metrics,139logger log.Logger,140handler loki.EntryHandler,141pos positions.Positions,142jobName string,143relabelConfig []*relabel.Config,144targetConfig *scrapeconfig.JournalTargetConfig,145readerFunc journalReaderFunc,146entryFunc journalEntryFunc,147) (*JournalTarget, error) {148149positionPath := positions.CursorKey(jobName)150position := pos.GetString(positionPath, "")151152if readerFunc == nil {153readerFunc = defaultJournalReaderFunc154}155if entryFunc == nil {156entryFunc = defaultJournalEntryFunc157}158159until := make(chan time.Time)160t := &JournalTarget{161metrics: metrics,162logger: logger,163handler: handler,164positions: pos,165positionPath: positionPath,166relabelConfig: relabelConfig,167labels: targetConfig.Labels,168config: targetConfig,169170until: until,171}172173var maxAge time.Duration174var err error175if targetConfig.MaxAge == "" {176maxAge = journalDefaultMaxAgeTime177} else {178maxAge, err = time.ParseDuration(targetConfig.MaxAge)179}180if err != nil {181return nil, errors.Wrap(err, "parsing journal reader 'max_age' config value")182}183184cb := journalConfigBuilder{185JournalPath: targetConfig.Path,186Position: position,187MaxAge: maxAge,188EntryFunc: entryFunc,189}190191matches := strings.Fields(targetConfig.Matches)192for _, m := range matches {193fv := strings.Split(m, "=")194if len(fv) != 2 {195return nil, errors.New("Error parsing journal reader 'matches' config value")196}197cb.Matches = append(cb.Matches, sdjournal.Match{198Field: fv[0],199Value: fv[1],200})201}202203cfg := t.generateJournalConfig(cb)204t.r, err = readerFunc(cfg)205if err != nil {206return nil, errors.Wrap(err, "creating journal reader")207}208209go func() {210for {211err := t.r.Follow(until, io.Discard)212if err != nil {213level.Error(t.logger).Log("msg", "received error during sdjournal follow", "err", err.Error())214215if err == sdjournal.ErrExpired || err == syscall.EBADMSG || err == io.EOF {216level.Error(t.logger).Log("msg", "unable to follow journal", "err", err.Error())217return218}219}220221// prevent tight loop222time.Sleep(100 * time.Millisecond)223}224}()225226return t, nil227}228229type journalConfigBuilder struct {230JournalPath string231Position string232Matches []sdjournal.Match233MaxAge time.Duration234EntryFunc journalEntryFunc235}236237// generateJournalConfig generates a journal config by trying to intelligently238// determine if a time offset or the cursor should be used for the starting239// position in the reader.240func (t *JournalTarget) generateJournalConfig(241cb journalConfigBuilder,242) sdjournal.JournalReaderConfig {243244cfg := sdjournal.JournalReaderConfig{245Path: cb.JournalPath,246Matches: cb.Matches,247Formatter: t.formatter,248}249250// When generating the JournalReaderConfig, we want to preferably251// use the Cursor, since it's guaranteed unique to a given journal252// entry. When we don't know the cursor position (or want to set253// a start time), we'll fall back to the less-precise Since, which254// takes a negative duration back from the current system time.255//256// The presence of Since takes precedence over Cursor, so we only257// ever set one and not both here.258259if cb.Position == "" {260cfg.Since = -1 * cb.MaxAge261return cfg262}263264// We have a saved position and need to get that entry to see if it's265// older than cb.MaxAge. If it _is_ older, then we need to use cfg.Since266// rather than cfg.Cursor.267entry, err := cb.EntryFunc(cfg, cb.Position)268if err != nil {269level.Error(t.logger).Log("msg", "received error reading saved journal position", "err", err.Error())270cfg.Since = -1 * cb.MaxAge271return cfg272}273274ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))275if time.Since(ts) > cb.MaxAge {276cfg.Since = -1 * cb.MaxAge277return cfg278}279280cfg.Cursor = cb.Position281return cfg282}283284func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) {285ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))286287var msg string288289if t.config.JSON {290json := jsoniter.ConfigCompatibleWithStandardLibrary291292bb, err := json.Marshal(entry.Fields)293if err != nil {294level.Error(t.logger).Log("msg", "could not marshal journal fields to JSON", "err", err, "unit", entry.Fields["_SYSTEMD_UNIT"])295return journalEmptyStr, nil296}297msg = string(bb)298} else {299var ok bool300msg, ok = entry.Fields["MESSAGE"]301if !ok {302level.Debug(t.logger).Log("msg", "received journal entry with no MESSAGE field", "unit", entry.Fields["_SYSTEMD_UNIT"])303t.metrics.journalErrors.WithLabelValues(noMessageError).Inc()304return journalEmptyStr, nil305}306}307308entryLabels := makeJournalFields(entry.Fields)309310// Add constant labels311for k, v := range t.labels {312entryLabels[string(k)] = string(v)313}314315processedLabels, _ := relabel.Process(labels.FromMap(entryLabels), t.relabelConfig...)316317processedLabelsMap := processedLabels.Map()318lbls := make(model.LabelSet, len(processedLabelsMap))319for k, v := range processedLabelsMap {320if k[0:2] == "__" {321continue322}323324lbls[model.LabelName(k)] = model.LabelValue(v)325}326if len(lbls) == 0 {327// No labels, drop journal entry328level.Debug(t.logger).Log("msg", "received journal entry with no labels", "unit", entry.Fields["_SYSTEMD_UNIT"])329t.metrics.journalErrors.WithLabelValues(emptyLabelsError).Inc()330return journalEmptyStr, nil331}332333t.metrics.journalLines.Inc()334t.positions.PutString(t.positionPath, "", entry.Cursor)335t.handler.Chan() <- loki.Entry{336Labels: lbls,337Entry: logproto.Entry{338Line: msg,339Timestamp: ts,340},341}342return journalEmptyStr, nil343}344345// Type returns JournalTargetType.346func (t *JournalTarget) Type() target.TargetType {347return target.JournalTargetType348}349350// Ready indicates whether or not the journal is ready to be351// read from.352func (t *JournalTarget) Ready() bool {353return true354}355356// DiscoveredLabels returns the set of labels discovered by357// the JournalTarget, which is always nil. Implements358// Target.359func (t *JournalTarget) DiscoveredLabels() model.LabelSet {360return nil361}362363// Labels returns the set of labels that statically apply to364// all log entries produced by the JournalTarget.365func (t *JournalTarget) Labels() model.LabelSet {366return t.labels367}368369// Details returns target-specific details.370func (t *JournalTarget) Details() interface{} {371return map[string]string{372"position": t.positions.GetString(t.positionPath, ""),373}374}375376// Stop shuts down the JournalTarget.377func (t *JournalTarget) Stop() error {378t.until <- time.Now()379err := t.r.Close()380t.handler.Stop()381return err382}383384func makeJournalFields(fields map[string]string) map[string]string {385result := make(map[string]string, len(fields))386for k, v := range fields {387if k == "PRIORITY" {388result[fmt.Sprintf("__journal_%s_%s", strings.ToLower(k), "keyword")] = makeJournalPriority(v)389}390result[fmt.Sprintf("__journal_%s", strings.ToLower(k))] = v391}392return result393}394395func makeJournalPriority(priority string) string {396switch priority {397case "0":398return "emerg"399case "1":400return "alert"401case "2":402return "crit"403case "3":404return "error"405case "4":406return "warning"407case "5":408return "notice"409case "6":410return "info"411case "7":412return "debug"413}414return priority415}416417418