package file
import (
"context"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/river/rivertypes"
)
const waitReadPeriod time.Duration = 30 * time.Millisecond
func init() {
component.Register(component.Registration{
Name: "local.file",
Args: Arguments{},
Exports: Exports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}
type Arguments struct {
Filename string `river:"filename,attr"`
Type Detector `river:"detector,attr,optional"`
PollFrequency time.Duration `river:"poll_frequency,attr,optional"`
IsSecret bool `river:"is_secret,attr,optional"`
}
var DefaultArguments = Arguments{
Type: DetectorFSNotify,
PollFrequency: time.Minute,
}
var _ river.Unmarshaler = (*Arguments)(nil)
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*a = DefaultArguments
type arguments Arguments
return f((*arguments)(a))
}
type Exports struct {
Content rivertypes.OptionalSecret `river:"content,attr"`
}
type Component struct {
opts component.Options
mut sync.Mutex
args Arguments
latestContent string
detector io.Closer
healthMut sync.RWMutex
health component.Health
reloadCh chan struct{}
lastAccessed prometheus.Gauge
}
var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
reloadCh: make(chan struct{}, 1),
lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "agent_local_file_timestamp_last_accessed_unix_seconds",
Help: "The last successful access in unix seconds",
}),
}
err := o.Registerer.Register(c.lastAccessed)
if err != nil {
return nil, err
}
if err = c.Update(args); err != nil {
return nil, err
}
return c, nil
}
func (c *Component) Run(ctx context.Context) error {
defer func() {
c.mut.Lock()
defer c.mut.Unlock()
if err := c.detector.Close(); err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to shut down detector", "err", err)
}
c.detector = nil
}()
c.mut.Lock()
_ = c.configureDetector()
c.mut.Unlock()
for {
select {
case <-ctx.Done():
return nil
case <-c.reloadCh:
time.Sleep(waitReadPeriod)
c.mut.Lock()
_ = c.readFile()
c.mut.Unlock()
}
}
}
func (c *Component) readFile() error {
bb, err := os.ReadFile(c.args.Filename)
if err != nil {
c.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Message: fmt.Sprintf("failed to read file: %s", err),
UpdateTime: time.Now(),
})
level.Error(c.opts.Logger).Log("msg", "failed to read file", "path", c.opts.DataPath, "err", err)
return err
}
c.latestContent = string(bb)
c.lastAccessed.SetToCurrentTime()
c.opts.OnStateChange(Exports{
Content: rivertypes.OptionalSecret{
IsSecret: c.args.IsSecret,
Value: c.latestContent,
},
})
c.setHealth(component.Health{
Health: component.HealthTypeHealthy,
Message: "read file",
UpdateTime: time.Now(),
})
return nil
}
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)
if newArgs.PollFrequency <= 0 {
return fmt.Errorf("poll_frequency must be greater than 0")
}
c.mut.Lock()
defer c.mut.Unlock()
c.args = newArgs
if err := c.readFile(); err != nil {
return fmt.Errorf("failed to read file: %w", err)
}
if c.detector != nil {
if err := c.detector.Close(); err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to shut down old detector", "err", err)
}
c.detector = nil
}
return c.configureDetector()
}
func (c *Component) configureDetector() error {
if c.detector != nil {
return nil
}
var err error
reloadFile := func() {
select {
case c.reloadCh <- struct{}{}:
default:
}
}
switch c.args.Type {
case DetectorPoll:
c.detector = newPoller(pollerOptions{
Filename: c.args.Filename,
ReloadFile: reloadFile,
PollFrequency: c.args.PollFrequency,
})
case DetectorFSNotify:
c.detector, err = newFSNotify(fsNotifyOptions{
Logger: c.opts.Logger,
Filename: c.args.Filename,
ReloadFile: reloadFile,
PollFrequency: c.args.PollFrequency,
})
}
return err
}
func (c *Component) CurrentHealth() component.Health {
c.healthMut.RLock()
defer c.healthMut.RUnlock()
return c.health
}
func (c *Component) setHealth(h component.Health) {
c.healthMut.Lock()
defer c.healthMut.Unlock()
c.health = h
}