Path: blob/main/pkg/metrics/instance/configstore/remote.go
5340 views
package configstore

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"sync"

	"github.com/weaveworks/common/instrument"

	"github.com/hashicorp/go-cleanhttp"

	"github.com/hashicorp/consul/api"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/dskit/kv"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

/***********************************************************************************************************************
The consul code skipping the cortex handler is due to performance issue with a large number of configs and overloading
consul. See issue https://github.com/grafana/agent/issues/789. The long term method will be to refactor and extract
the cortex code so other stores can also benefit from this. @mattdurham
***********************************************************************************************************************/

// consulRequestDuration records the duration of the direct Consul requests
// issued by listConsul, labelled by operation and status code.
var consulRequestDuration = instrument.NewHistogramCollector(promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "agent_configstore_consul_request_duration_seconds",
	Help:    "Time spent on consul requests when listing configs.",
	Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))

// Remote loads instance files from a remote KV store. The KV store
// can be swapped out in real time.
type Remote struct {
	log log.Logger
	reg *util.Unregisterer

	// kvMut guards kv. setClient writes kv and must be called with the write
	// lock held; readers take at least the read lock.
	kvMut sync.RWMutex
	kv    *agentRemoteClient
	// reloadKV is signalled by setClient so that run restarts the KV watcher.
	reloadKV chan struct{}

	// cancelCtx is canceled by Close; it stops run and every watcher derived
	// from it.
	cancelCtx  context.Context
	cancelFunc context.CancelFunc

	// configsMut serializes sends on configsCh performed by watchKV.
	configsMut sync.Mutex
	configsCh  chan WatchEvent
}

// agentRemoteClient is a simple wrapper to allow the shortcircuit of consul, while being backwards compatible with non
// consul kv stores
type agentRemoteClient struct {
	kv.Client
	// consul is non-nil only when the configured store is consul.
	consul *api.Client
	config kv.Config
}

// NewRemote creates a new Remote store that uses a Key-Value client to store
// and retrieve configs. If enable is true, the store will be immediately
// connected to. Otherwise, it can be lazily loaded by enabling later through
// a call to Remote.ApplyConfig.
func NewRemote(l log.Logger, reg prometheus.Registerer, cfg kv.Config, enable bool) (*Remote, error) {
	cancelCtx, cancelFunc := context.WithCancel(context.Background())

	r := &Remote{
		log: l,
		reg: util.WrapWithUnregisterer(reg),

		// Buffered by one so that the ApplyConfig call below can signal a
		// reload before run has been started.
		reloadKV: make(chan struct{}, 1),

		cancelCtx:  cancelCtx,
		cancelFunc: cancelFunc,

		configsCh: make(chan WatchEvent),
	}
	if err := r.ApplyConfig(cfg, enable); err != nil {
		return nil, fmt.Errorf("failed to apply config for config store: %w", err)
	}

	go r.run()
	return r, nil
}

// ApplyConfig applies the config for a kv client. When enable is false the
// active client is cleared. It returns an error if the store was already
// closed or if either the KV client or the consul client cannot be created.
func (r *Remote) ApplyConfig(cfg kv.Config, enable bool) error {
	r.kvMut.Lock()
	defer r.kvMut.Unlock()

	if r.cancelCtx.Err() != nil {
		return fmt.Errorf("remote store already stopped")
	}

	// Unregister all metrics that the previous kv may have registered.
	r.reg.UnregisterAll()

	if !enable {
		r.setClient(nil, nil, kv.Config{})
		return nil
	}

	cli, err := kv.NewClient(cfg, GetCodec(), kv.RegistererWithKVName(r.reg, "agent_configs"), r.log)
	// Check this error before err is reused for the consul client below;
	// otherwise a failed KV client creation would be masked whenever the
	// consul store is selected.
	if err != nil {
		return fmt.Errorf("failed to create kv client: %w", err)
	}

	// This is a hack to get a consul client, the client above has it embedded but it's not exposed
	var consulClient *api.Client
	if cfg.Store == "consul" {
		consulClient, err = api.NewClient(&api.Config{
			Address: cfg.Consul.Host,
			Token:   cfg.Consul.ACLToken.String(),
			Scheme:  "http",
			HttpClient: &http.Client{
				Transport: cleanhttp.DefaultPooledTransport(),
				// See https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
				Timeout: cfg.Consul.HTTPClientTimeout,
			},
		})
		if err != nil {
			return fmt.Errorf("failed to create consul client: %w", err)
		}
	}

	r.setClient(cli, consulClient, cfg)
	return nil
}

// setClient sets the active client and notifies run to restart the
// kv watcher. It writes r.kv without locking, so the caller must hold the
// kvMut write lock.
func (r *Remote) setClient(client kv.Client, consulClient *api.Client, config kv.Config) {
	if client == nil && consulClient == nil {
		r.kv = nil
	} else {
		r.kv = &agentRemoteClient{
			Client: client,
			consul: consulClient,
			config: config,
		}
	}
	r.reloadKV <- struct{}{}
}

// run restarts the KV watcher every time reloadKV is signalled and exits once
// the store's context is canceled, canceling the last watcher on the way out.
func (r *Remote) run() {
	var (
		kvContext context.Context
		kvCancel  context.CancelFunc
	)

Outer:
	for {
		select {
		case <-r.cancelCtx.Done():
			break Outer
		case <-r.reloadKV:
			r.kvMut.RLock()
			kv := r.kv
			r.kvMut.RUnlock()

			// Stop the watcher that belongs to the previous client, if any.
			if kvCancel != nil {
				kvCancel()
			}
			kvContext, kvCancel = context.WithCancel(r.cancelCtx)
			go r.watchKV(kvContext, kv)
		}
	}

	if kvCancel != nil {
		kvCancel()
	}
}

// watchKV watches every key of client and forwards each change to configsCh
// as a WatchEvent. A nil value is forwarded as an event with a nil Config.
// Values that fail to unmarshal are logged and skipped. Watching stops once
// ctx is canceled.
func (r *Remote) watchKV(ctx context.Context, client *agentRemoteClient) {
	// Edge case: client was unset, nothing to do here.
	if client == nil {
		level.Info(r.log).Log("msg", "not watching the KV, none set")
		return
	}

	client.WatchPrefix(ctx, "", func(key string, v interface{}) bool {
		if ctx.Err() != nil {
			return false
		}

		r.configsMut.Lock()
		defer r.configsMut.Unlock()

		var ev WatchEvent
		switch {
		case v == nil:
			ev = WatchEvent{Key: key, Config: nil}
		default:
			cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
			if err != nil {
				level.Error(r.log).Log("msg", "could not unmarshal config from store", "name", key, "err", err)
				return true
			}
			ev = WatchEvent{Key: key, Config: cfg}
		}

		// configsCh is unbuffered. Give up on the send once the watcher is
		// canceled so this callback does not block forever (while holding
		// configsMut) when nobody is reading from Watch anymore.
		select {
		case r.configsCh <- ev:
			return true
		case <-ctx.Done():
			return false
		}
	})
}

// List returns the list of all configs in the KV store. It returns
// ErrNotConnected when no client is set.
func (r *Remote) List(ctx context.Context) ([]string, error) {
	r.kvMut.RLock()
	defer r.kvMut.RUnlock()
	if r.kv == nil {
		return nil, ErrNotConnected
	}

	return r.kv.List(ctx, "")
}

// listConsul returns Key Value Pairs instead of []string. The configured
// prefix is trimmed from every returned key. It reads r.kv without locking,
// so the caller must already hold kvMut.
func (r *Remote) listConsul(ctx context.Context) (api.KVPairs, error) {
	if r.kv == nil {
		return nil, ErrNotConnected
	}

	var pairs api.KVPairs
	options := &api.QueryOptions{
		AllowStale:        !r.kv.config.Consul.ConsistentReads,
		RequireConsistent: r.kv.config.Consul.ConsistentReads,
	}
	// This is copied from cortex list so that stats stay the same
	err := instrument.CollectedRequest(ctx, "List", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
		var err error
		pairs, _, err = r.kv.consul.KV().List(r.kv.config.Prefix, options.WithContext(ctx))
		return err
	})

	if err != nil {
		return nil, err
	}
	// This mirrors the previous behavior of returning a blank array as opposed to nil.
	if pairs == nil {
		blankPairs := make(api.KVPairs, 0)
		return blankPairs, nil
	}
	for _, kvp := range pairs {
		kvp.Key = strings.TrimPrefix(kvp.Key, r.kv.config.Prefix)
	}
	return pairs, nil
}

// Get retrieves an individual config from the KV store. It returns
// ErrNotConnected when no client is set and NotExistError when the key has
// no value.
func (r *Remote) Get(ctx context.Context, key string) (instance.Config, error) {
	r.kvMut.RLock()
	defer r.kvMut.RUnlock()
	if r.kv == nil {
		return instance.Config{}, ErrNotConnected
	}

	v, err := r.kv.Get(ctx, key)
	if err != nil {
		return instance.Config{}, fmt.Errorf("failed to get config %s: %w", key, err)
	} else if v == nil {
		return instance.Config{}, NotExistError{Key: key}
	}

	cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
	if err != nil {
		return instance.Config{}, fmt.Errorf("failed to unmarshal config %s: %w", key, err)
	}
	return *cfg, nil
}

// Put adds or updates a config in the KV store. The returned bool reports
// whether the config was newly created (true) or replaced an existing value
// (false).
func (r *Remote) Put(ctx context.Context, c instance.Config) (bool, error) {
	// We need to use a write lock here since two Applies can't run concurrently
	// (given the current need to perform a store-wide validation.)
	r.kvMut.Lock()
	defer r.kvMut.Unlock()
	if r.kv == nil {
		return false, ErrNotConnected
	}

	bb, err := instance.MarshalConfig(&c, false)
	if err != nil {
		return false, fmt.Errorf("failed to marshal config: %w", err)
	}

	cfgCh, err := r.all(ctx, nil)
	if err != nil {
		return false, fmt.Errorf("failed to check validity of config: %w", err)
	}
	if err := checkUnique(cfgCh, &c); err != nil {
		return false, fmt.Errorf("failed to check uniqueness of config: %w", err)
	}

	var created bool
	err = r.kv.CAS(ctx, c.Name, func(in interface{}) (out interface{}, retry bool, err error) {
		// The configuration is new if there's no previous value from the CAS
		created = (in == nil)
		return string(bb), false, nil
	})
	if err != nil {
		return false, fmt.Errorf("failed to put config: %w", err)
	}
	return created, nil
}

// Delete deletes a config from the KV store. It returns NotExistError if
// the config doesn't exist.
func (r *Remote) Delete(ctx context.Context, key string) error {
	r.kvMut.RLock()
	defer r.kvMut.RUnlock()
	if r.kv == nil {
		return ErrNotConnected
	}

	// Some KV stores don't return an error if something failed to be
	// deleted, so we'll try to get it first. This isn't perfect, and
	// it may fail, so we'll silently ignore any errors here unless
	// we know for sure the config doesn't exist.
	v, err := r.kv.Get(ctx, key)
	if err != nil {
		level.Warn(r.log).Log("msg", "error validating key existence for deletion", "err", err)
	} else if v == nil {
		return NotExistError{Key: key}
	}

	err = r.kv.Delete(ctx, key)
	if err != nil {
		return fmt.Errorf("error deleting configuration: %w", err)
	}

	return nil
}

// All retrieves the set of all configs in the store. When keep is non-nil,
// only keys for which keep returns true are included.
func (r *Remote) All(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
	r.kvMut.RLock()
	defer r.kvMut.RUnlock()
	return r.all(ctx, keep)
}

// all can only be called if the kvMut lock is already held.
func (r *Remote) all(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
	if r.kv == nil {
		return nil, ErrNotConnected
	}

	// If we are using a consul client then do the short circuit way, this is done so that we receive all the key value pairs
	// in one call then, operate on them in memory. Previously we retrieved the list (which stripped the values)
	// then ran a goroutine to get each individual value from consul. In situations with an extremely large number of
	// configs this overloaded the consul instances. This reduces that to one call, that was being made anyways.
	if r.kv.consul != nil {
		return r.allConsul(ctx, keep)
	}

	return r.allOther(ctx, keep)
}

// allConsul is ONLY usable when consul is the keystore. This is a performance improvement in using the client directly
// instead of the cortex multi store kv interface. That interface returns the list then each value must be retrieved
// individually. This returns all the keys and values in one call and works on them in memory.
//
// The returned channel is buffered with every decoded config and already
// closed. Entries that fail to decode or unmarshal are logged and skipped.
func (r *Remote) allConsul(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
	if r.kv.consul == nil {
		level.Error(r.log).Log("err", "allConsul called but consul client nil")
		return nil, errors.New("allConsul called but consul client nil")
	}
	var configs []*instance.Config
	c := GetCodec()

	pairs, err := r.listConsul(ctx)

	if err != nil {
		return nil, err
	}
	for _, kvp := range pairs {
		if keep != nil && !keep(kvp.Key) {
			level.Debug(r.log).Log("msg", "skipping key that was filtered out", "key", kvp.Key)
			continue
		}
		value, err := c.Decode(kvp.Value)
		if err != nil {
			level.Error(r.log).Log("msg", "failed to decode config from store", "key", kvp.Key, "err", err)
			continue
		}
		if value == nil {
			// Config was deleted since we called list, skip it.
			level.Debug(r.log).Log("msg", "skipping key that was deleted after list was called", "key", kvp.Key)
			continue
		}

		cfg, err := instance.UnmarshalConfig(strings.NewReader(value.(string)))
		if err != nil {
			level.Error(r.log).Log("msg", "failed to unmarshal config from store", "key", kvp.Key, "err", err)
			continue
		}
		configs = append(configs, cfg)
	}
	ch := make(chan instance.Config, len(configs))
	for _, cfg := range configs {
		ch <- *cfg
	}
	close(ch)
	return ch, nil
}

// allOther lists every key and fetches each value in its own goroutine,
// streaming the configs over the returned unbuffered channel. The channel is
// closed once every goroutine has finished. Keys rejected by keep, keys that
// cannot be fetched, and values that cannot be unmarshaled are logged and
// skipped. It reads r.kv without locking, so the caller must hold kvMut.
func (r *Remote) allOther(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
	// Capture the client while the caller still holds kvMut. The goroutines
	// below outlive this call, and reading r.kv from them would race with
	// ApplyConfig (and could dereference a nil client).
	client := r.kv
	if client == nil {
		return nil, ErrNotConnected
	}

	keys, err := client.List(ctx, "")
	if err != nil {
		return nil, fmt.Errorf("failed to list configs: %w", err)
	}

	ch := make(chan instance.Config)

	var wg sync.WaitGroup
	wg.Add(len(keys))
	go func() {
		wg.Wait()
		close(ch)
	}()

	for _, key := range keys {
		go func(key string) {
			defer wg.Done()

			if keep != nil && !keep(key) {
				level.Debug(r.log).Log("msg", "skipping key that was filtered out", "key", key)
				return
			}

			// TODO(rfratto): retries might be useful here
			v, err := client.Get(ctx, key)
			if err != nil {
				level.Error(r.log).Log("msg", "failed to get config with key", "key", key, "err", err)
				return
			} else if v == nil {
				// Config was deleted since we called list, skip it.
				level.Debug(r.log).Log("msg", "skipping key that was deleted after list was called", "key", key)
				return
			}

			cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
			if err != nil {
				level.Error(r.log).Log("msg", "failed to unmarshal config from store", "key", key, "err", err)
				return
			}

			// Abandon the send when ctx is canceled so this goroutine does
			// not leak if the consumer stopped reading.
			select {
			case ch <- *cfg:
			case <-ctx.Done():
			}
		}(key)
	}

	return ch, nil
}

// Watch watches the Store for changes.
func (r *Remote) Watch() <-chan WatchEvent {
	return r.configsCh
}

// Close closes the Remote store by canceling its context, which stops run
// and any active KV watcher. It always returns nil.
func (r *Remote) Close() error {
	r.kvMut.Lock()
	defer r.kvMut.Unlock()
	r.cancelFunc()
	return nil
}