Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/journal/journal.go
4096 views
1
//go:build linux && cgo && promtail_journal_enabled
2
3
package journal
4
5
import (
6
"context"
7
"os"
8
"path/filepath"
9
"sync"
10
"time"
11
12
"github.com/grafana/agent/component/common/loki"
13
"github.com/grafana/agent/component/common/loki/positions"
14
flow_relabel "github.com/grafana/agent/component/common/relabel"
15
"github.com/grafana/agent/component/loki/source/journal/internal/target"
16
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
17
"github.com/prometheus/common/model"
18
19
"github.com/grafana/agent/component"
20
)
21
22
func init() {
23
component.Register(component.Registration{
24
Name: "loki.source.journal",
25
Args: Arguments{},
26
27
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
28
return New(opts, args.(Arguments))
29
},
30
})
31
}
32
33
var _ component.Component = (*Component)(nil)
34
35
// Component represents reading from a journal
36
type Component struct {
37
mut sync.RWMutex
38
t *target.JournalTarget
39
metrics *target.Metrics
40
o component.Options
41
handler chan loki.Entry
42
positions positions.Positions
43
receivers []loki.LogsReceiver
44
}
45
46
// New creates a new component.
47
func New(o component.Options, args Arguments) (component.Component, error) {
48
positionsFile, err := positions.New(o.Logger, positions.Config{
49
SyncPeriod: 10 * time.Second,
50
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
51
IgnoreInvalidYaml: false,
52
ReadOnly: false,
53
})
54
if err != nil {
55
return nil, err
56
}
57
err = os.MkdirAll(o.DataPath, 0644)
58
if err != nil {
59
return nil, err
60
}
61
c := &Component{
62
metrics: target.NewMetrics(o.Registerer),
63
o: o,
64
handler: make(chan loki.Entry),
65
positions: positionsFile,
66
receivers: args.Receivers,
67
}
68
err = c.Update(args)
69
return c, err
70
}
71
72
// Run starts the component.
73
func (c *Component) Run(ctx context.Context) error {
74
defer func() {
75
c.mut.RLock()
76
if c.t != nil {
77
c.t.Stop()
78
}
79
c.mut.RUnlock()
80
81
}()
82
for {
83
select {
84
case <-ctx.Done():
85
return nil
86
case entry := <-c.handler:
87
c.mut.RLock()
88
lokiEntry := loki.Entry{
89
Labels: entry.Labels,
90
Entry: entry.Entry,
91
}
92
for _, r := range c.receivers {
93
r <- lokiEntry
94
}
95
c.mut.RUnlock()
96
}
97
}
98
}
99
100
// Update updates the fields of the component.
101
func (c *Component) Update(args component.Arguments) error {
102
newArgs := args.(Arguments)
103
c.mut.Lock()
104
defer c.mut.Unlock()
105
if c.t != nil {
106
err := c.t.Stop()
107
if err != nil {
108
return err
109
}
110
}
111
rcs := flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
112
entryHandler := loki.NewEntryHandler(c.handler, func() {})
113
114
newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, newArgs))
115
if err != nil {
116
return err
117
}
118
c.t = newTarget
119
return nil
120
}
121
122
func convertArgs(job string, a Arguments) *scrapeconfig.JournalTargetConfig {
123
return &scrapeconfig.JournalTargetConfig{
124
MaxAge: a.MaxAge.String(),
125
JSON: a.FormatAsJson,
126
Labels: model.LabelSet{"job": model.LabelValue(job)},
127
Path: a.Path,
128
Matches: a.Matches,
129
}
130
}
131
132