Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/journal/internal/target/journaltarget.go
4096 views
1
//go:build linux && cgo && promtail_journal_enabled
2
3
package target
4
5
// This code is copied from Promtail with minor edits. The target package is used to
6
// configure and run the targets that can read journal entries and forward them
7
// to other loki components.
8
9
import (
10
"fmt"
11
"io"
12
"strings"
13
"syscall"
14
"time"
15
16
"github.com/grafana/agent/component/common/loki"
17
"github.com/grafana/agent/component/common/loki/positions"
18
19
"github.com/coreos/go-systemd/sdjournal"
20
"github.com/go-kit/log"
21
"github.com/go-kit/log/level"
22
jsoniter "github.com/json-iterator/go"
23
"github.com/pkg/errors"
24
"github.com/prometheus/common/model"
25
"github.com/prometheus/prometheus/model/labels"
26
"github.com/prometheus/prometheus/model/relabel"
27
28
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
29
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
30
31
"github.com/grafana/loki/pkg/logproto"
32
)
33
34
const (
35
// journalEmptyStr is represented as a single-character space because
36
// returning an empty string from sdjournal.JournalReaderConfig's
37
// Formatter causes an immediate EOF and induces performance issues
38
// with how that is handled in sdjournal.
39
journalEmptyStr = " "
40
41
// journalDefaultMaxAgeTime represents the default earliest entry that
42
// will be read by the journal reader if there is no saved position
43
// newer than the "max_age" time.
44
journalDefaultMaxAgeTime = time.Hour * 7
45
)
46
47
const (
48
noMessageError = "no_message"
49
emptyLabelsError = "empty_labels"
50
)
51
52
type journalReader interface {
53
io.Closer
54
Follow(until <-chan time.Time, writer io.Writer) error
55
}
56
57
// Abstracted functions for interacting with the journal, used for mocking in tests:
58
type (
59
journalReaderFunc func(sdjournal.JournalReaderConfig) (journalReader, error)
60
journalEntryFunc func(cfg sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error)
61
)
62
63
// Default implementations of abstracted functions:
64
var defaultJournalReaderFunc = func(c sdjournal.JournalReaderConfig) (journalReader, error) {
65
return sdjournal.NewJournalReader(c)
66
}
67
68
var defaultJournalEntryFunc = func(c sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error) {
69
var (
70
journal *sdjournal.Journal
71
err error
72
)
73
74
if c.Path != "" {
75
journal, err = sdjournal.NewJournalFromDir(c.Path)
76
} else {
77
journal, err = sdjournal.NewJournal()
78
}
79
80
if err != nil {
81
return nil, err
82
} else if err := journal.SeekCursor(cursor); err != nil {
83
return nil, err
84
}
85
86
// Just seeking the cursor won't give us the entry. We should call Next() or Previous()
87
// to get the closest following or the closest preceding entry. We have chosen here to call Next(),
88
// reason being, if we call Previous() we would re read an already read entry.
89
// More info here https://www.freedesktop.org/software/systemd/man/sd_journal_seek_cursor.html#
90
_, err = journal.Next()
91
if err != nil {
92
return nil, err
93
}
94
95
return journal.GetEntry()
96
}
97
98
// JournalTarget tails systemd journal entries.
99
// nolint
100
type JournalTarget struct {
101
metrics *Metrics
102
logger log.Logger
103
handler loki.EntryHandler
104
positions positions.Positions
105
positionPath string
106
relabelConfig []*relabel.Config
107
config *scrapeconfig.JournalTargetConfig
108
labels model.LabelSet
109
110
r journalReader
111
until chan time.Time
112
}
113
114
// NewJournalTarget configures a new JournalTarget.
115
func NewJournalTarget(
116
metrics *Metrics,
117
logger log.Logger,
118
handler loki.EntryHandler,
119
positions positions.Positions,
120
jobName string,
121
relabelConfig []*relabel.Config,
122
targetConfig *scrapeconfig.JournalTargetConfig,
123
) (*JournalTarget, error) {
124
125
return journalTargetWithReader(
126
metrics,
127
logger,
128
handler,
129
positions,
130
jobName,
131
relabelConfig,
132
targetConfig,
133
defaultJournalReaderFunc,
134
defaultJournalEntryFunc,
135
)
136
}
137
138
func journalTargetWithReader(
139
metrics *Metrics,
140
logger log.Logger,
141
handler loki.EntryHandler,
142
pos positions.Positions,
143
jobName string,
144
relabelConfig []*relabel.Config,
145
targetConfig *scrapeconfig.JournalTargetConfig,
146
readerFunc journalReaderFunc,
147
entryFunc journalEntryFunc,
148
) (*JournalTarget, error) {
149
150
positionPath := positions.CursorKey(jobName)
151
position := pos.GetString(positionPath, "")
152
153
if readerFunc == nil {
154
readerFunc = defaultJournalReaderFunc
155
}
156
if entryFunc == nil {
157
entryFunc = defaultJournalEntryFunc
158
}
159
160
until := make(chan time.Time)
161
t := &JournalTarget{
162
metrics: metrics,
163
logger: logger,
164
handler: handler,
165
positions: pos,
166
positionPath: positionPath,
167
relabelConfig: relabelConfig,
168
labels: targetConfig.Labels,
169
config: targetConfig,
170
171
until: until,
172
}
173
174
var maxAge time.Duration
175
var err error
176
if targetConfig.MaxAge == "" {
177
maxAge = journalDefaultMaxAgeTime
178
} else {
179
maxAge, err = time.ParseDuration(targetConfig.MaxAge)
180
}
181
if err != nil {
182
return nil, errors.Wrap(err, "parsing journal reader 'max_age' config value")
183
}
184
185
cb := journalConfigBuilder{
186
JournalPath: targetConfig.Path,
187
Position: position,
188
MaxAge: maxAge,
189
EntryFunc: entryFunc,
190
}
191
192
matches := strings.Fields(targetConfig.Matches)
193
for _, m := range matches {
194
fv := strings.Split(m, "=")
195
if len(fv) != 2 {
196
return nil, errors.New("Error parsing journal reader 'matches' config value")
197
}
198
cb.Matches = append(cb.Matches, sdjournal.Match{
199
Field: fv[0],
200
Value: fv[1],
201
})
202
}
203
204
cfg := t.generateJournalConfig(cb)
205
t.r, err = readerFunc(cfg)
206
if err != nil {
207
return nil, errors.Wrap(err, "creating journal reader")
208
}
209
210
go func() {
211
for {
212
err := t.r.Follow(until, io.Discard)
213
if err != nil {
214
level.Error(t.logger).Log("msg", "received error during sdjournal follow", "err", err.Error())
215
216
if err == sdjournal.ErrExpired || err == syscall.EBADMSG || err == io.EOF {
217
level.Error(t.logger).Log("msg", "unable to follow journal", "err", err.Error())
218
return
219
}
220
}
221
222
// prevent tight loop
223
time.Sleep(100 * time.Millisecond)
224
}
225
}()
226
227
return t, nil
228
}
229
230
type journalConfigBuilder struct {
231
JournalPath string
232
Position string
233
Matches []sdjournal.Match
234
MaxAge time.Duration
235
EntryFunc journalEntryFunc
236
}
237
238
// generateJournalConfig generates a journal config by trying to intelligently
239
// determine if a time offset or the cursor should be used for the starting
240
// position in the reader.
241
func (t *JournalTarget) generateJournalConfig(
242
cb journalConfigBuilder,
243
) sdjournal.JournalReaderConfig {
244
245
cfg := sdjournal.JournalReaderConfig{
246
Path: cb.JournalPath,
247
Matches: cb.Matches,
248
Formatter: t.formatter,
249
}
250
251
// When generating the JournalReaderConfig, we want to preferably
252
// use the Cursor, since it's guaranteed unique to a given journal
253
// entry. When we don't know the cursor position (or want to set
254
// a start time), we'll fall back to the less-precise Since, which
255
// takes a negative duration back from the current system time.
256
//
257
// The presence of Since takes precedence over Cursor, so we only
258
// ever set one and not both here.
259
260
if cb.Position == "" {
261
cfg.Since = -1 * cb.MaxAge
262
return cfg
263
}
264
265
// We have a saved position and need to get that entry to see if it's
266
// older than cb.MaxAge. If it _is_ older, then we need to use cfg.Since
267
// rather than cfg.Cursor.
268
entry, err := cb.EntryFunc(cfg, cb.Position)
269
if err != nil {
270
level.Error(t.logger).Log("msg", "received error reading saved journal position", "err", err.Error())
271
cfg.Since = -1 * cb.MaxAge
272
return cfg
273
}
274
275
ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
276
if time.Since(ts) > cb.MaxAge {
277
cfg.Since = -1 * cb.MaxAge
278
return cfg
279
}
280
281
cfg.Cursor = cb.Position
282
return cfg
283
}
284
285
func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) {
286
ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
287
288
var msg string
289
290
if t.config.JSON {
291
json := jsoniter.ConfigCompatibleWithStandardLibrary
292
293
bb, err := json.Marshal(entry.Fields)
294
if err != nil {
295
level.Error(t.logger).Log("msg", "could not marshal journal fields to JSON", "err", err, "unit", entry.Fields["_SYSTEMD_UNIT"])
296
return journalEmptyStr, nil
297
}
298
msg = string(bb)
299
} else {
300
var ok bool
301
msg, ok = entry.Fields["MESSAGE"]
302
if !ok {
303
level.Debug(t.logger).Log("msg", "received journal entry with no MESSAGE field", "unit", entry.Fields["_SYSTEMD_UNIT"])
304
t.metrics.journalErrors.WithLabelValues(noMessageError).Inc()
305
return journalEmptyStr, nil
306
}
307
}
308
309
entryLabels := makeJournalFields(entry.Fields)
310
311
// Add constant labels
312
for k, v := range t.labels {
313
entryLabels[string(k)] = string(v)
314
}
315
316
processedLabels, _ := relabel.Process(labels.FromMap(entryLabels), t.relabelConfig...)
317
318
processedLabelsMap := processedLabels.Map()
319
lbls := make(model.LabelSet, len(processedLabelsMap))
320
for k, v := range processedLabelsMap {
321
if k[0:2] == "__" {
322
continue
323
}
324
325
lbls[model.LabelName(k)] = model.LabelValue(v)
326
}
327
if len(lbls) == 0 {
328
// No labels, drop journal entry
329
level.Debug(t.logger).Log("msg", "received journal entry with no labels", "unit", entry.Fields["_SYSTEMD_UNIT"])
330
t.metrics.journalErrors.WithLabelValues(emptyLabelsError).Inc()
331
return journalEmptyStr, nil
332
}
333
334
t.metrics.journalLines.Inc()
335
t.positions.PutString(t.positionPath, "", entry.Cursor)
336
t.handler.Chan() <- loki.Entry{
337
Labels: lbls,
338
Entry: logproto.Entry{
339
Line: msg,
340
Timestamp: ts,
341
},
342
}
343
return journalEmptyStr, nil
344
}
345
346
// Type returns JournalTargetType.
347
func (t *JournalTarget) Type() target.TargetType {
348
return target.JournalTargetType
349
}
350
351
// Ready indicates whether or not the journal is ready to be
352
// read from.
353
func (t *JournalTarget) Ready() bool {
354
return true
355
}
356
357
// DiscoveredLabels returns the set of labels discovered by
358
// the JournalTarget, which is always nil. Implements
359
// Target.
360
func (t *JournalTarget) DiscoveredLabels() model.LabelSet {
361
return nil
362
}
363
364
// Labels returns the set of labels that statically apply to
365
// all log entries produced by the JournalTarget.
366
func (t *JournalTarget) Labels() model.LabelSet {
367
return t.labels
368
}
369
370
// Details returns target-specific details.
371
func (t *JournalTarget) Details() interface{} {
372
return map[string]string{
373
"position": t.positions.GetString(t.positionPath, ""),
374
}
375
}
376
377
// Stop shuts down the JournalTarget.
378
func (t *JournalTarget) Stop() error {
379
t.until <- time.Now()
380
err := t.r.Close()
381
t.handler.Stop()
382
return err
383
}
384
385
func makeJournalFields(fields map[string]string) map[string]string {
386
result := make(map[string]string, len(fields))
387
for k, v := range fields {
388
if k == "PRIORITY" {
389
result[fmt.Sprintf("__journal_%s_%s", strings.ToLower(k), "keyword")] = makeJournalPriority(v)
390
}
391
result[fmt.Sprintf("__journal_%s", strings.ToLower(k))] = v
392
}
393
return result
394
}
395
396
func makeJournalPriority(priority string) string {
397
switch priority {
398
case "0":
399
return "emerg"
400
case "1":
401
return "alert"
402
case "2":
403
return "crit"
404
case "3":
405
return "error"
406
case "4":
407
return "warning"
408
case "5":
409
return "notice"
410
case "6":
411
return "info"
412
case "7":
413
return "debug"
414
}
415
return priority
416
}
417
418