Path: blob/main/component/loki/source/journal/journal.go
4096 views
//go:build linux && cgo && promtail_journal_enabled12package journal34import (5"context"6"os"7"path/filepath"8"sync"9"time"1011"github.com/grafana/agent/component/common/loki"12"github.com/grafana/agent/component/common/loki/positions"13flow_relabel "github.com/grafana/agent/component/common/relabel"14"github.com/grafana/agent/component/loki/source/journal/internal/target"15"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"16"github.com/prometheus/common/model"1718"github.com/grafana/agent/component"19)2021func init() {22component.Register(component.Registration{23Name: "loki.source.journal",24Args: Arguments{},2526Build: func(opts component.Options, args component.Arguments) (component.Component, error) {27return New(opts, args.(Arguments))28},29})30}3132var _ component.Component = (*Component)(nil)3334// Component represents reading from a journal35type Component struct {36mut sync.RWMutex37t *target.JournalTarget38metrics *target.Metrics39o component.Options40handler chan loki.Entry41positions positions.Positions42receivers []loki.LogsReceiver43}4445// New creates a new component.46func New(o component.Options, args Arguments) (component.Component, error) {47positionsFile, err := positions.New(o.Logger, positions.Config{48SyncPeriod: 10 * time.Second,49PositionsFile: filepath.Join(o.DataPath, "positions.yml"),50IgnoreInvalidYaml: false,51ReadOnly: false,52})53if err != nil {54return nil, err55}56err = os.MkdirAll(o.DataPath, 0644)57if err != nil {58return nil, err59}60c := &Component{61metrics: target.NewMetrics(o.Registerer),62o: o,63handler: make(chan loki.Entry),64positions: positionsFile,65receivers: args.Receivers,66}67err = c.Update(args)68return c, err69}7071// Run starts the component.72func (c *Component) Run(ctx context.Context) error {73defer func() {74c.mut.RLock()75if c.t != nil {76c.t.Stop()77}78c.mut.RUnlock()7980}()81for {82select {83case <-ctx.Done():84return nil85case entry := <-c.handler:86c.mut.RLock()87lokiEntry := loki.Entry{88Labels: entry.Labels,89Entry: entry.Entry,90}91for _, r := range c.receivers {92r <- lokiEntry93}94c.mut.RUnlock()95}96}97}9899// Update updates the fields of the component.100func (c *Component) Update(args component.Arguments) error {101newArgs := args.(Arguments)102c.mut.Lock()103defer c.mut.Unlock()104if c.t != nil {105err := c.t.Stop()106if err != nil {107return err108}109}110rcs := flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)111entryHandler := loki.NewEntryHandler(c.handler, func() {})112113newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, newArgs))114if err != nil {115return err116}117c.t = newTarget118return nil119}120121func convertArgs(job string, a Arguments) *scrapeconfig.JournalTargetConfig {122return &scrapeconfig.JournalTargetConfig{123MaxAge: a.MaxAge.String(),124JSON: a.FormatAsJson,125Labels: model.LabelSet{"job": model.LabelValue(job)},126Path: a.Path,127Matches: a.Matches,128}129}130131132