Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/module/git/git.go
4096 views
1
// Package git implements the module.git component.
2
package git
3
4
import (
5
"context"
6
"net/http"
7
"path/filepath"
8
"reflect"
9
"sync"
10
"time"
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/component"
15
"github.com/grafana/agent/component/module"
16
"github.com/grafana/agent/component/module/git/internal/vcs"
17
"github.com/grafana/agent/pkg/river"
18
)
19
20
func init() {
21
component.Register(component.Registration{
22
Name: "module.git",
23
Args: Arguments{},
24
Exports: module.Exports{},
25
26
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
27
return New(opts, args.(Arguments))
28
},
29
})
30
}
31
32
// Arguments holds the user-supplied settings of the module.git component.
type Arguments struct {
	// Repository and Revision are handed to the vcs package to select what
	// to check out. Revision falls back to DefaultArguments.Revision.
	Repository string `river:"repository,attr"`
	Revision   string `river:"revision,attr,optional"`

	// Path is the file read from the repository; its contents are loaded as
	// the module.
	Path string `river:"path,attr"`

	// PullFrequency is the interval between repository refreshes. A value
	// that is not positive turns periodic refreshing off.
	PullFrequency time.Duration `river:"pull_frequency,attr,optional"`

	// Arguments is passed through to the loaded module together with the
	// file contents.
	Arguments map[string]any `river:"arguments,block,optional"`
}
42
43
// DefaultArguments holds default settings for Arguments.
44
var DefaultArguments = Arguments{
45
Revision: "HEAD",
46
PullFrequency: time.Minute,
47
}
48
49
var _ river.Unmarshaler = (*Arguments)(nil)
50
51
// UnmarshalRiver implements river.Unmarshaler and applies default settings to
52
// args.
53
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
54
*args = DefaultArguments
55
56
type arguments Arguments
57
return f((*arguments)(args))
58
}
59
60
// Component implements the module.git component.
61
type Component struct {
62
opts component.Options
63
log log.Logger
64
mod *module.ModuleComponent
65
66
mut sync.RWMutex
67
repo *vcs.GitRepo
68
repoOpts vcs.GitRepoOptions
69
args Arguments
70
71
argsChanged chan struct{}
72
73
healthMut sync.RWMutex
74
health component.Health
75
}
76
77
var (
78
_ component.Component = (*Component)(nil)
79
_ component.HealthComponent = (*Component)(nil)
80
_ component.HTTPComponent = (*Component)(nil)
81
)
82
83
// New creates a new module.git component.
84
func New(o component.Options, args Arguments) (*Component, error) {
85
c := &Component{
86
opts: o,
87
log: o.Logger,
88
89
mod: module.NewModuleComponent(o),
90
91
argsChanged: make(chan struct{}, 1),
92
}
93
94
if err := c.Update(args); err != nil {
95
return nil, err
96
}
97
return c, nil
98
}
99
100
// Run implements component.Component.
101
func (c *Component) Run(ctx context.Context) error {
102
ctx, cancel := context.WithCancel(ctx)
103
defer cancel()
104
105
go c.mod.RunFlowController(ctx)
106
107
var (
108
ticker *time.Ticker
109
tickerC <-chan time.Time
110
)
111
112
for {
113
select {
114
case <-ctx.Done():
115
return nil
116
117
case <-c.argsChanged:
118
c.mut.Lock()
119
{
120
level.Info(c.log).Log("msg", "updating repository pull frequency", "new_frequency", c.args.PullFrequency)
121
122
if c.args.PullFrequency > 0 {
123
if ticker == nil {
124
ticker = time.NewTicker(c.args.PullFrequency)
125
tickerC = ticker.C
126
} else {
127
ticker.Reset(c.args.PullFrequency)
128
}
129
} else {
130
if ticker != nil {
131
ticker.Stop()
132
}
133
ticker = nil
134
tickerC = nil
135
}
136
}
137
c.mut.Unlock()
138
139
case <-tickerC:
140
level.Info(c.log).Log("msg", "updating repository", "new_frequency", c.args.PullFrequency)
141
c.tickPollFile(ctx)
142
}
143
}
144
}
145
146
func (c *Component) tickPollFile(ctx context.Context) {
147
c.mut.Lock()
148
err := c.pollFile(ctx, c.args)
149
c.mut.Unlock()
150
151
c.updateHealth(err)
152
}
153
154
func (c *Component) updateHealth(err error) {
155
c.healthMut.Lock()
156
defer c.healthMut.Unlock()
157
158
if err != nil {
159
c.health = component.Health{
160
Health: component.HealthTypeUnhealthy,
161
Message: err.Error(),
162
UpdateTime: time.Now(),
163
}
164
} else {
165
c.health = component.Health{
166
Health: component.HealthTypeHealthy,
167
Message: "module updated",
168
UpdateTime: time.Now(),
169
}
170
}
171
}
172
173
// Update implements component.Component.
174
func (c *Component) Update(args component.Arguments) (err error) {
175
defer func() {
176
c.updateHealth(err)
177
}()
178
179
c.mut.Lock()
180
defer c.mut.Unlock()
181
182
newArgs := args.(Arguments)
183
184
// TODO(rfratto): store in a repo-specific directory so changing repositories
185
// doesn't risk break the module loader if there's a SHA collision between
186
// the two different repositories.
187
repoPath := filepath.Join(c.opts.DataPath, "repo")
188
189
repoOpts := vcs.GitRepoOptions{
190
Repository: newArgs.Repository,
191
Revision: newArgs.Revision,
192
}
193
194
// Create or update the repo field.
195
if c.repo == nil || !reflect.DeepEqual(repoOpts, c.repoOpts) {
196
r, err := vcs.NewGitRepo(context.Background(), repoPath, repoOpts)
197
if err != nil {
198
return err
199
}
200
c.repo = r
201
c.repoOpts = repoOpts
202
}
203
204
if err := c.pollFile(context.Background(), newArgs); err != nil {
205
return err
206
}
207
208
// Schedule an update for handling the changed arguments.
209
select {
210
case c.argsChanged <- struct{}{}:
211
default:
212
}
213
214
c.args = newArgs
215
return nil
216
}
217
218
// pollFile fetches the latest content from the repository and updates the
219
// controller. pollFile must only be called with c.mut held.
220
func (c *Component) pollFile(ctx context.Context, args Arguments) error {
221
// Make sure our repo is up-to-date.
222
if err := c.repo.Update(ctx); err != nil {
223
return err
224
}
225
226
// Finally, configure our controller.
227
bb, err := c.repo.ReadFile(args.Path)
228
if err != nil {
229
return err
230
}
231
232
return c.mod.LoadFlowContent(args.Arguments, string(bb))
233
}
234
235
// CurrentHealth implements component.HealthComponent.
236
func (c *Component) CurrentHealth() component.Health {
237
c.healthMut.RLock()
238
defer c.healthMut.RUnlock()
239
240
return component.LeastHealthy(c.health, c.mod.CurrentHealth())
241
}
242
243
// Handler implements component.HTTPComponent.
244
func (c *Component) Handler() http.Handler {
245
return c.mod.Handler()
246
}
247
248
// DebugInfo implements component.DebugComponent.
249
func (c *Component) DebugInfo() interface{} {
250
type DebugInfo struct {
251
SHA string `river:"sha,attr"`
252
RepoError string `river:"repo_error,attr,optional"`
253
}
254
255
c.mut.RLock()
256
defer c.mut.RUnlock()
257
258
rev, err := c.repo.CurrentRevision()
259
if err != nil {
260
return DebugInfo{RepoError: err.Error()}
261
} else {
262
return DebugInfo{SHA: rev}
263
}
264
}
265
266