Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/instance/configstore/remote.go
5340 views
1
package configstore
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"net/http"
8
"strings"
9
"sync"
10
11
"github.com/weaveworks/common/instrument"
12
13
"github.com/hashicorp/go-cleanhttp"
14
15
"github.com/hashicorp/consul/api"
16
17
"github.com/go-kit/log"
18
"github.com/go-kit/log/level"
19
"github.com/grafana/agent/pkg/metrics/instance"
20
"github.com/grafana/agent/pkg/util"
21
"github.com/grafana/dskit/kv"
22
"github.com/prometheus/client_golang/prometheus"
23
"github.com/prometheus/client_golang/prometheus/promauto"
24
)
25
26
/***********************************************************************************************************************
27
The consul code skipping the cortex handler is due to a performance issue with a large number of configs overloading
28
consul. See issue https://github.com/grafana/agent/issues/789. The long term method will be to refactor and extract
29
the cortex code so other stores can also benefit from this. @mattdurham
30
***********************************************************************************************************************/
31
32
var consulRequestDuration = instrument.NewHistogramCollector(promauto.NewHistogramVec(prometheus.HistogramOpts{
33
Name: "agent_configstore_consul_request_duration_seconds",
34
Help: "Time spent on consul requests when listing configs.",
35
Buckets: prometheus.DefBuckets,
36
}, []string{"operation", "status_code"}))
37
38
// Remote loads instance files from a remote KV store. The KV store
39
// can be swapped out in real time.
40
type Remote struct {
41
log log.Logger
42
reg *util.Unregisterer
43
44
kvMut sync.RWMutex
45
kv *agentRemoteClient
46
reloadKV chan struct{}
47
48
cancelCtx context.Context
49
cancelFunc context.CancelFunc
50
51
configsMut sync.Mutex
52
configsCh chan WatchEvent
53
}
54
55
// agentRemoteClient is a simple wrapper to allow the shortcircuit of consul, while being backwards compatible with non
56
// consul kv stores
57
type agentRemoteClient struct {
58
kv.Client
59
consul *api.Client
60
config kv.Config
61
}
62
63
// NewRemote creates a new Remote store that uses a Key-Value client to store
64
// and retrieve configs. If enable is true, the store will be immediately
65
// connected to. Otherwise, it can be lazily loaded by enabling later through
66
// a call to Remote.ApplyConfig.
67
func NewRemote(l log.Logger, reg prometheus.Registerer, cfg kv.Config, enable bool) (*Remote, error) {
68
cancelCtx, cancelFunc := context.WithCancel(context.Background())
69
70
r := &Remote{
71
log: l,
72
reg: util.WrapWithUnregisterer(reg),
73
74
reloadKV: make(chan struct{}, 1),
75
76
cancelCtx: cancelCtx,
77
cancelFunc: cancelFunc,
78
79
configsCh: make(chan WatchEvent),
80
}
81
if err := r.ApplyConfig(cfg, enable); err != nil {
82
return nil, fmt.Errorf("failed to apply config for config store: %w", err)
83
}
84
85
go r.run()
86
return r, nil
87
}
88
89
// ApplyConfig applies the config for a kv client.
90
func (r *Remote) ApplyConfig(cfg kv.Config, enable bool) error {
91
r.kvMut.Lock()
92
defer r.kvMut.Unlock()
93
94
if r.cancelCtx.Err() != nil {
95
return fmt.Errorf("remote store already stopped")
96
}
97
98
// Unregister all metrics that the previous kv may have registered.
99
r.reg.UnregisterAll()
100
101
if !enable {
102
r.setClient(nil, nil, kv.Config{})
103
return nil
104
}
105
106
cli, err := kv.NewClient(cfg, GetCodec(), kv.RegistererWithKVName(r.reg, "agent_configs"), r.log)
107
// This is a hack to get a consul client, the client above has it embedded but it's not exposed
108
var consulClient *api.Client
109
if cfg.Store == "consul" {
110
consulClient, err = api.NewClient(&api.Config{
111
Address: cfg.Consul.Host,
112
Token: cfg.Consul.ACLToken.String(),
113
Scheme: "http",
114
HttpClient: &http.Client{
115
Transport: cleanhttp.DefaultPooledTransport(),
116
// See https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
117
Timeout: cfg.Consul.HTTPClientTimeout,
118
},
119
})
120
if err != nil {
121
return err
122
}
123
}
124
125
if err != nil {
126
return fmt.Errorf("failed to create kv client: %w", err)
127
}
128
129
r.setClient(cli, consulClient, cfg)
130
return nil
131
}
132
133
// setClient sets the active client and notifies run to restart the
134
// kv watcher.
135
func (r *Remote) setClient(client kv.Client, consulClient *api.Client, config kv.Config) {
136
if client == nil && consulClient == nil {
137
r.kv = nil
138
} else {
139
r.kv = &agentRemoteClient{
140
Client: client,
141
consul: consulClient,
142
config: config,
143
}
144
}
145
r.reloadKV <- struct{}{}
146
}
147
148
func (r *Remote) run() {
149
var (
150
kvContext context.Context
151
kvCancel context.CancelFunc
152
)
153
154
Outer:
155
for {
156
select {
157
case <-r.cancelCtx.Done():
158
break Outer
159
case <-r.reloadKV:
160
r.kvMut.RLock()
161
kv := r.kv
162
r.kvMut.RUnlock()
163
164
if kvCancel != nil {
165
kvCancel()
166
}
167
kvContext, kvCancel = context.WithCancel(r.cancelCtx)
168
go r.watchKV(kvContext, kv)
169
}
170
}
171
172
if kvCancel != nil {
173
kvCancel()
174
}
175
}
176
177
func (r *Remote) watchKV(ctx context.Context, client *agentRemoteClient) {
178
// Edge case: client was unset, nothing to do here.
179
if client == nil {
180
level.Info(r.log).Log("msg", "not watching the KV, none set")
181
return
182
}
183
184
client.WatchPrefix(ctx, "", func(key string, v interface{}) bool {
185
if ctx.Err() != nil {
186
return false
187
}
188
189
r.configsMut.Lock()
190
defer r.configsMut.Unlock()
191
192
switch {
193
case v == nil:
194
r.configsCh <- WatchEvent{Key: key, Config: nil}
195
default:
196
cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
197
if err != nil {
198
level.Error(r.log).Log("msg", "could not unmarshal config from store", "name", key, "err", err)
199
break
200
}
201
202
r.configsCh <- WatchEvent{Key: key, Config: cfg}
203
}
204
205
return true
206
})
207
}
208
209
// List returns the list of all configs in the KV store.
210
func (r *Remote) List(ctx context.Context) ([]string, error) {
211
r.kvMut.RLock()
212
defer r.kvMut.RUnlock()
213
if r.kv == nil {
214
return nil, ErrNotConnected
215
}
216
217
return r.kv.List(ctx, "")
218
}
219
220
// listConsul returns Key Value Pairs instead of []string
221
func (r *Remote) listConsul(ctx context.Context) (api.KVPairs, error) {
222
if r.kv == nil {
223
return nil, ErrNotConnected
224
}
225
226
var pairs api.KVPairs
227
options := &api.QueryOptions{
228
AllowStale: !r.kv.config.Consul.ConsistentReads,
229
RequireConsistent: r.kv.config.Consul.ConsistentReads,
230
}
231
// This is copied from cortex list so that stats stay the same
232
err := instrument.CollectedRequest(ctx, "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
233
var err error
234
pairs, _, err = r.kv.consul.KV().List(r.kv.config.Prefix, options.WithContext(ctx))
235
return err
236
})
237
238
if err != nil {
239
return nil, err
240
}
241
// This mirrors the previous behavior of returning a blank array as opposed to nil.
242
if pairs == nil {
243
blankPairs := make(api.KVPairs, 0)
244
return blankPairs, nil
245
}
246
for _, kvp := range pairs {
247
kvp.Key = strings.TrimPrefix(kvp.Key, r.kv.config.Prefix)
248
}
249
return pairs, nil
250
}
251
252
// Get retrieves an individual config from the KV store.
253
func (r *Remote) Get(ctx context.Context, key string) (instance.Config, error) {
254
r.kvMut.RLock()
255
defer r.kvMut.RUnlock()
256
if r.kv == nil {
257
return instance.Config{}, ErrNotConnected
258
}
259
260
v, err := r.kv.Get(ctx, key)
261
if err != nil {
262
return instance.Config{}, fmt.Errorf("failed to get config %s: %w", key, err)
263
} else if v == nil {
264
return instance.Config{}, NotExistError{Key: key}
265
}
266
267
cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
268
if err != nil {
269
return instance.Config{}, fmt.Errorf("failed to unmarshal config %s: %w", key, err)
270
}
271
return *cfg, nil
272
}
273
274
// Put adds or updates a config in the KV store.
275
func (r *Remote) Put(ctx context.Context, c instance.Config) (bool, error) {
276
// We need to use a write lock here since two Applies can't run concurrently
277
// (given the current need to perform a store-wide validation.)
278
r.kvMut.Lock()
279
defer r.kvMut.Unlock()
280
if r.kv == nil {
281
return false, ErrNotConnected
282
}
283
284
bb, err := instance.MarshalConfig(&c, false)
285
if err != nil {
286
return false, fmt.Errorf("failed to marshal config: %w", err)
287
}
288
289
cfgCh, err := r.all(ctx, nil)
290
if err != nil {
291
return false, fmt.Errorf("failed to check validity of config: %w", err)
292
}
293
if err := checkUnique(cfgCh, &c); err != nil {
294
return false, fmt.Errorf("failed to check uniqueness of config: %w", err)
295
}
296
297
var created bool
298
err = r.kv.CAS(ctx, c.Name, func(in interface{}) (out interface{}, retry bool, err error) {
299
// The configuration is new if there's no previous value from the CAS
300
created = (in == nil)
301
return string(bb), false, nil
302
})
303
if err != nil {
304
return false, fmt.Errorf("failed to put config: %w", err)
305
}
306
return created, nil
307
}
308
309
// Delete deletes a config from the KV store. It returns NotExistError if
310
// the config doesn't exist.
311
func (r *Remote) Delete(ctx context.Context, key string) error {
312
r.kvMut.RLock()
313
defer r.kvMut.RUnlock()
314
if r.kv == nil {
315
return ErrNotConnected
316
}
317
318
// Some KV stores don't return an error if something failed to be
319
// deleted, so we'll try to get it first. This isn't perfect, and
320
// it may fail, so we'll silently ignore any errors here unless
321
// we know for sure the config doesn't exist.
322
v, err := r.kv.Get(ctx, key)
323
if err != nil {
324
level.Warn(r.log).Log("msg", "error validating key existence for deletion", "err", err)
325
} else if v == nil {
326
return NotExistError{Key: key}
327
}
328
329
err = r.kv.Delete(ctx, key)
330
if err != nil {
331
return fmt.Errorf("error deleting configuration: %w", err)
332
}
333
334
return nil
335
}
336
337
// All retrieves the set of all configs in the store.
338
func (r *Remote) All(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
339
r.kvMut.RLock()
340
defer r.kvMut.RUnlock()
341
return r.all(ctx, keep)
342
}
343
344
// all can only be called if the kvMut lock is already held.
345
func (r *Remote) all(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
346
if r.kv == nil {
347
return nil, ErrNotConnected
348
}
349
350
// If we are using a consul client then do the short circuit way, this is done so that we receive all the key value pairs
351
// in one call then, operate on them in memory. Previously we retrieved the list (which stripped the values)
352
// then ran a goroutine to get each individual value from consul. In situations with an extremely large number of
353
// configs this overloaded the consul instances. This reduces that to one call, that was being made anyways.
354
if r.kv.consul != nil {
355
return r.allConsul(ctx, keep)
356
}
357
358
return r.allOther(ctx, keep)
359
}
360
361
// allConsul is ONLY usable when consul is the keystore. This is a performance improvement in using the client directly
362
//
363
// instead of the cortex multi store kv interface. That interface returns the list then each value must be retrieved
364
// individually. This returns all the keys and values in one call and works on them in memory
365
func (r *Remote) allConsul(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
366
if r.kv.consul == nil {
367
level.Error(r.log).Log("err", "allConsul called but consul client nil")
368
return nil, errors.New("allConsul called but consul client nil")
369
}
370
var configs []*instance.Config
371
c := GetCodec()
372
373
pairs, err := r.listConsul(ctx)
374
375
if err != nil {
376
return nil, err
377
}
378
for _, kvp := range pairs {
379
if keep != nil && !keep(kvp.Key) {
380
level.Debug(r.log).Log("msg", "skipping key that was filtered out", "key", kvp.Key)
381
continue
382
}
383
value, err := c.Decode(kvp.Value)
384
if err != nil {
385
level.Error(r.log).Log("msg", "failed to decode config from store", "key", kvp.Key, "err", err)
386
continue
387
}
388
if value == nil {
389
// Config was deleted since we called list, skip it.
390
level.Debug(r.log).Log("msg", "skipping key that was deleted after list was called", "key", kvp.Key)
391
continue
392
}
393
394
cfg, err := instance.UnmarshalConfig(strings.NewReader(value.(string)))
395
if err != nil {
396
level.Error(r.log).Log("msg", "failed to unmarshal config from store", "key", kvp.Key, "err", err)
397
continue
398
}
399
configs = append(configs, cfg)
400
}
401
ch := make(chan instance.Config, len(configs))
402
for _, cfg := range configs {
403
ch <- *cfg
404
}
405
close(ch)
406
return ch, nil
407
}
408
409
// allOther is the generic implementation of all, used for every store other
// than consul. It lists the keys in the store and fetches each value in its
// own goroutine, streaming successfully unmarshalled configs on the returned
// channel. The channel is closed once every fetcher has finished.
//
// Keys rejected by keep, keys whose value cannot be fetched or unmarshalled,
// and keys that disappeared after the list are skipped. If ctx is cancelled,
// configs that have not yet been received are dropped so the fetchers can
// exit.
func (r *Remote) allOther(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
	// Capture the client once. The fetchers below keep running after this
	// function returns and the caller has released kvMut, so they must not
	// read r.kv themselves: ApplyConfig may replace or clear it concurrently.
	client := r.kv
	if client == nil {
		return nil, ErrNotConnected
	}

	keys, err := client.List(ctx, "")
	if err != nil {
		return nil, fmt.Errorf("failed to list configs: %w", err)
	}

	ch := make(chan instance.Config)

	var wg sync.WaitGroup
	wg.Add(len(keys))
	go func() {
		wg.Wait()
		close(ch)
	}()

	for _, key := range keys {
		go func(key string) {
			defer wg.Done()

			if keep != nil && !keep(key) {
				level.Debug(r.log).Log("msg", "skipping key that was filtered out", "key", key)
				return
			}

			// TODO(rfratto): retries might be useful here
			v, err := client.Get(ctx, key)
			if err != nil {
				level.Error(r.log).Log("msg", "failed to get config with key", "key", key, "err", err)
				return
			} else if v == nil {
				// Config was deleted since we called list, skip it.
				level.Debug(r.log).Log("msg", "skipping key that was deleted after list was called", "key", key)
				return
			}

			cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
			if err != nil {
				level.Error(r.log).Log("msg", "failed to unmarshal config from store", "key", key, "err", err)
				return
			}

			// ch is unbuffered; stop waiting for a receiver once ctx is
			// cancelled so this goroutine cannot leak.
			select {
			case ch <- *cfg:
			case <-ctx.Done():
			}
		}(key)
	}

	return ch, nil
}
459
460
// Watch watches the Store for changes.
461
func (r *Remote) Watch() <-chan WatchEvent {
462
return r.configsCh
463
}
464
465
// Close closes the Remote store.
466
func (r *Remote) Close() error {
467
r.kvMut.Lock()
468
defer r.kvMut.Unlock()
469
r.cancelFunc()
470
return nil
471
}
472
473