Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/module/file/file.go
4095 views
1
package file
2
3
import (
4
"context"
5
"net/http"
6
"sync"
7
8
"go.uber.org/atomic"
9
10
"github.com/grafana/agent/component"
11
"github.com/grafana/agent/component/local/file"
12
"github.com/grafana/agent/component/module"
13
"github.com/grafana/agent/pkg/river"
14
"github.com/grafana/agent/pkg/river/rivertypes"
15
)
16
17
func init() {
18
component.Register(component.Registration{
19
Name: "module.file",
20
Args: Arguments{},
21
Exports: module.Exports{},
22
23
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
24
return New(opts, args.(Arguments))
25
},
26
})
27
}
28
29
// Arguments holds values which are used to configure the module.file component.
30
type Arguments struct {
31
LocalFileArguments file.Arguments `river:",squash"`
32
33
// Arguments to pass into the module.
34
Arguments map[string]any `river:"arguments,block,optional"`
35
}
36
37
var _ river.Unmarshaler = (*Arguments)(nil)
38
39
// UnmarshalRiver implements river.Unmarshaler.
40
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
41
a.LocalFileArguments = file.DefaultArguments
42
43
type arguments Arguments
44
err := f((*arguments)(a))
45
if err != nil {
46
return err
47
}
48
49
return nil
50
}
51
52
// Component implements the module.file component.
53
type Component struct {
54
opts component.Options
55
mod *module.ModuleComponent
56
57
mut sync.RWMutex
58
args Arguments
59
content rivertypes.OptionalSecret
60
61
managedLocalFile *file.Component
62
inUpdate atomic.Bool
63
isCreated atomic.Bool
64
}
65
66
var (
67
_ component.Component = (*Component)(nil)
68
_ component.HealthComponent = (*Component)(nil)
69
_ component.HTTPComponent = (*Component)(nil)
70
)
71
72
// New creates a new module.file component.
73
func New(o component.Options, args Arguments) (*Component, error) {
74
c := &Component{
75
opts: o,
76
mod: module.NewModuleComponent(o),
77
args: args,
78
}
79
defer c.isCreated.Store(true)
80
81
var err error
82
c.managedLocalFile, err = c.newManagedLocalComponent(o)
83
if err != nil {
84
return nil, err
85
}
86
if err := c.Update(args); err != nil {
87
return nil, err
88
}
89
return c, nil
90
}
91
92
// NewManagedLocalComponent creates the new local.file managed component.
93
func (c *Component) newManagedLocalComponent(o component.Options) (*file.Component, error) {
94
localFileOpts := o
95
localFileOpts.OnStateChange = func(e component.Exports) {
96
c.setContent(e.(file.Exports).Content)
97
98
if !c.inUpdate.Load() && c.isCreated.Load() {
99
// Any errors found here are reported via component health
100
_ = c.mod.LoadFlowContent(c.getArgs().Arguments, c.getContent().Value)
101
}
102
}
103
104
return file.New(localFileOpts, c.getArgs().LocalFileArguments)
105
}
106
107
// Run implements component.Component.
108
func (c *Component) Run(ctx context.Context) error {
109
ctx, cancel := context.WithCancel(ctx)
110
defer cancel()
111
112
ch := make(chan error, 1)
113
go func() {
114
err := c.managedLocalFile.Run(ctx)
115
if err != nil {
116
ch <- err
117
}
118
}()
119
120
go c.mod.RunFlowController(ctx)
121
122
for {
123
select {
124
case <-ctx.Done():
125
return nil
126
case err := <-ch:
127
return err
128
}
129
}
130
}
131
132
// Update implements component.Component.
133
func (c *Component) Update(args component.Arguments) error {
134
c.inUpdate.Store(true)
135
defer c.inUpdate.Store(false)
136
137
newArgs := args.(Arguments)
138
c.setArgs(newArgs)
139
140
err := c.managedLocalFile.Update(newArgs.LocalFileArguments)
141
if err != nil {
142
return err
143
}
144
145
// Force a content load here and bubble up any error. This will catch problems
146
// on initial load.
147
return c.mod.LoadFlowContent(newArgs.Arguments, c.getContent().Value)
148
}
149
150
// Handler implements component.HTTPComponent.
151
func (c *Component) Handler() http.Handler {
152
return c.mod.Handler()
153
}
154
155
// CurrentHealth implements component.HealthComponent.
156
func (c *Component) CurrentHealth() component.Health {
157
leastHealthy := component.LeastHealthy(
158
c.managedLocalFile.CurrentHealth(),
159
c.mod.CurrentHealth(),
160
)
161
162
// if both components are healthy - return c.mod's health, so we can have a stable Health.Message.
163
if leastHealthy.Health == component.HealthTypeHealthy {
164
return c.mod.CurrentHealth()
165
}
166
return leastHealthy
167
}
168
169
// getArgs is a goroutine safe way to get args
170
func (c *Component) getArgs() Arguments {
171
c.mut.RLock()
172
defer c.mut.RUnlock()
173
return c.args
174
}
175
176
// setArgs is a goroutine safe way to set args
177
func (c *Component) setArgs(args Arguments) {
178
c.mut.Lock()
179
c.args = args
180
c.mut.Unlock()
181
}
182
183
// getContent is a goroutine safe way to get content
184
func (c *Component) getContent() rivertypes.OptionalSecret {
185
c.mut.RLock()
186
defer c.mut.RUnlock()
187
return c.content
188
}
189
190
// setContent is a goroutine safe way to set content
191
func (c *Component) setContent(content rivertypes.OptionalSecret) {
192
c.mut.Lock()
193
c.content = content
194
c.mut.Unlock()
195
}
196
197