Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/common/loki/positions/positions.go
4096 views
1
package positions
2
3
// This code is copied from Promtail. The positions package allows logging
4
// components to keep track of read file offsets on disk and continue from the
5
// same place in case of a restart.
6
7
import (
8
"flag"
9
"fmt"
10
"os"
11
"path/filepath"
12
"strconv"
13
"strings"
14
"sync"
15
"time"
16
17
"github.com/go-kit/log"
18
"github.com/go-kit/log/level"
19
yaml "gopkg.in/yaml.v2"
20
)
21
22
// Constants shared by the positions tracker.
const (
	// positionFileMode is an owner-only read/write permission mask.
	// NOTE(review): presumably applied when the positions file is written;
	// the writer is not defined in this file, so confirm there.
	positionFileMode = 0o600

	// cursorKeyPrefix marks entries whose Path is a cursor key (see
	// CursorKey) rather than a file on disk; cleanup never removes them.
	cursorKeyPrefix = "cursor-"

	// journalKeyPrefix marks journal entries; cleanup skips these as well so
	// that existing journal positions keep working.
	journalKeyPrefix = "journal-"
)
27
28
// Config holds the settings of a positions tracker: where the positions file
// lives, how often it is synced, and how a corrupt or read-only file is
// treated.
type Config struct {
	// SyncPeriod is the interval between two periodic syncs of the positions
	// file.
	SyncPeriod time.Duration `mapstructure:"sync_period" yaml:"sync_period"`

	// PositionsFile is the location positions are read from and written to.
	PositionsFile string `mapstructure:"filename" yaml:"filename"`

	// IgnoreInvalidYaml makes loading tolerate a positions file that fails
	// to parse: the tracker then starts from an empty set of positions
	// instead of returning an error.
	IgnoreInvalidYaml bool `mapstructure:"ignore_invalid_yaml" yaml:"ignore_invalid_yaml"`

	// ReadOnly disables saving positions to disk. It is excluded from both
	// mapstructure and YAML decoding.
	ReadOnly bool `mapstructure:"-" yaml:"-"`
}
35
36
// RegisterFlagsWithPrefix registers flags where every name is prefixed by
37
// prefix. If prefix is a non-empty string, prefix should end with a period.
38
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
39
f.DurationVar(&cfg.SyncPeriod, prefix+"positions.sync-period", 10*time.Second, "Period with this to sync the position file.")
40
f.StringVar(&cfg.PositionsFile, prefix+"positions.file", "/var/log/positions.yaml", "Location to read/write positions from.")
41
f.BoolVar(&cfg.IgnoreInvalidYaml, prefix+"positions.ignore-invalid-yaml", false, "whether to ignore & later overwrite positions files that are corrupted")
42
}
43
44
// RegisterFlags register flags.
45
func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
46
cfg.RegisterFlagsWithPrefix("", flags)
47
}
48
49
// Positions tracks how far through each file we've read.
50
type positions struct {
51
logger log.Logger
52
cfg Config
53
mtx sync.Mutex
54
positions map[Entry]string
55
quit chan struct{}
56
done chan struct{}
57
}
58
59
// Entry identifies one record of the positions file: an absolute file path
// together with the label set that belongs to it.
//
// Labels must hold the string form of a LabelSet or a Labels slice so the
// entry can serve as a YAML key. Callers are responsible for producing that
// string with a reproducible order and structure, and for using the same
// format when reading from and writing to the positions file.
type Entry struct {
	// Path is the absolute path of the tracked file.
	Path string `yaml:"path"`

	// Labels is the string representation of the matching label set.
	Labels string `yaml:"labels"`
}
70
71
// File format for the positions data.
72
type File struct {
73
Positions map[Entry]string `yaml:"positions"`
74
}
75
76
// Positions is the API of a positions tracker: it stores, returns and forgets
// the read position of a (path, labels) pair.
type Positions interface {
	// Get returns how far we've read through a file as an integer offset.
	// It returns an error if the value stored for the file is not an
	// integer. Use GetString for values that are not integers.
	Get(path, labels string) (int64, error)

	// GetString returns how far we've read through a file as a string.
	// JournalTarget writes a journal cursor to the positions file, while
	// FileTarget writes an integer offset; use Get to read the integer
	// offset.
	GetString(path, labels string) string

	// Put records (asynchronously) how far we've read through a file as an
	// integer offset.
	Put(path, labels string, pos int64)

	// PutString records (asynchronously) how far we've read through a file.
	// Unlike Put, it records a string offset and is only useful for
	// JournalTargets, which do not have integer offsets.
	PutString(path, labels string, pos string)

	// Remove drops the position tracking for a file path.
	Remove(path, labels string)

	// SyncPeriod returns how often the positions file gets resynced.
	SyncPeriod() time.Duration

	// Stop stops the positions tracker.
	Stop()
}
98
99
// New makes a new Positions.
100
func New(logger log.Logger, cfg Config) (Positions, error) {
101
positionData, err := readPositionsFile(cfg, logger)
102
if err != nil {
103
return nil, err
104
}
105
106
p := &positions{
107
logger: logger,
108
cfg: cfg,
109
positions: positionData,
110
quit: make(chan struct{}),
111
done: make(chan struct{}),
112
}
113
114
go p.run()
115
return p, nil
116
}
117
118
// Stop asks the background goroutine to exit and blocks until it has closed
// p.done, which happens after its final save.
//
// Stop is idempotent: quit is closed only once, so a repeated or concurrent
// call no longer panics with "close of closed channel". It simply waits for
// the same shutdown to complete.
func (p *positions) Stop() {
	// mtx makes the check-and-close atomic. It is released before waiting
	// because the goroutine's final save needs the same lock.
	p.mtx.Lock()
	select {
	case <-p.quit:
		// Already closed by an earlier Stop call.
	default:
		close(p.quit)
	}
	p.mtx.Unlock()

	<-p.done
}
122
123
// PutString stores pos as the position of the (path, labels) entry,
// replacing any previous value. Only the in-memory map is updated here.
func (p *positions) PutString(path, labels string, pos string) {
	key := Entry{Path: path, Labels: labels}

	p.mtx.Lock()
	defer p.mtx.Unlock()

	p.positions[key] = pos
}
128
129
// Put stores the integer offset pos for the (path, labels) entry. The offset
// is kept as its base-10 string form.
func (p *positions) Put(path, labels string, pos int64) {
	offset := strconv.FormatInt(pos, 10)
	p.PutString(path, labels, offset)
}
132
133
// GetString returns the stored position of the (path, labels) entry, or the
// empty string when the entry is not tracked.
func (p *positions) GetString(path, labels string) string {
	key := Entry{Path: path, Labels: labels}

	p.mtx.Lock()
	pos := p.positions[key]
	p.mtx.Unlock()

	return pos
}
138
139
// Get returns the stored position of the (path, labels) entry as an integer
// offset. An untracked entry yields 0 and no error; a stored value that is
// not a base-10 int64 yields the strconv.ParseInt error.
func (p *positions) Get(path, labels string) (int64, error) {
	key := Entry{Path: path, Labels: labels}

	p.mtx.Lock()
	raw, found := p.positions[key]
	p.mtx.Unlock()

	if !found {
		return 0, nil
	}
	return strconv.ParseInt(raw, 10, 64)
}
148
149
// Remove forgets the position of the (path, labels) entry. It is the locking
// wrapper around remove; removing an untracked entry is a no-op.
func (p *positions) Remove(path, labels string) {
	p.mtx.Lock()
	p.remove(path, labels)
	p.mtx.Unlock()
}
154
155
// remove deletes the (path, labels) entry from the map. It takes no lock
// itself; Remove and cleanup call it while holding p.mtx.
func (p *positions) remove(path, labels string) {
	key := Entry{Path: path, Labels: labels}
	delete(p.positions, key)
}
158
159
// SyncPeriod reports the configured interval between two syncs of the
// positions file.
func (p *positions) SyncPeriod() time.Duration {
	period := p.cfg.SyncPeriod
	return period
}
162
163
func (p *positions) run() {
164
defer func() {
165
p.save()
166
level.Debug(p.logger).Log("msg", "positions saved")
167
close(p.done)
168
}()
169
170
ticker := time.NewTicker(p.cfg.SyncPeriod)
171
for {
172
select {
173
case <-p.quit:
174
return
175
case <-ticker.C:
176
p.save()
177
p.cleanup()
178
}
179
}
180
}
181
182
// save writes the current positions to the configured positions file. It
// does nothing when the tracker is read-only. A write failure is logged, not
// returned.
func (p *positions) save() {
	if p.cfg.ReadOnly {
		return
	}

	// Copy the map under the lock so the file write happens without holding
	// it.
	snapshot := func() map[Entry]string {
		p.mtx.Lock()
		defer p.mtx.Unlock()

		out := make(map[Entry]string, len(p.positions))
		for entry, pos := range p.positions {
			out[entry] = pos
		}
		return out
	}()

	err := writePositionFile(p.cfg.PositionsFile, snapshot)
	if err != nil {
		level.Error(p.logger).Log("msg", "error writing positions file", "error", err)
	}
}
197
198
// CursorKey returns a key that can be saved as a cursor that is never deleted.
199
func CursorKey(key string) string {
200
return fmt.Sprintf("%s%s", cursorKeyPrefix, key)
201
}
202
203
func (p *positions) cleanup() {
204
p.mtx.Lock()
205
defer p.mtx.Unlock()
206
toRemove := []Entry{}
207
for k := range p.positions {
208
// If the position file is prefixed with cursor, it's a
209
// cursor and not a file on disk.
210
// We still have to support journal files, so we keep the previous check to avoid breaking change.
211
if strings.HasPrefix(k.Path, cursorKeyPrefix) || strings.HasPrefix(k.Path, journalKeyPrefix) {
212
continue
213
}
214
215
if _, err := os.Stat(k.Path); err != nil {
216
if os.IsNotExist(err) {
217
// File no longer exists.
218
toRemove = append(toRemove, k)
219
} else {
220
// Can't determine if file exists or not, some other error.
221
level.Warn(p.logger).Log("msg", "could not determine if log file "+
222
"still exists while cleaning positions file", "error", err)
223
}
224
}
225
}
226
for _, tr := range toRemove {
227
p.remove(tr.Path, tr.Labels)
228
}
229
}
230
231
// readPositionsFile loads the positions stored in cfg.PositionsFile (after
// filepath.Clean) and returns them as a map. The returned map is never nil
// when the error is nil.
//
//   - A missing file yields an empty map and no error.
//   - Any other read error is returned unchanged.
//   - A file that fails strict YAML decoding yields an error wrapping the
//     decoder's error, unless cfg.IgnoreInvalidYaml is set, in which case the
//     problem is logged at debug level and an empty map is returned.
//   - A file that exists but holds no positions yields an empty map.
func readPositionsFile(cfg Config, logger log.Logger) (map[Entry]string, error) {
	cleanfn := filepath.Clean(cfg.PositionsFile)
	buf, err := os.ReadFile(cleanfn)
	if err != nil {
		if os.IsNotExist(err) {
			return map[Entry]string{}, nil
		}
		return nil, err
	}

	var p File
	if err := yaml.UnmarshalStrict(buf, &p); err != nil {
		// Start from scratch if the cfg option is enabled.
		if cfg.IgnoreInvalidYaml {
			level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)
			return map[Entry]string{}, nil
		}

		// %w keeps the decoder's error in the chain for errors.Is/As; the
		// message text is the same as with %v.
		return nil, fmt.Errorf("invalid yaml positions file [%s]: %w", cleanfn, err)
	}

	// p.Positions will be nil if the file exists but is empty.
	if p.Positions == nil {
		p.Positions = map[Entry]string{}
	}

	return p.Positions, nil
}
260
261