Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/file/decompresser.go
4096 views
1
package file
2
3
// This code is copied from Promtail. decompressor implements the reader
4
// interface and is used to read compressed log files. It uses the Go stdlib's
5
// compress/* packages for decoding.
6
7
import (
8
"bufio"
9
"compress/bzip2"
10
"compress/gzip"
11
"compress/zlib"
12
"fmt"
13
"io"
14
"os"
15
"path/filepath"
16
"strings"
17
"sync"
18
"time"
19
"unsafe"
20
21
"github.com/go-kit/log"
22
"github.com/go-kit/log/level"
23
"github.com/grafana/agent/component/common/loki"
24
"github.com/grafana/agent/component/common/loki/positions"
25
"github.com/grafana/loki/pkg/logproto"
26
"github.com/prometheus/common/model"
27
"go.uber.org/atomic"
28
"golang.org/x/text/encoding"
29
"golang.org/x/text/encoding/ianaindex"
30
"golang.org/x/text/transform"
31
)
32
33
// supportedCompressedFormats returns the set of file extensions the
// decompressor is able to read, keyed for O(1) membership checks.
func supportedCompressedFormats() map[string]struct{} {
	// TODO: add support for .zip extension.
	known := []string{".gz", ".tar.gz", ".z", ".bz2"}
	formats := make(map[string]struct{}, len(known))
	for _, ext := range known {
		formats[ext] = struct{}{}
	}
	return formats
}
42
43
// decompressor tails a compressed log file: it decompresses the file with a
// stdlib reader, forwards each line to the entry handler, and persists how
// many lines have already been forwarded so a restart does not re-send them.
type decompressor struct {
	metrics   *metrics
	logger    log.Logger
	handler   loki.EntryHandler
	positions positions.Positions

	path   string // path of the compressed file being read
	labels string // serialized label set, used as part of the positions key

	posAndSizeMtx sync.Mutex // serializes MarkPositionAndSize callers (two timers call it)
	stopOnce      sync.Once  // makes Stop safe to call from multiple threads

	running *atomic.Bool
	posquit chan struct{} // closed by Stop to terminate updatePosition
	posdone chan struct{} // closed by updatePosition when it exits
	done    chan struct{} // closed by readLines when it exits

	decoder *encoding.Decoder // optional charset decoder; nil means lines are forwarded as-is

	position int64 // number of lines already forwarded (a line count, not a byte offset)
	size     int64 // value exported through the totalBytes metric
}
65
66
func newDecompressor(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, labels string, encodingFormat string) (*decompressor, error) {
67
logger = log.With(logger, "component", "decompressor")
68
69
pos, err := positions.Get(path, labels)
70
if err != nil {
71
return nil, fmt.Errorf("failed to get positions: %w", err)
72
}
73
74
var decoder *encoding.Decoder
75
if encodingFormat != "" {
76
level.Info(logger).Log("msg", "decompressor will decode messages", "from", encodingFormat, "to", "UTF8")
77
encoder, err := ianaindex.IANA.Encoding(encodingFormat)
78
if err != nil {
79
return nil, fmt.Errorf("failed to get IANA encoding %s: %w", encodingFormat, err)
80
}
81
decoder = encoder.NewDecoder()
82
}
83
84
decompressor := &decompressor{
85
metrics: metrics,
86
logger: logger,
87
handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler),
88
positions: positions,
89
path: path,
90
labels: labels,
91
running: atomic.NewBool(false),
92
posquit: make(chan struct{}),
93
posdone: make(chan struct{}),
94
done: make(chan struct{}),
95
position: pos,
96
decoder: decoder,
97
}
98
99
go decompressor.readLines()
100
go decompressor.updatePosition()
101
metrics.filesActive.Add(1.)
102
return decompressor, nil
103
}
104
105
// mountReader instantiate a reader ready to be used by the decompressor.
106
//
107
// The selected reader implementation is based on the extension of the given file name.
108
// It'll error if the extension isn't supported.
109
func mountReader(f *os.File, logger log.Logger) (reader io.Reader, err error) {
110
ext := filepath.Ext(f.Name())
111
var decompressLib string
112
113
if strings.Contains(ext, "gz") { // .gz, .tar.gz
114
decompressLib = "compress/gzip"
115
reader, err = gzip.NewReader(f)
116
} else if ext == ".z" {
117
decompressLib = "compress/zlib"
118
reader, err = zlib.NewReader(f)
119
} else if ext == ".bz2" {
120
decompressLib = "bzip2"
121
reader = bzip2.NewReader(f)
122
}
123
// TODO: add support for .zip extension.
124
125
level.Debug(logger).Log("msg", fmt.Sprintf("using %q to decompress file %q", decompressLib, f.Name()))
126
127
if reader != nil {
128
return reader, nil
129
}
130
131
if err != nil && err != io.EOF {
132
return nil, err
133
}
134
135
supportedExtsList := strings.Builder{}
136
for ext := range supportedCompressedFormats() {
137
supportedExtsList.WriteString(ext)
138
}
139
return nil, fmt.Errorf("file %q has unsupported extension, it has to be one of %q", f.Name(), supportedExtsList.String())
140
}
141
142
func (d *decompressor) updatePosition() {
143
positionSyncPeriod := d.positions.SyncPeriod()
144
positionWait := time.NewTicker(positionSyncPeriod)
145
defer func() {
146
positionWait.Stop()
147
level.Info(d.logger).Log("msg", "position timer: exited", "path", d.path)
148
close(d.posdone)
149
}()
150
151
for {
152
select {
153
case <-positionWait.C:
154
if err := d.MarkPositionAndSize(); err != nil {
155
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
156
return
157
}
158
case <-d.posquit:
159
return
160
}
161
}
162
}
163
164
// readLines read all existing lines of the given compressed file.
//
// It first decompresses the file as a whole using a reader and then it will iterate
// over its chunks, separated by '\n'.
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
func (d *decompressor) readLines() {
	level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
	d.running.Store(true)

	defer func() {
		d.cleanupMetrics()
		level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
		// Signals Stop() that this goroutine has finished forwarding lines.
		close(d.done)
	}()
	entries := d.handler.Chan()

	f, err := os.Open(d.path)
	if err != nil {
		level.Error(d.logger).Log("msg", "error reading file", "path", d.path, "error", err)
		return
	}
	defer f.Close()

	// Select a decompressing reader based on the file extension.
	r, err := mountReader(f, d.logger)
	if err != nil {
		level.Error(d.logger).Log("msg", "error mounting new reader", "err", err)
		return
	}

	level.Info(d.logger).Log("msg", "successfully mounted reader", "path", d.path, "ext", filepath.Ext(d.path))

	bufferSize := 4096
	buffer := make([]byte, bufferSize)
	maxLoglineSize := 2000000 // 2 MB
	scanner := bufio.NewScanner(r)
	scanner.Buffer(buffer, maxLoglineSize)
	for line := 1; ; line++ {
		if !scanner.Scan() {
			break
		}

		// NOTE(review): Err() is consulted right after a *successful* Scan(),
		// where it is always nil; errors that terminate the scan (e.g. a line
		// exceeding maxLoglineSize) only surface once Scan() returns false,
		// so they are silently dropped by the break above. Consider checking
		// scanner.Err() after the loop instead.
		if scannerErr := scanner.Err(); scannerErr != nil {
			if scannerErr != io.EOF {
				level.Error(d.logger).Log("msg", "error scanning", "err", scannerErr)
			}

			break
		}

		// For compressed files the persisted position is a line count, so on
		// restart the file is re-read and already-forwarded lines are skipped.
		if line <= int(d.position) {
			// skip already seen lines.
			continue
		}

		text := scanner.Text()
		var finalText string
		if d.decoder != nil {
			var err error
			finalText, err = d.convertToUTF8(text)
			if err != nil {
				// Forward a placeholder line rather than dropping it, so the
				// failure is visible downstream.
				level.Debug(d.logger).Log("msg", "failed to convert encoding", "error", err)
				d.metrics.encodingFailures.WithLabelValues(d.path).Inc()
				finalText = fmt.Sprintf("the requested encoding conversion for this line failed in Grafana Agent: %s", err.Error())
			}
		} else {
			finalText = text
		}

		d.metrics.readLines.WithLabelValues(d.path).Inc()

		entries <- loki.Entry{
			Labels: model.LabelSet{},
			Entry: logproto.Entry{
				Timestamp: time.Now(),
				Line:      finalText,
			},
		}

		// NOTE(review): unsafe.Sizeof(finalText) is the size of the string
		// *header* (16 bytes on 64-bit), not the number of bytes in the line,
		// so the totalBytes metric fed from d.size is effectively a constant.
		// Presumably this was meant to accumulate len(finalText) — TODO
		// confirm against the Promtail original before changing.
		d.size = int64(unsafe.Sizeof(finalText))
		d.position++
	}
}
246
247
// MarkPositionAndSize persists the current read position and refreshes the
// byte-count gauges for this file.
func (d *decompressor) MarkPositionAndSize() error {
	// Two timers call this routine — the sync in filetarget and the positions
	// sync in this file — so serialize the update.
	d.posAndSizeMtx.Lock()
	defer d.posAndSizeMtx.Unlock()

	d.positions.Put(d.path, d.labels, d.position)
	d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position))
	d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size))

	return nil
}
258
259
// Stop shuts the decompressor down: it terminates the position-sync goroutine,
// persists the final position, waits for readLines to drain, then stops the
// entry handler. Safe to call more than once.
func (d *decompressor) Stop() {
	// stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once
	// we wrap the stop in a sync.Once.
	d.stopOnce.Do(func() {
		// Shut down the position marker thread
		close(d.posquit)
		<-d.posdone

		// Save the current position before shutting down reader
		if err := d.MarkPositionAndSize(); err != nil {
			level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err)
		}

		// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
		<-d.done
		level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
		d.handler.Stop()
	})
}
278
279
// IsRunning reports whether the readLines goroutine has started.
func (d *decompressor) IsRunning() bool {
	return d.running.Load()
}
282
283
// convertToUTF8 decodes text from the configured source charset into UTF-8
// using the decoder set up at construction time.
func (d *decompressor) convertToUTF8(text string) (string, error) {
	decoded, _, err := transform.String(d.decoder, text)
	if err != nil {
		return "", fmt.Errorf("failed to decode text to UTF8: %w", err)
	}
	return decoded, nil
}
291
292
// cleanupMetrics removes all metrics exported by this reader, and decrements
// the active-files gauge incremented by newDecompressor.
func (d *decompressor) cleanupMetrics() {
	// When we stop tailing the file, un-export metrics related to the file.
	d.metrics.filesActive.Add(-1.)
	d.metrics.readLines.DeleteLabelValues(d.path)
	d.metrics.readBytes.DeleteLabelValues(d.path)
	d.metrics.totalBytes.DeleteLabelValues(d.path)
}
300
301
// Path returns the path of the compressed file being read.
func (d *decompressor) Path() string {
	return d.path
}
304
305
func isCompressed(p string) bool {
306
ext := filepath.Ext(p)
307
308
for format := range supportedCompressedFormats() {
309
if ext == format {
310
return true
311
}
312
}
313
314
return false
315
}
316
317