GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/cluster/config_watcher.go
package cluster

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/metrics/instance/configstore"
	"github.com/grafana/agent/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	reshardDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "agent_metrics_scraping_service_reshard_duration",
		Help: "How long it took for resharding to run.",
	}, []string{"success"})
)

// configWatcher connects to a configstore and will apply configs to an
// instance.Manager.
type configWatcher struct {
	log log.Logger
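	// mut guards cfg, stopped, and stop.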
	mut     sync.Mutex
	cfg     Config
	stopped bool
	stop    context.CancelFunc

	store    configstore.Store
	im       instance.Manager
	owns     OwnershipFunc
	validate ValidationFunc
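	// refreshCh coalesces refresh requests; it has a capacity of one, so at
	// most one refresh can be queued at a time (see RequestRefresh).
	// instanceMut guards instances, the set of config keys currently applied
	// to the instance.Manager.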
	refreshCh   chan struct{}
	instanceMut sync.Mutex
	instances   map[string]struct{}
}

// OwnershipFunc should determine if a given key is owned by the caller.
type OwnershipFunc = func(key string) (bool, error)

// ValidationFunc should validate a config.
type ValidationFunc = func(*instance.Config) error
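
// For illustration only (not part of the original file): an OwnershipFunc is
// typically backed by the cluster's config ring and reports whether the local
// node owns the given key, while a ValidationFunc enforces global limits
// before a config may run. A minimal hypothetical ValidationFunc might be:
//
//	validate := func(c *instance.Config) error {
//		if c.Name == "" {
//			return fmt.Errorf("config must have a name")
//		}
//		return nil
//	}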

// newConfigWatcher watches store for changes and checks each config against
// owns. It will also poll the configstore at a configurable interval.
func newConfigWatcher(log log.Logger, cfg Config, store configstore.Store, im instance.Manager, owns OwnershipFunc, validate ValidationFunc) (*configWatcher, error) {
	ctx, cancel := context.WithCancel(context.Background())

	w := &configWatcher{
		log: log,

		stop: cancel,

		store:    store,
		im:       im,
		owns:     owns,
		validate: validate,

		refreshCh: make(chan struct{}, 1),
		instances: make(map[string]struct{}),
	}
	if err := w.ApplyConfig(cfg); err != nil {
		return nil, err
	}
	// Delay the start of the run loop; this prevents a race condition (see run for details).
	delay := cfg.Lifecycler.HeartbeatPeriod * 3
	go w.run(ctx, delay)
	return w, nil
}
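
// ApplyConfig updates the watcher's Config. It is a no-op when the new config
// matches the current one and returns an error if the watcher has already
// been stopped.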
func (w *configWatcher) ApplyConfig(cfg Config) error {
	w.mut.Lock()
	defer w.mut.Unlock()

	if util.CompareYAML(w.cfg, cfg) {
		return nil
	}

	if w.stopped {
		return fmt.Errorf("configWatcher already stopped")
	}

	w.cfg = cfg
	return nil
}
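
// run is the main loop of the watcher. After an initial delay it waits for
// reshard ticks, queued refreshes, and config store events, processing them
// until ctx is canceled.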
func (w *configWatcher) run(ctx context.Context, delay time.Duration) {
	defer level.Info(w.log).Log("msg", "config watcher run loop exiting")
	// Sleep before starting the loop. This works around a race condition
	// between the heartbeat and the config ring that occurs in a very narrow
	// set of circumstances:
	// https://gist.github.com/mattdurham/c15f27de17a6da97bf2e6a870991c7f2
	time.Sleep(delay)
	lastReshard := time.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case <-w.nextReshard(lastReshard):
			level.Debug(w.log).Log("msg", "reshard timer ticked, scheduling refresh")
			w.RequestRefresh()
			lastReshard = time.Now()
		case <-w.refreshCh:
			err := w.refresh(ctx)
			if err != nil {
				level.Error(w.log).Log("msg", "refresh failed", "err", err)
			}
		case ev := <-w.store.Watch():
			level.Debug(w.log).Log("msg", "handling event from config store")
			if err := w.handleEvent(ev); err != nil {
				level.Error(w.log).Log("msg", "failed to handle changed or deleted config", "key", ev.Key, "err", err)
			}
		}
	}
}

// nextReshard returns a channel that will receive a value once the reshard
// interval has elapsed.
func (w *configWatcher) nextReshard(lastReshard time.Time) <-chan time.Time {
	w.mut.Lock()
	nextReshard := lastReshard.Add(w.cfg.ReshardInterval)
	w.mut.Unlock()

	remaining := time.Until(nextReshard)

	// NOTE(rfratto): clamping to 0 isn't necessary for time.After,
	// but it makes the log message clearer to always use "0s" as
	// "next reshard will be scheduled immediately."
	if remaining < 0 {
		remaining = 0
	}

	level.Debug(w.log).Log("msg", "waiting for next reshard interval", "last_reshard", lastReshard, "next_reshard", nextReshard, "remaining", remaining)
	return time.After(remaining)
}

// RequestRefresh will queue a refresh. No more than one refresh can be queued at a time.
func (w *configWatcher) RequestRefresh() {
	select {
	case w.refreshCh <- struct{}{}:
		level.Debug(w.log).Log("msg", "successfully scheduled a refresh")
	default:
		level.Debug(w.log).Log("msg", "ignoring request refresh: refresh already scheduled")
	}
}

// refresh reloads all configs from the configstore. Deleted configs will be
// removed. refresh may not be called concurrently and must only be invoked from run.
// Call RequestRefresh to queue a call to refresh.
func (w *configWatcher) refresh(ctx context.Context) (err error) {
	w.mut.Lock()
	enabled := w.cfg.Enabled
	refreshTimeout := w.cfg.ReshardTimeout
	w.mut.Unlock()

	if !enabled {
		level.Debug(w.log).Log("msg", "refresh skipped because clustering is disabled")
		return nil
	}
	level.Info(w.log).Log("msg", "starting refresh")

	if refreshTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, refreshTimeout)
		defer cancel()
	}
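
	// Time the refresh and record its duration and outcome in the
	// reshardDuration histogram once it finishes.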
	start := time.Now()
	defer func() {
		success := "1"
		if err != nil {
			success = "0"
		}
		duration := time.Since(start)
		level.Info(w.log).Log("msg", "refresh finished", "duration", duration, "success", success, "err", err)
		reshardDuration.WithLabelValues(success).Observe(duration.Seconds())
	}()

	// Check whether the context has already been canceled or its deadline
	// exceeded before calling the kv provider.
	if err = ctx.Err(); err != nil {
		level.Error(w.log).Log("msg", "context deadline exceeded before calling store.all", "err", err)
		return err
	}
	deadline, _ := ctx.Deadline()
	level.Debug(w.log).Log("msg", "deadline before store.all", "deadline", deadline)
	configs, err := w.store.All(ctx, func(key string) bool {
		owns, err := w.owns(key)
		if err != nil {
			level.Error(w.log).Log("msg", "failed to check for ownership, instance will be deleted if it is running", "key", key, "err", err)
			return false
		}
		return owns
	})
	level.Debug(w.log).Log("msg", "count of configs from store.all", "count", len(configs))

	if err != nil {
		return fmt.Errorf("failed to get configs from store: %w", err)
	}
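
	// keys records every config seen during this pass; any instance we are
	// running whose key is absent from keys is deleted further below.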
	var (
		keys       = make(map[string]struct{})
		firstError error
	)

Outer:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case cfg, ok := <-configs:
			// w.store.All will close configs when all of them have been read.
			if !ok {
				break Outer
			}

			if err := w.handleEvent(configstore.WatchEvent{Key: cfg.Name, Config: &cfg}); err != nil {
				level.Error(w.log).Log("msg", "failed to process changed config", "key", cfg.Name, "err", err)
				if firstError == nil {
					firstError = err
				}
			}

			keys[cfg.Name] = struct{}{}
		}
	}

	// Any config we used to be running that disappeared from this most recent
	// iteration should be deleted. We hold the lock only while populating
	// deleted because handleEvent also acquires the lock.
	var deleted []string
	w.instanceMut.Lock()
	for key := range w.instances {
		if _, exist := keys[key]; exist {
			continue
		}
		deleted = append(deleted, key)
	}
	w.instanceMut.Unlock()

	// Send a deleted event for any key that has gone away.
	for _, key := range deleted {
		if err := w.handleEvent(configstore.WatchEvent{Key: key, Config: nil}); err != nil {
			level.Error(w.log).Log("msg", "failed to process changed config", "key", key, "err", err)
		}
	}

	return firstError
}
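
// handleEvent processes a single WatchEvent: owned, non-deleted configs are
// validated and applied to the instance.Manager, while configs that were
// deleted or are no longer owned by this watcher are removed.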
func (w *configWatcher) handleEvent(ev configstore.WatchEvent) error {
	w.mut.Lock()
	defer w.mut.Unlock()

	if w.stopped {
		return fmt.Errorf("configWatcher stopped")
	}

	w.instanceMut.Lock()
	defer w.instanceMut.Unlock()

	owned, err := w.owns(ev.Key)
	if err != nil {
		level.Error(w.log).Log("msg", "failed to see if config is owned. instance will be deleted if it is running", "err", err)
	}

	var (
		_, isRunning = w.instances[ev.Key]
		isDeleted    = ev.Config == nil
	)

	switch {
	// Two deletion scenarios:
	// 1. A config we're running got moved to a new owner.
	// 2. A config we're running got deleted.
	case (isRunning && !owned) || (isDeleted && isRunning):
		if isDeleted {
			level.Info(w.log).Log("msg", "untracking deleted config", "key", ev.Key)
		} else {
			level.Info(w.log).Log("msg", "untracking config that changed owners", "key", ev.Key)
		}

		err := w.im.DeleteConfig(ev.Key)
		delete(w.instances, ev.Key)
		if err != nil {
			return fmt.Errorf("failed to delete: %w", err)
		}

	case !isDeleted && owned:
		if err := w.validate(ev.Config); err != nil {
			return fmt.Errorf(
				"failed to validate config. %[1]s cannot run until the global settings are adjusted or the config is adjusted to operate within the global constraints. error: %[2]w",
				ev.Key, err,
			)
		}

		if _, exist := w.instances[ev.Key]; !exist {
			level.Info(w.log).Log("msg", "tracking new config", "key", ev.Key)
		}

		if err := w.im.ApplyConfig(*ev.Config); err != nil {
			return fmt.Errorf("failed to apply config: %w", err)
		}
		w.instances[ev.Key] = struct{}{}
	}

	return nil
}

// Stop stops the configWatcher. Cannot be called more than once.
func (w *configWatcher) Stop() error {
	w.mut.Lock()
	defer w.mut.Unlock()

	if w.stopped {
		return fmt.Errorf("already stopped")
	}
	w.stop()
	w.stopped = true

	// Shut down all the instances that this configWatcher managed. It *MUST*
	// happen after w.stop() is called to prevent the run loop from applying any
	// new configs.
	w.instanceMut.Lock()
	defer w.instanceMut.Unlock()

	for key := range w.instances {
		if err := w.im.DeleteConfig(key); err != nil {
			level.Warn(w.log).Log("msg", "failed deleting config on shutdown", "key", key, "err", err)
		}
	}
	w.instances = make(map[string]struct{})

	return nil
}