Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/discovery/file/file.go
4096 views
1
package file
2
3
import (
4
"context"
5
"sync"
6
"time"
7
8
"github.com/grafana/agent/component/discovery"
9
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/component"
12
)
13
14
func init() {
15
component.Register(component.Registration{
16
Name: "discovery.file",
17
Args: Arguments{},
18
Exports: discovery.Exports{},
19
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
20
return New(opts, args.(Arguments))
21
},
22
})
23
}
24
25
// Arguments holds values which are used to configure the discovery.file
26
// component.
27
type Arguments struct {
28
PathTargets []discovery.Target `river:"path_targets,attr"`
29
SyncPeriod time.Duration `river:"sync_period,attr,optional"`
30
}
31
32
var _ component.Component = (*Component)(nil)
33
34
// Component implements the discovery.file component.
35
type Component struct {
36
opts component.Options
37
38
mut sync.RWMutex
39
args Arguments
40
watches []watch
41
watchDog *time.Ticker
42
}
43
44
// New creates a new discovery.file component.
45
func New(o component.Options, args Arguments) (*Component, error) {
46
c := &Component{
47
opts: o,
48
mut: sync.RWMutex{},
49
args: args,
50
watches: make([]watch, 0),
51
watchDog: time.NewTicker(args.SyncPeriod),
52
}
53
54
if err := c.Update(args); err != nil {
55
return nil, err
56
}
57
return c, nil
58
}
59
60
// getDefault returns the Arguments used as a baseline before decoding: a
// 10-second sync period and no path targets.
func getDefault() Arguments {
	var defaults Arguments
	defaults.SyncPeriod = 10 * time.Second
	return defaults
}
63
64
// UnmarshalRiver implements river.Unmarshaler.
65
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
66
*a = getDefault()
67
type arguments Arguments
68
return f((*arguments)(a))
69
}
70
71
// Update satisfies the component interface.
72
func (c *Component) Update(args component.Arguments) error {
73
c.mut.Lock()
74
defer c.mut.Unlock()
75
76
// Check to see if our ticker timer needs to be reset.
77
if args.(Arguments).SyncPeriod != c.args.SyncPeriod {
78
c.watchDog.Reset(c.args.SyncPeriod)
79
}
80
c.args = args.(Arguments)
81
c.watches = c.watches[:0]
82
for _, v := range c.args.PathTargets {
83
c.watches = append(c.watches, watch{
84
target: v,
85
log: c.opts.Logger,
86
})
87
}
88
89
return nil
90
}
91
92
// Run satisfies the component interface.
93
func (c *Component) Run(ctx context.Context) error {
94
update := func() {
95
c.mut.Lock()
96
defer c.mut.Unlock()
97
98
paths := c.getWatchedFiles()
99
// The component node checks to see if exports have actually changed.
100
c.opts.OnStateChange(discovery.Exports{Targets: paths})
101
}
102
// Trigger initial check
103
update()
104
defer c.watchDog.Stop()
105
for {
106
select {
107
case <-c.watchDog.C:
108
// This triggers a check for any new paths, along with pushing new targets.
109
update()
110
case <-ctx.Done():
111
return nil
112
}
113
}
114
}
115
116
// getWatchedFiles asks every watch for its current paths and returns them
// concatenated in watch order. A watch that reports an error is logged and
// whatever paths it returned are still included. The result is never nil.
//
// It reads c.watches without locking; Run calls it with c.mut held.
func (c *Component) getWatchedFiles() []discovery.Target {
	found := []discovery.Target{}

	for _, entry := range c.watches {
		matched, err := entry.getPaths()
		if err != nil {
			errLog := level.Error(c.opts.Logger)
			errLog.Log("msg", "error getting paths", "path", entry.getPath(), "excluded", entry.getExcludePath(), "err", err)
		}
		found = append(found, matched...)
	}

	return found
}
128
129