Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/local/file/file.go
4095 views
1
package file
2
3
import (
4
"context"
5
"fmt"
6
"io"
7
"os"
8
"sync"
9
"time"
10
11
"github.com/prometheus/client_golang/prometheus"
12
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/component"
15
"github.com/grafana/agent/pkg/river"
16
"github.com/grafana/agent/pkg/river/rivertypes"
17
)
18
19
// waitReadPeriod holds the time to wait before reading a file while the
20
// local.file component is running.
21
//
22
// This prevents local.file from updating too frequently and exporting partial
23
// writes.
24
const waitReadPeriod time.Duration = 30 * time.Millisecond
25
26
func init() {
27
component.Register(component.Registration{
28
Name: "local.file",
29
Args: Arguments{},
30
Exports: Exports{},
31
32
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
33
return New(opts, args.(Arguments))
34
},
35
})
36
}
37
38
// Arguments holds values which are used to configure the local.file component.
39
type Arguments struct {
40
// Filename indicates the file to watch.
41
Filename string `river:"filename,attr"`
42
// Type indicates how to detect changes to the file.
43
Type Detector `river:"detector,attr,optional"`
44
// PollFrequency determines the frequency to check for changes when Type is
45
// Poll.
46
PollFrequency time.Duration `river:"poll_frequency,attr,optional"`
47
// IsSecret marks the file as holding a secret value which should not be
48
// displayed to the user.
49
IsSecret bool `river:"is_secret,attr,optional"`
50
}
51
52
// DefaultArguments provides the default arguments for the local.file
53
// component.
54
var DefaultArguments = Arguments{
55
Type: DetectorFSNotify,
56
PollFrequency: time.Minute,
57
}
58
59
var _ river.Unmarshaler = (*Arguments)(nil)
60
61
// UnmarshalRiver implements river.Unmarshaler.
62
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
63
*a = DefaultArguments
64
65
type arguments Arguments
66
return f((*arguments)(a))
67
}
68
69
// Exports holds values which are exported by the local.file component.
70
type Exports struct {
71
// Content of the file.
72
Content rivertypes.OptionalSecret `river:"content,attr"`
73
}
74
75
// Component implements the local.file component.
76
type Component struct {
77
opts component.Options
78
79
mut sync.Mutex
80
args Arguments
81
latestContent string
82
detector io.Closer
83
84
healthMut sync.RWMutex
85
health component.Health
86
87
// reloadCh is a buffered channel which is written to when the watched file
88
// should be reloaded by the component.
89
reloadCh chan struct{}
90
lastAccessed prometheus.Gauge
91
}
92
93
var (
94
_ component.Component = (*Component)(nil)
95
_ component.HealthComponent = (*Component)(nil)
96
)
97
98
// New creates a new local.file component.
99
func New(o component.Options, args Arguments) (*Component, error) {
100
c := &Component{
101
opts: o,
102
103
reloadCh: make(chan struct{}, 1),
104
lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{
105
Name: "agent_local_file_timestamp_last_accessed_unix_seconds",
106
Help: "The last successful access in unix seconds",
107
}),
108
}
109
110
err := o.Registerer.Register(c.lastAccessed)
111
if err != nil {
112
return nil, err
113
}
114
// Perform an update which will immediately set our exports to the initial
115
// contents of the file.
116
if err = c.Update(args); err != nil {
117
return nil, err
118
}
119
return c, nil
120
}
121
122
// Run implements component.Component.
123
func (c *Component) Run(ctx context.Context) error {
124
defer func() {
125
c.mut.Lock()
126
defer c.mut.Unlock()
127
128
if err := c.detector.Close(); err != nil {
129
level.Error(c.opts.Logger).Log("msg", "failed to shut down detector", "err", err)
130
}
131
c.detector = nil
132
}()
133
134
// Since Run _may_ get recalled if we're told to exit but still exist in the
135
// config file, we may have prematurely destroyed the detector. If no
136
// detector exists, we need to recreate it for Run to work properly.
137
//
138
// We ignore the error (indicating the file has disappeared) so we can allow
139
// the detector to inform us when it comes back.
140
//
141
// TODO(rfratto): this is a design wart, and can hopefully be removed in
142
// future iterations.
143
c.mut.Lock()
144
_ = c.configureDetector()
145
c.mut.Unlock()
146
147
for {
148
select {
149
case <-ctx.Done():
150
return nil
151
case <-c.reloadCh:
152
time.Sleep(waitReadPeriod)
153
154
// We ignore the error here from readFile since readFile will log errors
155
// and also report the error as the health of the component.
156
c.mut.Lock()
157
_ = c.readFile()
158
c.mut.Unlock()
159
}
160
}
161
}
162
163
func (c *Component) readFile() error {
164
// Force a re-load of the file outside of the update detection mechanism.
165
bb, err := os.ReadFile(c.args.Filename)
166
if err != nil {
167
c.setHealth(component.Health{
168
Health: component.HealthTypeUnhealthy,
169
Message: fmt.Sprintf("failed to read file: %s", err),
170
UpdateTime: time.Now(),
171
})
172
level.Error(c.opts.Logger).Log("msg", "failed to read file", "path", c.opts.DataPath, "err", err)
173
return err
174
}
175
c.latestContent = string(bb)
176
c.lastAccessed.SetToCurrentTime()
177
178
c.opts.OnStateChange(Exports{
179
Content: rivertypes.OptionalSecret{
180
IsSecret: c.args.IsSecret,
181
Value: c.latestContent,
182
},
183
})
184
185
c.setHealth(component.Health{
186
Health: component.HealthTypeHealthy,
187
Message: "read file",
188
UpdateTime: time.Now(),
189
})
190
return nil
191
}
192
193
// Update implements component.Component.
194
func (c *Component) Update(args component.Arguments) error {
195
newArgs := args.(Arguments)
196
197
if newArgs.PollFrequency <= 0 {
198
return fmt.Errorf("poll_frequency must be greater than 0")
199
}
200
201
c.mut.Lock()
202
defer c.mut.Unlock()
203
c.args = newArgs
204
205
// Force an immediate read of the file to report any potential errors early.
206
if err := c.readFile(); err != nil {
207
return fmt.Errorf("failed to read file: %w", err)
208
}
209
210
// Each detector is dedicated to a single file path. We'll naively shut down
211
// the existing detector (if any) before setting up a new one to make sure
212
// the correct file is being watched in case the path changed between calls
213
// to Update.
214
if c.detector != nil {
215
if err := c.detector.Close(); err != nil {
216
level.Error(c.opts.Logger).Log("msg", "failed to shut down old detector", "err", err)
217
}
218
c.detector = nil
219
}
220
221
return c.configureDetector()
222
}
223
224
// configureDetector configures the detector if one isn't set. mut must be held
225
// when called.
226
func (c *Component) configureDetector() error {
227
if c.detector != nil {
228
// Already have a detector; don't do anything.
229
return nil
230
}
231
232
var err error
233
234
reloadFile := func() {
235
select {
236
case c.reloadCh <- struct{}{}:
237
default:
238
// no-op: a reload is already queued so we don't need to queue a second
239
// one.
240
}
241
}
242
243
switch c.args.Type {
244
case DetectorPoll:
245
c.detector = newPoller(pollerOptions{
246
Filename: c.args.Filename,
247
ReloadFile: reloadFile,
248
PollFrequency: c.args.PollFrequency,
249
})
250
case DetectorFSNotify:
251
c.detector, err = newFSNotify(fsNotifyOptions{
252
Logger: c.opts.Logger,
253
Filename: c.args.Filename,
254
ReloadFile: reloadFile,
255
PollFrequency: c.args.PollFrequency,
256
})
257
}
258
259
return err
260
}
261
262
// CurrentHealth implements component.HealthComponent.
263
func (c *Component) CurrentHealth() component.Health {
264
c.healthMut.RLock()
265
defer c.healthMut.RUnlock()
266
return c.health
267
}
268
269
func (c *Component) setHealth(h component.Health) {
270
c.healthMut.Lock()
271
defer c.healthMut.Unlock()
272
c.health = h
273
}
274
275