Path: blob/main/component/loki/source/file/decompresser.go
package file

// This code is copied from Promtail. decompressor implements the reader
// interface and is used to read compressed log files. It uses the Go stdlib's
// compress/* packages for decoding.

import (
	"bufio"
	"compress/bzip2"
	"compress/gzip"
	"compress/zlib"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
	"unsafe"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	"go.uber.org/atomic"
	"golang.org/x/text/encoding"
	"golang.org/x/text/encoding/ianaindex"
	"golang.org/x/text/transform"
)

func supportedCompressedFormats() map[string]struct{} {
	return map[string]struct{}{
		".gz":     {},
		".tar.gz": {},
		".z":      {},
		".bz2":    {},
		// TODO: add support for .zip extension.
	}
}

type decompressor struct {
	metrics   *metrics
	logger    log.Logger
	handler   loki.EntryHandler
	positions positions.Positions

	path   string
	labels string

	posAndSizeMtx sync.Mutex
	stopOnce      sync.Once

	running *atomic.Bool
	posquit chan struct{}
	posdone chan struct{}
	done    chan struct{}

	decoder *encoding.Decoder

	position int64
	size     int64
}

func newDecompressor(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string, labels string, encodingFormat string) (*decompressor, error) {
	logger = log.With(logger, "component", "decompressor")

	pos, err := positions.Get(path, labels)
	if err != nil {
		return nil, fmt.Errorf("failed to get positions: %w", err)
	}

	var decoder *encoding.Decoder
	if encodingFormat != "" {
		level.Info(logger).Log("msg", "decompressor will decode messages", "from", encodingFormat, "to", "UTF8")
		encoder, err := ianaindex.IANA.Encoding(encodingFormat)
		if err != nil {
			return nil, fmt.Errorf("failed to get IANA encoding %s: %w", encodingFormat, err)
		}
		decoder = encoder.NewDecoder()
	}

	decompressor := &decompressor{
		metrics:   metrics,
		logger:    logger,
		handler:   loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler),
		positions: positions,
		path:      path,
		labels:    labels,
		running:   atomic.NewBool(false),
		posquit:   make(chan struct{}),
		posdone:   make(chan struct{}),
		done:      make(chan struct{}),
		position:  pos,
		decoder:   decoder,
	}

	go decompressor.readLines()
	go decompressor.updatePosition()
	metrics.filesActive.Add(1.)
	return decompressor, nil
}

// mountReader instantiates a reader ready to be used by the decompressor.
//
// The selected reader implementation is based on the extension of the given file name.
// It errors if the extension isn't supported.
func mountReader(f *os.File, logger log.Logger) (reader io.Reader, err error) {
	ext := filepath.Ext(f.Name())
	var decompressLib string

	if strings.Contains(ext, "gz") { // .gz, .tar.gz
		decompressLib = "compress/gzip"
		reader, err = gzip.NewReader(f)
	} else if ext == ".z" {
		decompressLib = "compress/zlib"
		reader, err = zlib.NewReader(f)
	} else if ext == ".bz2" {
		decompressLib = "bzip2"
		reader = bzip2.NewReader(f)
	}
	// TODO: add support for .zip extension.

	level.Debug(logger).Log("msg", fmt.Sprintf("using %q to decompress file %q", decompressLib, f.Name()))

	if reader != nil {
		return reader, nil
	}

	if err != nil && err != io.EOF {
		return nil, err
	}

	supportedExtsList := strings.Builder{}
	for ext := range supportedCompressedFormats() {
		supportedExtsList.WriteString(ext)
	}
	return nil, fmt.Errorf("file %q has unsupported extension, it has to be one of %q", f.Name(), supportedExtsList.String())
}
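// exampleMountReader is an illustrative sketch (not part of the original
// Promtail code) of how mountReader is meant to be used: open the compressed
// file, let its extension pick the decompression reader, then scan the
// decompressed stream. The path and the error handling here are hypothetical.
func exampleMountReader(logger log.Logger) error {
	f, err := os.Open("/var/log/app.log.gz") // hypothetical ".gz" file
	if err != nil {
		return err
	}
	defer f.Close()

	r, err := mountReader(f, logger) // selects compress/gzip from the ".gz" extension
	if err != nil {
		return err
	}

	scanner := bufio.NewScanner(r) // iterate over decompressed lines, as readLines does
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	return scanner.Err()
}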
func (d *decompressor) updatePosition() {
	positionSyncPeriod := d.positions.SyncPeriod()
	positionWait := time.NewTicker(positionSyncPeriod)
	defer func() {
		positionWait.Stop()
		level.Info(d.logger).Log("msg", "position timer: exited", "path", d.path)
		close(d.posdone)
	}()

	for {
		select {
		case <-positionWait.C:
			if err := d.MarkPositionAndSize(); err != nil {
				level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
				return
			}
		case <-d.posquit:
			return
		}
	}
}

// readLines reads all existing lines of the given compressed file.
//
// It first decompresses the file as a whole using a reader and then iterates
// over its chunks, separated by '\n'.
// During each iteration, the parsed and decoded log line is sent to the API with the current timestamp.
func (d *decompressor) readLines() {
	level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
	d.running.Store(true)

	defer func() {
		d.cleanupMetrics()
		level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
		close(d.done)
	}()
	entries := d.handler.Chan()

	f, err := os.Open(d.path)
	if err != nil {
		level.Error(d.logger).Log("msg", "error reading file", "path", d.path, "error", err)
		return
	}
	defer f.Close()

	r, err := mountReader(f, d.logger)
	if err != nil {
		level.Error(d.logger).Log("msg", "error mounting new reader", "err", err)
		return
	}

	level.Info(d.logger).Log("msg", "successfully mounted reader", "path", d.path, "ext", filepath.Ext(d.path))

	bufferSize := 4096
	buffer := make([]byte, bufferSize)
	maxLoglineSize := 2000000 // 2 MB
	scanner := bufio.NewScanner(r)
	scanner.Buffer(buffer, maxLoglineSize)
	for line := 1; ; line++ {
		if !scanner.Scan() {
			break
		}

		if scannerErr := scanner.Err(); scannerErr != nil {
			if scannerErr != io.EOF {
				level.Error(d.logger).Log("msg", "error scanning", "err", scannerErr)
			}

			break
		}

		if line <= int(d.position) {
			// skip already seen lines.
			continue
		}

		text := scanner.Text()
		var finalText string
		if d.decoder != nil {
			var err error
			finalText, err = d.convertToUTF8(text)
			if err != nil {
				level.Debug(d.logger).Log("msg", "failed to convert encoding", "error", err)
				d.metrics.encodingFailures.WithLabelValues(d.path).Inc()
				finalText = fmt.Sprintf("the requested encoding conversion for this line failed in Grafana Agent: %s", err.Error())
			}
		} else {
			finalText = text
		}

		d.metrics.readLines.WithLabelValues(d.path).Inc()

		entries <- loki.Entry{
			Labels: model.LabelSet{},
			Entry: logproto.Entry{
				Timestamp: time.Now(),
				Line:      finalText,
			},
		}

		// NOTE: unsafe.Sizeof returns the size of the string header here,
		// not the number of bytes in the line itself.
		d.size = int64(unsafe.Sizeof(finalText))
		d.position++
	}
}

func (d *decompressor) MarkPositionAndSize() error {
	// Lock this update as there are 2 timers calling this routine, the sync in filetarget
	// and the positions sync in this file.
	d.posAndSizeMtx.Lock()
	defer d.posAndSizeMtx.Unlock()

	d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size))
	d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position))
	d.positions.Put(d.path, d.labels, d.position)

	return nil
}
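// examplePositionRoundTrip is an illustrative sketch (not part of the original
// Promtail code) of the bookkeeping above: unlike a byte-offset tailer, the
// decompressor persists the number of lines already shipped, so a restarted
// decompressor skips that many lines in readLines. The path, labels, and the
// value 42 are hypothetical.
func examplePositionRoundTrip(p positions.Positions, path, labels string) (int64, error) {
	p.Put(path, labels, 42)    // pretend 42 lines were already shipped
	return p.Get(path, labels) // a new decompressor would resume at line 43
}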
func (d *decompressor) Stop() {
	// Stop can be called by two separate threads in filetarget; to avoid a panic from
	// closing channels more than once, we wrap the stop in a sync.Once.
	d.stopOnce.Do(func() {
		// Shut down the position marker thread.
		close(d.posquit)
		<-d.posdone

		// Save the current position before shutting down the reader.
		if err := d.MarkPositionAndSize(); err != nil {
			level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err)
		}

		// Wait for readLines() to consume all the remaining messages and exit when the channel is closed.
		<-d.done
		level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
		d.handler.Stop()
	})
}

func (d *decompressor) IsRunning() bool {
	return d.running.Load()
}

func (d *decompressor) convertToUTF8(text string) (string, error) {
	res, _, err := transform.String(d.decoder, text)
	if err != nil {
		return "", fmt.Errorf("failed to decode text to UTF8: %w", err)
	}

	return res, nil
}

// cleanupMetrics removes all metrics exported by this reader.
func (d *decompressor) cleanupMetrics() {
	// When we stop tailing the file, un-export metrics related to the file.
	d.metrics.filesActive.Add(-1.)
	d.metrics.readLines.DeleteLabelValues(d.path)
	d.metrics.readBytes.DeleteLabelValues(d.path)
	d.metrics.totalBytes.DeleteLabelValues(d.path)
}

func (d *decompressor) Path() string {
	return d.path
}

func isCompressed(p string) bool {
	ext := filepath.Ext(p)

	for format := range supportedCompressedFormats() {
		if ext == format {
			return true
		}
	}

	return false
}
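// exampleIsCompressed is an illustrative sketch (not part of the original
// Promtail code) of isCompressed on a few hypothetical paths. Note that
// filepath.Ext returns only the final extension, so "app.tar.gz" matches via
// the ".gz" entry rather than the ".tar.gz" one in supportedCompressedFormats.
func exampleIsCompressed() {
	for _, p := range []string{"app.log.gz", "app.tar.gz", "app.z", "app.bz2", "app.log"} {
		fmt.Printf("%s -> %t\n", p, isCompressed(p)) // only "app.log" prints false
	}
}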