Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/local/file/detector.go
4095 views
1
package file
2
3
import (
4
"context"
5
"encoding"
6
"fmt"
7
"sync"
8
"time"
9
10
"github.com/fsnotify/fsnotify"
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
)
14
15
// Detector selects the strategy used to notice that the file has changed.
type Detector int

// Known Detector values.
const (
	// DetectorInvalid is the zero value; it does not name a valid Detector.
	DetectorInvalid Detector = iota
	// DetectorFSNotify waits for filesystem events to learn about changes to
	// the file.
	DetectorFSNotify
	// DetectorPoll re-reads the file on an interval to learn about changes.
	DetectorPoll
)

// DetectorDefault is the default Detector.
const DetectorDefault = DetectorFSNotify
29
30
var (
31
_ encoding.TextMarshaler = Detector(0)
32
_ encoding.TextUnmarshaler = (*Detector)(nil)
33
)
34
35
// String returns the string representation of the UpdateType.
36
func (ut Detector) String() string {
37
switch ut {
38
case DetectorFSNotify:
39
return "fsnotify"
40
case DetectorPoll:
41
return "poll"
42
default:
43
return fmt.Sprintf("Detector(%d)", ut)
44
}
45
}
46
47
// MarshalText implements encoding.TextMarshaler.
48
func (ut Detector) MarshalText() (text []byte, err error) {
49
return []byte(ut.String()), nil
50
}
51
52
// UnmarshalText implements encoding.TextUnmarshaler.
53
func (ut *Detector) UnmarshalText(text []byte) error {
54
switch string(text) {
55
case "":
56
*ut = DetectorDefault
57
case "fsnotify":
58
*ut = DetectorFSNotify
59
case "poll":
60
*ut = DetectorPoll
61
default:
62
return fmt.Errorf("unrecognized detector %q, expected fsnotify or poll", string(text))
63
}
64
return nil
65
}
66
67
type fsNotify struct {
68
opts fsNotifyOptions
69
cancel context.CancelFunc
70
71
// watcherMut is needed to prevent race conditions on Windows. This can be
72
// removed once fsnotify/fsnotify#454 is merged and included in a patch
73
// release.
74
watcherMut sync.Mutex
75
watcher *fsnotify.Watcher
76
}
77
78
type fsNotifyOptions struct {
79
Logger log.Logger
80
Filename string
81
ReloadFile func() // Callback to request file reload.
82
PollFrequency time.Duration // How often to do fallback polling
83
}
84
85
// newFSNotify creates a new fsnotify detector which uses filesystem events to
86
// detect that a file has changed.
87
func newFSNotify(opts fsNotifyOptions) (*fsNotify, error) {
88
w, err := fsnotify.NewWatcher()
89
if err != nil {
90
return nil, err
91
}
92
if err := w.Add(opts.Filename); err != nil {
93
// It's possible that the file already got deleted by the time our fsnotify
94
// was created. We'll log the error and wait for our polling fallback for
95
// the file to be recreated.
96
level.Warn(opts.Logger).Log("msg", "failed to watch file", "err", err)
97
}
98
99
ctx, cancel := context.WithCancel(context.Background())
100
101
wd := &fsNotify{
102
opts: opts,
103
watcher: w,
104
cancel: cancel,
105
}
106
107
go wd.wait(ctx)
108
return wd, nil
109
}
110
111
func (fsn *fsNotify) wait(ctx context.Context) {
112
pollTick := time.NewTicker(fsn.opts.PollFrequency)
113
defer pollTick.Stop()
114
115
for {
116
select {
117
case <-ctx.Done():
118
return
119
case <-pollTick.C:
120
// fsnotify falls back to polling in case the watch stopped (i.e., the
121
// file got deleted) or failed.
122
//
123
// We'll use the poll period to re-establish the watch in case it was
124
// stopped. This is a no-op if the watch is already active.
125
fsn.watcherMut.Lock()
126
err := fsn.watcher.Add(fsn.opts.Filename)
127
fsn.watcherMut.Unlock()
128
129
if err != nil {
130
level.Warn(fsn.opts.Logger).Log("msg", "failed re-watch file", "err", err)
131
}
132
133
fsn.opts.ReloadFile()
134
135
case err := <-fsn.watcher.Errors:
136
// The fsnotify watcher can generate errors for OS-level reasons (watched
137
// failed, failed when closing the file, etc.). We don't know if the error
138
// is related to the file, so we always treat it as if the file updated.
139
//
140
// This will force the component to reload the file and report the error
141
// directly to the user via the component health.
142
if err != nil {
143
level.Warn(fsn.opts.Logger).Log("msg", "got error from fsnotify watcher; treating as file updated event", "err", err)
144
fsn.opts.ReloadFile()
145
}
146
case ev := <-fsn.watcher.Events:
147
level.Debug(fsn.opts.Logger).Log("msg", "got fsnotify event", "op", ev.Op.String())
148
fsn.opts.ReloadFile()
149
}
150
}
151
}
152
153
// Close stops the background wait goroutine and closes the fsnotify watcher,
// returning the watcher's close error. watcherMut is held for the duration so
// the shutdown cannot interleave with other guarded watcher calls.
func (fsn *fsNotify) Close() error {
	fsn.watcherMut.Lock()
	defer fsn.watcherMut.Unlock()

	// Cancel first so the wait goroutine is told to stop before the watcher
	// goes away.
	fsn.cancel()

	closeErr := fsn.watcher.Close()
	return closeErr
}
160
161
type poller struct {
162
opts pollerOptions
163
cancel context.CancelFunc
164
}
165
166
// pollerOptions configures a poller.
type pollerOptions struct {
	// Filename is the path of the file being polled.
	Filename string
	// ReloadFile is the callback used to request a file reload.
	ReloadFile func()
	// PollFrequency is the interval between reload requests.
	PollFrequency time.Duration
}
171
172
// newPoller creates a new poll-based file update detector.
173
func newPoller(opts pollerOptions) *poller {
174
ctx, cancel := context.WithCancel(context.Background())
175
176
pw := &poller{
177
opts: opts,
178
cancel: cancel,
179
}
180
181
go pw.run(ctx)
182
return pw
183
}
184
185
func (p *poller) run(ctx context.Context) {
186
t := time.NewTicker(p.opts.PollFrequency)
187
defer t.Stop()
188
189
for {
190
select {
191
case <-ctx.Done():
192
return
193
case <-t.C:
194
// Always tell the component to re-check the file. This avoids situations
195
// where the file changed without changing any of the stats (like modify
196
// time).
197
p.opts.ReloadFile()
198
}
199
}
200
}
201
202
// Close terminates the poller.
203
func (p *poller) Close() error {
204
p.cancel()
205
return nil
206
}
207
208