package file
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/common/loki/positions"
"github.com/grafana/agent/component/discovery"
"github.com/prometheus/common/model"
)
// init registers the loki.source.file component, using Arguments as its
// argument type and New as its constructor.
func init() {
	// build adapts New to the generic constructor signature expected by the
	// registration; the arguments are asserted to this package's Arguments.
	build := func(opts component.Options, args component.Arguments) (component.Component, error) {
		return New(opts, args.(Arguments))
	}

	component.Register(component.Registration{
		Name:  "loki.source.file",
		Args:  Arguments{},
		Build: build,
	})
}
// Label names used by this component.
const (
// pathLabel is the target label whose value Update uses as the path of the
// file to read.
pathLabel = "__path__"
// filenameLabel is not referenced in the visible part of this file.
// NOTE(review): presumably the label under which the file path is attached
// to log entries by the readers; confirm against the tailer/decompressor.
filenameLabel = "filename"
)
// Arguments holds the configuration of the loki.source.file component.
type Arguments struct {
// Targets lists the targets to read from. The file path of each target is
// taken from its "__path__" label.
Targets []discovery.Target `river:"targets,attr"`
// ForwardTo lists the receivers that every log entry is sent to.
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
}
// Compile-time assertion that *Component satisfies component.Component.
var (
_ component.Component = (*Component)(nil)
)
// Component implements the loki.source.file component. It runs one reader per
// configured target and forwards the entries they produce to the configured
// receivers.
type Component struct {
// opts are the options the component was created with (logger, data path,
// metrics registerer).
opts component.Options
// metrics holds the metrics of this component.
metrics *metrics
// updateMut serializes calls to Update.
updateMut sync.Mutex
// mut guards args, receivers and readers.
mut sync.RWMutex
// args is the most recently applied set of arguments.
args Arguments
// handler is the channel handed to every reader's entry handler; Run
// receives from it and fans entries out to receivers.
handler loki.LogsReceiver
// receivers are the destinations every entry is forwarded to.
receivers []loki.LogsReceiver
// posFile is the positions file created in New.
posFile positions.Positions
// readers holds the currently active readers, keyed by file path and the
// string form of the target's label set.
readers map[positions.Entry]reader
}
// New creates a loki.source.file component.
//
// It makes sure the component's data directory exists, opens the positions
// file "positions.yml" inside it (synced every 10 seconds), and applies args
// through Update. Any error from these steps is returned and no component is
// created.
func New(o component.Options, args Arguments) (*Component, error) {
	// The positions file lives in the data directory, so create it first.
	if err := os.MkdirAll(o.DataPath, 0750); err != nil && !os.IsExist(err) {
		return nil, err
	}

	posCfg := positions.Config{
		SyncPeriod:        10 * time.Second,
		PositionsFile:     filepath.Join(o.DataPath, "positions.yml"),
		IgnoreInvalidYaml: false,
		ReadOnly:          false,
	}
	posFile, err := positions.New(o.Logger, posCfg)
	if err != nil {
		return nil, err
	}

	c := &Component{
		opts:      o,
		metrics:   newMetrics(o.Registerer),
		handler:   make(loki.LogsReceiver),
		receivers: args.ForwardTo,
		posFile:   posFile,
		readers:   make(map[positions.Entry]reader),
	}

	// Start readers for the initial set of targets.
	if err := c.Update(args); err != nil {
		return nil, err
	}
	return c, nil
}
// Run implements component.Component. It receives entries from c.handler and
// sends each one to every configured receiver until ctx is cancelled, at which
// point it returns nil.
//
// On return, all readers and the positions file are stopped and c.handler is
// closed.
func (c *Component) Run(ctx context.Context) error {
// Shutdown sequence: stop the readers first, then the positions file, then
// close the shared handler channel. All of it happens under the read lock.
defer func() {
level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file")
c.mut.RLock()
for _, r := range c.readers {
r.Stop()
}
c.posFile.Stop()
close(c.handler)
c.mut.RUnlock()
}()
for {
select {
case <-ctx.Done():
return nil
case entry := <-c.handler:
// The read lock keeps c.receivers stable while fanning out. Each send
// is a plain blocking channel send, so a receiver that is not being
// drained holds this read lock and delays Update, which needs the
// write lock.
c.mut.RLock()
for _, receiver := range c.receivers {
receiver <- entry
}
c.mut.RUnlock()
}
}
}
// Update implements component.Component. It stops every running reader,
// stores the new arguments and starts one reader per distinct target.
//
// Targets are de-duplicated on their path plus the string form of their public
// label set; labels whose name starts with model.ReservedLabelPrefix are not
// part of that set. A target whose reader cannot be started is skipped
// (startTailing logs the reason). Positions-file entries of readers that
// existed before the update but not after it are removed.
//
// When the new arguments contain no targets the method returns right after
// swapping the arguments, so positions of the previous readers are kept.
//
// Update always returns nil.
func (c *Component) Update(args component.Arguments) error {
	c.updateMut.Lock()
	defer c.updateMut.Unlock()

	// Readers are stopped before c.mut is write-locked. stopReaders only takes
	// the read lock, so Run can still acquire its own read lock and keep
	// receiving from c.handler while the readers shut down.
	oldPaths := c.stopReaders()

	newArgs := args.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()
	c.args = newArgs
	c.receivers = newArgs.ForwardTo
	c.readers = make(map[positions.Entry]reader)

	if len(newArgs.Targets) == 0 {
		level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed")
		return nil
	}

	for _, target := range newArgs.Targets {
		path := target[pathLabel]

		// Keep only the public labels of the target.
		labels := make(model.LabelSet)
		for k, v := range target {
			if strings.HasPrefix(k, model.ReservedLabelPrefix) {
				continue
			}
			labels[model.LabelName(k)] = model.LabelValue(v)
		}
		labelsStr := labels.String()

		// Skip targets that resolve to an already started (path, labels) pair.
		readersKey := positions.Entry{Path: path, Labels: labelsStr}
		if _, exist := c.readers[readersKey]; exist {
			continue
		}

		c.reportSize(path, labelsStr)

		handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler, func() {}))
		r, err := c.startTailing(path, labels, handler)
		if err != nil {
			// No reader took ownership of the wrapped handler, so stop it here,
			// the same way readerWithHandler.Stop does for started readers.
			// Otherwise whatever the middleware holds for this target is never
			// released.
			// NOTE(review): AddLabelsMiddleware presumably runs a goroutine per
			// wrapped handler; confirm against the loki package.
			handler.Stop()
			continue
		}

		c.readers[readersKey] = readerWithHandler{
			reader:  r,
			handler: handler,
		}
	}

	// Forget positions of files that were read before this update but are no
	// longer part of the target set.
	for r := range missing(c.readers, oldPaths) {
		c.posFile.Remove(r.Path, r.Labels)
	}

	return nil
}
// readerWithHandler pairs a reader with the entry handler that was created for
// it, so that both can be stopped together.
type readerWithHandler struct {
reader
// handler is the entry handler the embedded reader was started with.
handler loki.EntryHandler
}
// Stop stops the embedded reader first and then the handler it was started
// with.
func (rh readerWithHandler) Stop() {
	// The embedded Stop must be called explicitly; rh.Stop() would recurse.
	rh.reader.Stop()
	rh.handler.Stop()
}
// stopReaders stops every reader currently held by the component and returns
// the set of keys (path + labels) of the readers it stopped. It holds the read
// lock for the duration of the call and leaves c.readers untouched.
func (c *Component) stopReaders() map[positions.Entry]struct{} {
	c.mut.RLock()
	defer c.mut.RUnlock()

	stopped := make(map[positions.Entry]struct{}, len(c.readers))
	for key, rd := range c.readers {
		rd.Stop()
		stopped[key] = struct{}{}
	}
	return stopped
}
// DebugInfo returns a readerDebugInfo describing every current reader: its
// path, its labels, whether it is running, and the offset recorded for it in
// the positions file.
//
// The read lock is held while c.readers is iterated, because Update replaces
// and fills that map under the write lock and may run concurrently.
func (c *Component) DebugInfo() interface{} {
	c.mut.RLock()
	defer c.mut.RUnlock()

	var res readerDebugInfo
	for e, reader := range c.readers {
		// A failed lookup is deliberately ignored: the offset is reported as
		// returned by Get rather than failing the whole debug report.
		offset, _ := c.posFile.Get(e.Path, e.Labels)
		res.TargetsInfo = append(res.TargetsInfo, targetInfo{
			Path:       e.Path,
			Labels:     e.Labels,
			IsRunning:  reader.IsRunning(),
			ReadOffset: offset,
		})
	}
	return res
}
// readerDebugInfo is the value returned by Component.DebugInfo.
type readerDebugInfo struct {
// TargetsInfo holds one entry per reader held by the component.
TargetsInfo []targetInfo `river:"targets_info,block"`
}
// targetInfo describes a single reader in the component's debug information.
type targetInfo struct {
// Path is the path of the file the reader was created for.
Path string `river:"path,attr"`
// Labels is the string form of the label set the reader was created with.
Labels string `river:"labels,attr"`
// IsRunning is the value reported by the reader's IsRunning method.
IsRunning bool `river:"is_running,attr"`
// ReadOffset is the offset stored in the positions file for this path and
// label set.
ReadOffset int64 `river:"read_offset,attr"`
}
// missing returns the keys of bs that have no entry in as. The result is
// always a non-nil map, possibly empty.
func missing(as map[positions.Entry]reader, bs map[positions.Entry]struct{}) map[positions.Entry]struct{} {
	absent := make(map[positions.Entry]struct{})
	for entry := range bs {
		if _, present := as[entry]; present {
			continue
		}
		absent[entry] = struct{}{}
	}
	return absent
}
// startTailing creates a reader for the file at path that hands its entries to
// handler.
//
// The path must exist and must not be a directory; otherwise the totalBytes
// metric for the path is deleted and an error is returned. Files for which
// isCompressed reports true are read with a decompressor, all others with a
// tailer.
//
// Every failure is logged here. The returned error wraps the underlying cause
// where there is one, so callers can inspect it with errors.Is / errors.As.
func (c *Component) startTailing(path string, labels model.LabelSet, handler loki.EntryHandler) (reader, error) {
	fi, err := os.Stat(path)
	if err != nil {
		level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", path)
		c.metrics.totalBytes.DeleteLabelValues(path)
		return nil, fmt.Errorf("failed to stat path %s: %w", path, err)
	}

	if fi.IsDir() {
		level.Info(c.opts.Logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", path)
		c.metrics.totalBytes.DeleteLabelValues(path)
		return nil, fmt.Errorf("failed to tail file, it was a directory %s", path)
	}

	if isCompressed(path) {
		level.Debug(c.opts.Logger).Log("msg", "reading from compressed file", "filename", path)
		decompressor, err := newDecompressor(
			c.metrics,
			c.opts.Logger,
			handler,
			c.posFile,
			path,
			labels.String(),
			"",
		)
		if err != nil {
			level.Error(c.opts.Logger).Log("msg", "failed to start decompressor", "error", err, "filename", path)
			return nil, fmt.Errorf("failed to start decompressor %w", err)
		}
		return decompressor, nil
	}

	level.Debug(c.opts.Logger).Log("msg", "tailing new file", "filename", path)
	tailer, err := newTailer(
		c.metrics,
		c.opts.Logger,
		handler,
		c.posFile,
		path,
		labels.String(),
		"",
	)
	if err != nil {
		level.Error(c.opts.Logger).Log("msg", "failed to start tailer", "error", err, "filename", path)
		return nil, fmt.Errorf("failed to start tailer %w", err)
	}
	return tailer, nil
}
// reportSize refreshes the size information for the file at path.
//
// If a reader is registered for (path, labels) it is asked to mark its
// position and size; a failure is logged as a warning. Otherwise the file is
// stat'ed and its size is written to the totalBytes metric; a stat failure is
// ignored silently.
//
// c.readers is read without locking here; Update calls this method while
// holding c.mut.
func (c *Component) reportSize(path, labels string) {
	key := positions.Entry{Path: path, Labels: labels}

	existing, found := c.readers[key]
	if !found {
		// No reader for this file: read the size straight from the file system.
		info, statErr := os.Stat(path)
		if statErr != nil {
			return
		}
		c.metrics.totalBytes.WithLabelValues(path).Set(float64(info.Size()))
		return
	}

	if err := existing.MarkPositionAndSize(); err != nil {
		level.Warn(c.opts.Logger).Log("msg", "failed to get file size from existing reader, ", "file", path, "error", err)
	}
}