Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/file/tailer.go
4096 views
1
package file
2
3
// This code is copied from Promtail. tailer implements the reader interface by
4
// using the github.com/grafana/tail package to tail files.
5
6
import (
	"errors"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/util"
	"github.com/grafana/tail"
	"github.com/prometheus/common/model"
	"go.uber.org/atomic"
	"golang.org/x/text/encoding"
	"golang.org/x/text/encoding/ianaindex"
	"golang.org/x/text/transform"
)
25
26
type tailer struct {
27
metrics *metrics
28
logger log.Logger
29
handler loki.EntryHandler
30
positions positions.Positions
31
32
path string
33
labels string
34
tail *tail.Tail
35
36
posAndSizeMtx sync.Mutex
37
stopOnce sync.Once
38
39
running *atomic.Bool
40
posquit chan struct{}
41
posdone chan struct{}
42
done chan struct{}
43
44
decoder *encoding.Decoder
45
}
46
47
func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, labels string, encoding string) (*tailer, error) {
48
// Simple check to make sure the file we are tailing doesn't
49
// have a position already saved which is past the end of the file.
50
fi, err := os.Stat(path)
51
if err != nil {
52
return nil, err
53
}
54
pos, err := positions.Get(path, labels)
55
if err != nil {
56
return nil, err
57
}
58
59
if fi.Size() < pos {
60
positions.Remove(path, labels)
61
}
62
63
tail, err := tail.TailFile(path, tail.Config{
64
Follow: true,
65
Poll: true,
66
ReOpen: true,
67
MustExist: true,
68
Location: &tail.SeekInfo{
69
Offset: pos,
70
Whence: 0,
71
},
72
Logger: util.NewLogAdapter(logger),
73
})
74
if err != nil {
75
return nil, err
76
}
77
78
logger = log.With(logger, "component", "tailer")
79
tailer := &tailer{
80
metrics: metrics,
81
logger: logger,
82
handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler),
83
positions: positions,
84
path: path,
85
labels: labels,
86
tail: tail,
87
running: atomic.NewBool(false),
88
posquit: make(chan struct{}),
89
posdone: make(chan struct{}),
90
done: make(chan struct{}),
91
}
92
93
if encoding != "" {
94
level.Info(tailer.logger).Log("msg", "Will decode messages", "from", encoding, "to", "UTF8")
95
encoder, err := ianaindex.IANA.Encoding(encoding)
96
if err != nil {
97
return nil, fmt.Errorf("failed to get IANA encoding %s: %w", encoding, err)
98
}
99
decoder := encoder.NewDecoder()
100
tailer.decoder = decoder
101
}
102
103
go tailer.readLines()
104
go tailer.updatePosition()
105
metrics.filesActive.Add(1.)
106
return tailer, nil
107
}
108
109
// updatePosition is run in a goroutine and checks the current size of the file
110
// and saves it to the positions file at a regular interval. If there is ever
111
// an error it stops the tailer and exits, the tailer will be re-opened by the
112
// filetarget sync method if it still exists and will start reading from the
113
// last successful entry in the positions file.
114
func (t *tailer) updatePosition() {
115
positionSyncPeriod := t.positions.SyncPeriod()
116
positionWait := time.NewTicker(positionSyncPeriod)
117
defer func() {
118
positionWait.Stop()
119
level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
120
close(t.posdone)
121
}()
122
123
for {
124
select {
125
case <-positionWait.C:
126
err := t.MarkPositionAndSize()
127
if err != nil {
128
level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", err)
129
err := t.tail.Stop()
130
if err != nil {
131
level.Error(t.logger).Log("msg", "position timer: error stopping tailer", "path", t.path, "error", err)
132
}
133
return
134
}
135
case <-t.posquit:
136
return
137
}
138
}
139
}
140
141
// readLines runs in a goroutine and consumes the t.tail.Lines channel from the
142
// underlying tailer. Et will only exit when that channel is closed. This is
143
// important to avoid a deadlock in the underlying tailer which can happen if
144
// there are unread lines in this channel and the Stop method on the tailer is
145
// called, the underlying tailer will never exit if there are unread lines in
146
// the t.tail.Lines channel
147
func (t *tailer) readLines() {
148
level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path)
149
150
t.running.Store(true)
151
152
// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
153
// Clean everything up.
154
defer func() {
155
t.cleanupMetrics()
156
t.running.Store(false)
157
level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path)
158
close(t.done)
159
}()
160
entries := t.handler.Chan()
161
for {
162
line, ok := <-t.tail.Lines
163
if !ok {
164
level.Info(t.logger).Log("msg", "tail routine: tail channel closed, stopping tailer", "path", t.path, "reason", t.tail.Tomb.Err())
165
return
166
}
167
168
// Note currently the tail implementation hardcodes Err to nil, this should never hit.
169
if line.Err != nil {
170
level.Error(t.logger).Log("msg", "tail routine: error reading line", "path", t.path, "error", line.Err)
171
continue
172
}
173
174
var text string
175
if t.decoder != nil {
176
var err error
177
text, err = t.convertToUTF8(line.Text)
178
if err != nil {
179
level.Debug(t.logger).Log("msg", "failed to convert encoding", "error", err)
180
t.metrics.encodingFailures.WithLabelValues(t.path).Inc()
181
text = fmt.Sprintf("the requested encoding conversion for this line failed in Grafana Agent Flow: %s", err.Error())
182
}
183
} else {
184
text = line.Text
185
}
186
187
t.metrics.readLines.WithLabelValues(t.path).Inc()
188
entries <- loki.Entry{
189
Labels: model.LabelSet{},
190
Entry: logproto.Entry{
191
Timestamp: line.Time,
192
Line: text,
193
},
194
}
195
}
196
}
197
198
// MarkPositionAndSize publishes the tailed file's size and the current read
// offset as metrics and stores the offset in the positions store.
//
// A file that no longer exists is not treated as a failure: the update is
// skipped and nil is returned. Any other error from querying the size or the
// offset is returned unchanged.
func (t *tailer) MarkPositionAndSize() error {
	// Lock this update as there are 2 timers calling this routine, the sync in
	// filetarget and the positions sync in this file.
	t.posAndSizeMtx.Lock()
	defer t.posAndSizeMtx.Unlock()

	size, err := t.tail.Size()
	if err != nil {
		// If the file no longer exists, no need to save position information.
		// errors.Is also matches a not-exist error that arrives wrapped (for
		// example inside an *fs.PathError), which a plain == would miss.
		if errors.Is(err, os.ErrNotExist) {
			level.Info(t.logger).Log("msg", "skipping update of position for a file which does not currently exist", "path", t.path)
			return nil
		}
		return err
	}
	t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))

	pos, err := t.tail.Tell()
	if err != nil {
		return err
	}
	t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos))
	t.positions.Put(t.path, t.labels, pos)

	return nil
}
223
224
// Stop shuts the tailer down: it ends the position goroutine, records the
// final position, stops the underlying tail, waits for readLines to drain and
// exit, and finally stops the entry handler. Errors along the way are logged,
// not returned.
func (t *tailer) Stop() {
	// Stop may be invoked from two separate threads in filetarget; sync.Once
	// keeps the channels from being closed more than once.
	t.stopOnce.Do(func() {
		// Ask the position marker goroutine to exit and wait until it has.
		close(t.posquit)
		<-t.posdone

		// Persist the latest offset before the tail goes away.
		if markErr := t.MarkPositionAndSize(); markErr != nil {
			level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", markErr)
		}

		// Stop the underlying tailer.
		if stopErr := t.tail.Stop(); stopErr != nil {
			level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", stopErr)
		}

		// readLines consumes whatever is still queued and returns once the
		// lines channel is closed.
		<-t.done
		level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
		t.handler.Stop()
	})
}
249
250
// IsRunning reports whether the readLines goroutine is currently active.
func (t *tailer) IsRunning() bool {
	active := t.running.Load()
	return active
}
253
254
// convertToUTF8 runs text through the tailer's decoder and returns the
// result. On failure it returns an empty string and an error wrapping the
// cause. The decoder must be non-nil.
func (t *tailer) convertToUTF8(text string) (string, error) {
	decoded, _, err := transform.String(t.decoder, text)
	if err == nil {
		return decoded, nil
	}
	return "", fmt.Errorf("failed to decode text to UTF8: %w", err)
}
262
263
// cleanupMetrics removes all metrics exported by this tailer
264
func (t *tailer) cleanupMetrics() {
265
// When we stop tailing the file, also un-export metrics related to the file
266
t.metrics.filesActive.Add(-1.)
267
t.metrics.readLines.DeleteLabelValues(t.path)
268
t.metrics.readBytes.DeleteLabelValues(t.path)
269
t.metrics.totalBytes.DeleteLabelValues(t.path)
270
}
271
272
// Path returns the path of the file this tailer was created for.
func (t *tailer) Path() string {
	path := t.path
	return path
}
275
276