Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/file/file.go
4096 views
1
package file
2
3
import (
4
"context"
5
"fmt"
6
"os"
7
"path/filepath"
8
"strings"
9
"sync"
10
"time"
11
12
"github.com/go-kit/log/level"
13
"github.com/grafana/agent/component"
14
"github.com/grafana/agent/component/common/loki"
15
"github.com/grafana/agent/component/common/loki/positions"
16
"github.com/grafana/agent/component/discovery"
17
"github.com/prometheus/common/model"
18
)
19
20
func init() {
21
component.Register(component.Registration{
22
Name: "loki.source.file",
23
Args: Arguments{},
24
25
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
26
return New(opts, args.(Arguments))
27
},
28
})
29
}
30
31
const (
32
pathLabel = "__path__"
33
filenameLabel = "filename"
34
)
35
36
// Arguments holds values which are used to configure the loki.source.file
37
// component.
38
// TODO(@tpaschalis) Allow users to configure the encoding of the tailed files.
39
type Arguments struct {
40
Targets []discovery.Target `river:"targets,attr"`
41
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
42
}
43
44
var (
45
_ component.Component = (*Component)(nil)
46
)
47
48
// Component implements the loki.source.file component.
49
type Component struct {
50
opts component.Options
51
metrics *metrics
52
53
updateMut sync.Mutex
54
55
mut sync.RWMutex
56
args Arguments
57
handler loki.LogsReceiver
58
receivers []loki.LogsReceiver
59
posFile positions.Positions
60
readers map[positions.Entry]reader
61
}
62
63
// New creates a new loki.source.file component.
64
func New(o component.Options, args Arguments) (*Component, error) {
65
err := os.MkdirAll(o.DataPath, 0750)
66
if err != nil && !os.IsExist(err) {
67
return nil, err
68
}
69
positionsFile, err := positions.New(o.Logger, positions.Config{
70
SyncPeriod: 10 * time.Second,
71
PositionsFile: filepath.Join(o.DataPath, "positions.yml"),
72
IgnoreInvalidYaml: false,
73
ReadOnly: false,
74
})
75
if err != nil {
76
return nil, err
77
}
78
79
c := &Component{
80
opts: o,
81
metrics: newMetrics(o.Registerer),
82
83
handler: make(loki.LogsReceiver),
84
receivers: args.ForwardTo,
85
posFile: positionsFile,
86
readers: make(map[positions.Entry]reader),
87
}
88
89
// Call to Update() to start readers and set receivers once at the start.
90
if err := c.Update(args); err != nil {
91
return nil, err
92
}
93
94
return c, nil
95
}
96
97
// Run implements component.Component.
98
// TODO(@tpaschalis). Should we periodically re-check? What happens if a target
99
// comes alive _after_ it's been passed to us and we never receive another
100
// Update()? Or should it be a responsibility of the discovery component?
101
func (c *Component) Run(ctx context.Context) error {
102
defer func() {
103
level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file")
104
c.mut.RLock()
105
for _, r := range c.readers {
106
r.Stop()
107
}
108
c.posFile.Stop()
109
close(c.handler)
110
c.mut.RUnlock()
111
}()
112
113
for {
114
select {
115
case <-ctx.Done():
116
return nil
117
case entry := <-c.handler:
118
c.mut.RLock()
119
for _, receiver := range c.receivers {
120
receiver <- entry
121
}
122
c.mut.RUnlock()
123
}
124
}
125
}
126
127
// Update implements component.Component.
128
func (c *Component) Update(args component.Arguments) error {
129
c.updateMut.Lock()
130
defer c.updateMut.Unlock()
131
132
// Stop all readers so we can recreate them below. This *must* be done before
133
// c.mut is held to avoid a race condition where stopping a reader is
134
// flushing its data, but the flush never succeeds because the Run goroutine
135
// fails to get a read lock.
136
//
137
// Stopping the readers avoids the issue we saw with stranded wrapped
138
// handlers staying behind until they were GC'ed and sending duplicate
139
// message to the global handler. It also makes sure that we update
140
// everything with the new labels. Simply zeroing out the c.readers map did
141
// not work correctly to shut down the wrapped handlers in time.
142
//
143
// TODO (@tpaschalis) We should be able to optimize this somehow and eg.
144
// cache readers for paths we already know about, and whose labels have not
145
// changed. Once we do that we should:
146
//
147
// * Call to c.pruneStoppedReaders to give cached but errored readers a
148
// chance to restart.
149
// * Stop tailing any files that were no longer in the new targets
150
// and conditionally remove their readers only by calling toStopTailing
151
// and c.stopTailingAndRemovePosition.
152
oldPaths := c.stopReaders()
153
154
newArgs := args.(Arguments)
155
156
c.mut.Lock()
157
defer c.mut.Unlock()
158
c.args = newArgs
159
c.receivers = newArgs.ForwardTo
160
161
c.readers = make(map[positions.Entry]reader)
162
163
if len(newArgs.Targets) == 0 {
164
level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed")
165
return nil
166
}
167
168
for _, target := range newArgs.Targets {
169
path := target[pathLabel]
170
171
var labels = make(model.LabelSet)
172
for k, v := range target {
173
if strings.HasPrefix(k, model.ReservedLabelPrefix) {
174
continue
175
}
176
labels[model.LabelName(k)] = model.LabelValue(v)
177
}
178
179
// Deduplicate targets which have the same public label set.
180
readersKey := positions.Entry{Path: path, Labels: labels.String()}
181
if _, exist := c.readers[readersKey]; exist {
182
continue
183
}
184
185
c.reportSize(path, labels.String())
186
187
handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler, func() {}))
188
reader, err := c.startTailing(path, labels, handler)
189
if err != nil {
190
continue
191
}
192
193
c.readers[readersKey] = readerWithHandler{
194
reader: reader,
195
handler: handler,
196
}
197
}
198
199
// Remove from the positions file any entries that had a Reader before, but
200
// are no longer in the updated set of Targets.
201
for r := range missing(c.readers, oldPaths) {
202
c.posFile.Remove(r.Path, r.Labels)
203
}
204
205
return nil
206
}
207
208
// readerWithHandler combines a reader with an entry handler associated with
209
// it. Closing the reader will also close the handler.
210
type readerWithHandler struct {
211
reader
212
handler loki.EntryHandler
213
}
214
215
func (r readerWithHandler) Stop() {
216
r.reader.Stop()
217
r.handler.Stop()
218
}
219
220
// stopReaders stops existing readers and returns the set of paths which were
221
// stopped.
222
func (c *Component) stopReaders() map[positions.Entry]struct{} {
223
c.mut.RLock()
224
defer c.mut.RUnlock()
225
226
stoppedPaths := make(map[positions.Entry]struct{}, len(c.readers))
227
228
for p, r := range c.readers {
229
stoppedPaths[p] = struct{}{}
230
r.Stop()
231
}
232
233
return stoppedPaths
234
}
235
236
// DebugInfo returns information about the status of tailed targets.
237
// TODO(@tpaschalis) Decorate with more debug information once it's made
238
// available, such as the last time a log line was read.
239
func (c *Component) DebugInfo() interface{} {
240
var res readerDebugInfo
241
for e, reader := range c.readers {
242
offset, _ := c.posFile.Get(e.Path, e.Labels)
243
res.TargetsInfo = append(res.TargetsInfo, targetInfo{
244
Path: e.Path,
245
Labels: e.Labels,
246
IsRunning: reader.IsRunning(),
247
ReadOffset: offset,
248
})
249
}
250
return res
251
}
252
253
type readerDebugInfo struct {
254
TargetsInfo []targetInfo `river:"targets_info,block"`
255
}
256
257
type targetInfo struct {
258
Path string `river:"path,attr"`
259
Labels string `river:"labels,attr"`
260
IsRunning bool `river:"is_running,attr"`
261
ReadOffset int64 `river:"read_offset,attr"`
262
}
263
264
// Returns the elements from set b which are missing from set a
265
func missing(as map[positions.Entry]reader, bs map[positions.Entry]struct{}) map[positions.Entry]struct{} {
266
c := map[positions.Entry]struct{}{}
267
for a := range bs {
268
if _, ok := as[a]; !ok {
269
c[a] = struct{}{}
270
}
271
}
272
return c
273
}
274
275
// startTailing starts and returns a reader for the given path. For most files,
276
// this will be a tailer implementation. If the file suffix alludes to it being
277
// a compressed file, then a decompressor will be started instead.
278
func (c *Component) startTailing(path string, labels model.LabelSet, handler loki.EntryHandler) (reader, error) {
279
fi, err := os.Stat(path)
280
if err != nil {
281
level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", path)
282
c.metrics.totalBytes.DeleteLabelValues(path)
283
return nil, fmt.Errorf("failed to stat path %s", path)
284
}
285
286
if fi.IsDir() {
287
level.Info(c.opts.Logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", path)
288
c.metrics.totalBytes.DeleteLabelValues(path)
289
return nil, fmt.Errorf("failed to tail file, it was a directory %s", path)
290
}
291
292
var reader reader
293
if isCompressed(path) {
294
level.Debug(c.opts.Logger).Log("msg", "reading from compressed file", "filename", path)
295
decompressor, err := newDecompressor(
296
c.metrics,
297
c.opts.Logger,
298
handler,
299
c.posFile,
300
path,
301
labels.String(),
302
"",
303
)
304
if err != nil {
305
level.Error(c.opts.Logger).Log("msg", "failed to start decompressor", "error", err, "filename", path)
306
return nil, fmt.Errorf("failed to start decompressor %s", err)
307
}
308
reader = decompressor
309
} else {
310
level.Debug(c.opts.Logger).Log("msg", "tailing new file", "filename", path)
311
tailer, err := newTailer(
312
c.metrics,
313
c.opts.Logger,
314
handler,
315
c.posFile,
316
path,
317
labels.String(),
318
"",
319
)
320
if err != nil {
321
level.Error(c.opts.Logger).Log("msg", "failed to start tailer", "error", err, "filename", path)
322
return nil, fmt.Errorf("failed to start tailer %s", err)
323
}
324
reader = tailer
325
}
326
327
return reader, nil
328
}
329
330
func (c *Component) reportSize(path, labels string) {
331
// Ask the reader to update the size if a reader exists, this keeps
332
// position and size metrics in sync.
333
if reader, ok := c.readers[positions.Entry{Path: path, Labels: labels}]; ok {
334
err := reader.MarkPositionAndSize()
335
if err != nil {
336
level.Warn(c.opts.Logger).Log("msg", "failed to get file size from existing reader, ", "file", path, "error", err)
337
return
338
}
339
} else {
340
// Must be a new file, just directly read the size of it
341
fi, err := os.Stat(path)
342
if err != nil {
343
return
344
}
345
c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size()))
346
}
347
}
348
349