Path: blob/main/component/common/loki/positions/positions.go
4096 views
package positions12// This code is copied from Promtail. The positions package allows logging3// components to keep track of read file offsets on disk and continue from the4// same place in case of a restart.56import (7"flag"8"fmt"9"os"10"path/filepath"11"strconv"12"strings"13"sync"14"time"1516"github.com/go-kit/log"17"github.com/go-kit/log/level"18yaml "gopkg.in/yaml.v2"19)2021const (22positionFileMode = 060023cursorKeyPrefix = "cursor-"24journalKeyPrefix = "journal-"25)2627// Config describes where to get position information from.28type Config struct {29SyncPeriod time.Duration `mapstructure:"sync_period" yaml:"sync_period"`30PositionsFile string `mapstructure:"filename" yaml:"filename"`31IgnoreInvalidYaml bool `mapstructure:"ignore_invalid_yaml" yaml:"ignore_invalid_yaml"`32ReadOnly bool `mapstructure:"-" yaml:"-"`33}3435// RegisterFlagsWithPrefix registers flags where every name is prefixed by36// prefix. If prefix is a non-empty string, prefix should end with a period.37func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {38f.DurationVar(&cfg.SyncPeriod, prefix+"positions.sync-period", 10*time.Second, "Period with this to sync the position file.")39f.StringVar(&cfg.PositionsFile, prefix+"positions.file", "/var/log/positions.yaml", "Location to read/write positions from.")40f.BoolVar(&cfg.IgnoreInvalidYaml, prefix+"positions.ignore-invalid-yaml", false, "whether to ignore & later overwrite positions files that are corrupted")41}4243// RegisterFlags register flags.44func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {45cfg.RegisterFlagsWithPrefix("", flags)46}4748// Positions tracks how far through each file we've read.49type positions struct {50logger log.Logger51cfg Config52mtx sync.Mutex53positions map[Entry]string54quit chan struct{}55done chan struct{}56}5758// Entry describes a positions file entry consisting of an absolute file path and59// the matching label set.60// An entry expects the string representation of a LabelSet or a Labels slice61// so that it can be utilized as a YAML key. The caller should make sure that62// the order and structure of the passed string representation is reproducible,63// and maintains the same format for both reading and writing from/to the64// positions file.65type Entry struct {66Path string `yaml:"path"`67Labels string `yaml:"labels"`68}6970// File format for the positions data.71type File struct {72Positions map[Entry]string `yaml:"positions"`73}7475type Positions interface {76// GetString returns how far we've through a file as a string.77// JournalTarget writes a journal cursor to the positions file, while78// FileTarget writes an integer offset. Use Get to read the integer79// offset.80GetString(path, labels string) string81// Get returns how far we've read through a file. Returns an error82// if the value stored for the file is not an integer.83Get(path, labels string) (int64, error)84// PutString records (asynchronously) how far we've read through a file.85// Unlike Put, it records a string offset and is only useful for86// JournalTargets which doesn't have integer offsets.87PutString(path, labels string, pos string)88// Put records (asynchronously) how far we've read through a file.89Put(path, labels string, pos int64)90// Remove removes the position tracking for a filepath91Remove(path, labels string)92// SyncPeriod returns how often the positions file gets resynced93SyncPeriod() time.Duration94// Stop the Position tracker.95Stop()96}9798// New makes a new Positions.99func New(logger log.Logger, cfg Config) (Positions, error) {100positionData, err := readPositionsFile(cfg, logger)101if err != nil {102return nil, err103}104105p := &positions{106logger: logger,107cfg: cfg,108positions: positionData,109quit: make(chan struct{}),110done: make(chan struct{}),111}112113go p.run()114return p, nil115}116117func (p *positions) Stop() {118close(p.quit)119<-p.done120}121122func (p *positions) PutString(path, labels string, pos string) {123p.mtx.Lock()124defer p.mtx.Unlock()125p.positions[Entry{path, labels}] = pos126}127128func (p *positions) Put(path, labels string, pos int64) {129p.PutString(path, labels, strconv.FormatInt(pos, 10))130}131132func (p *positions) GetString(path, labels string) string {133p.mtx.Lock()134defer p.mtx.Unlock()135return p.positions[Entry{path, labels}]136}137138func (p *positions) Get(path, labels string) (int64, error) {139p.mtx.Lock()140defer p.mtx.Unlock()141pos, ok := p.positions[Entry{path, labels}]142if !ok {143return 0, nil144}145return strconv.ParseInt(pos, 10, 64)146}147148func (p *positions) Remove(path, labels string) {149p.mtx.Lock()150defer p.mtx.Unlock()151p.remove(path, labels)152}153154func (p *positions) remove(path, labels string) {155delete(p.positions, Entry{path, labels})156}157158func (p *positions) SyncPeriod() time.Duration {159return p.cfg.SyncPeriod160}161162func (p *positions) run() {163defer func() {164p.save()165level.Debug(p.logger).Log("msg", "positions saved")166close(p.done)167}()168169ticker := time.NewTicker(p.cfg.SyncPeriod)170for {171select {172case <-p.quit:173return174case <-ticker.C:175p.save()176p.cleanup()177}178}179}180181func (p *positions) save() {182if p.cfg.ReadOnly {183return184}185p.mtx.Lock()186positions := make(map[Entry]string, len(p.positions))187for k, v := range p.positions {188positions[k] = v189}190p.mtx.Unlock()191192if err := writePositionFile(p.cfg.PositionsFile, positions); err != nil {193level.Error(p.logger).Log("msg", "error writing positions file", "error", err)194}195}196197// CursorKey returns a key that can be saved as a cursor that is never deleted.198func CursorKey(key string) string {199return fmt.Sprintf("%s%s", cursorKeyPrefix, key)200}201202func (p *positions) cleanup() {203p.mtx.Lock()204defer p.mtx.Unlock()205toRemove := []Entry{}206for k := range p.positions {207// If the position file is prefixed with cursor, it's a208// cursor and not a file on disk.209// We still have to support journal files, so we keep the previous check to avoid breaking change.210if strings.HasPrefix(k.Path, cursorKeyPrefix) || strings.HasPrefix(k.Path, journalKeyPrefix) {211continue212}213214if _, err := os.Stat(k.Path); err != nil {215if os.IsNotExist(err) {216// File no longer exists.217toRemove = append(toRemove, k)218} else {219// Can't determine if file exists or not, some other error.220level.Warn(p.logger).Log("msg", "could not determine if log file "+221"still exists while cleaning positions file", "error", err)222}223}224}225for _, tr := range toRemove {226p.remove(tr.Path, tr.Labels)227}228}229230func readPositionsFile(cfg Config, logger log.Logger) (map[Entry]string, error) {231cleanfn := filepath.Clean(cfg.PositionsFile)232buf, err := os.ReadFile(cleanfn)233if err != nil {234if os.IsNotExist(err) {235return map[Entry]string{}, nil236}237return nil, err238}239240var p File241err = yaml.UnmarshalStrict(buf, &p)242if err != nil {243// return empty if cfg option enabled244if cfg.IgnoreInvalidYaml {245level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)246return map[Entry]string{}, nil247}248249return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", cleanfn, err)250}251252// p.Positions will be nil if the file exists but is empty253if p.Positions == nil {254p.Positions = map[Entry]string{}255}256257return p.Positions, nil258}259260261