Path: blob/main/pkg/metrics/cluster/config_watcher.go
package cluster

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/metrics/instance/configstore"
	"github.com/grafana/agent/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	reshardDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "agent_metrics_scraping_service_reshard_duration",
		Help: "How long it took for resharding to run.",
	}, []string{"success"})
)

// configWatcher connects to a configstore and will apply configs to an
// instance.Manager.
type configWatcher struct {
	log log.Logger

	mut     sync.Mutex
	cfg     Config
	stopped bool
	stop    context.CancelFunc

	store    configstore.Store
	im       instance.Manager
	owns     OwnershipFunc
	validate ValidationFunc

	refreshCh   chan struct{}
	instanceMut sync.Mutex
	instances   map[string]struct{}
}

// OwnershipFunc should determine if a given key is owned by the caller.
type OwnershipFunc = func(key string) (bool, error)

// ValidationFunc should validate a config.
type ValidationFunc = func(*instance.Config) error

// newConfigWatcher watches store for changes and checks each config against
// owns. It will also poll the configstore at a configurable interval.
func newConfigWatcher(log log.Logger, cfg Config, store configstore.Store, im instance.Manager, owns OwnershipFunc, validate ValidationFunc) (*configWatcher, error) {
	ctx, cancel := context.WithCancel(context.Background())

	w := &configWatcher{
		log: log,

		stop: cancel,

		store:    store,
		im:       im,
		owns:     owns,
		validate: validate,

		refreshCh: make(chan struct{}, 1),
		instances: make(map[string]struct{}),
	}
	if err := w.ApplyConfig(cfg); err != nil {
		return nil, err
	}
	// Delay duration; this is to prevent a race condition, see run for details.
	delay := cfg.Lifecycler.HeartbeatPeriod * 3
	go w.run(ctx, delay)
	return w, nil
}

func (w *configWatcher) ApplyConfig(cfg Config) error {
	w.mut.Lock()
	defer w.mut.Unlock()

	if util.CompareYAML(w.cfg, cfg) {
		return nil
	}

	if w.stopped {
		return fmt.Errorf("configWatcher already stopped")
	}

	w.cfg = cfg
	return nil
}

func (w *configWatcher) run(ctx context.Context, delay time.Duration) {
	defer level.Info(w.log).Log("msg", "config watcher run loop exiting")
	// This is due to a race condition between the heartbeat and config ring in a very narrow set of circumstances.
	// https://gist.github.com/mattdurham/c15f27de17a6da97bf2e6a870991c7f2
	time.Sleep(delay)
	lastReshard := time.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case <-w.nextReshard(lastReshard):
			level.Debug(w.log).Log("msg", "reshard timer ticked, scheduling refresh")
			w.RequestRefresh()
			lastReshard = time.Now()
		case <-w.refreshCh:
			err := w.refresh(ctx)
			if err != nil {
				level.Error(w.log).Log("msg", "refresh failed", "err", err)
			}
		case ev := <-w.store.Watch():
			level.Debug(w.log).Log("msg", "handling event from config store")
			if err := w.handleEvent(ev); err != nil {
				level.Error(w.log).Log("msg", "failed to handle changed or deleted config", "key", ev.Key, "err", err)
			}
		}
	}
}

// nextReshard returns a channel that will receive a value when the reshard
// interval has elapsed.
func (w *configWatcher) nextReshard(lastReshard time.Time) <-chan time.Time {
	w.mut.Lock()
	nextReshard := lastReshard.Add(w.cfg.ReshardInterval)
	w.mut.Unlock()

	remaining := time.Until(nextReshard)

	// NOTE(rfratto): clamping to 0 isn't necessary for time.After,
	// but it makes the log message clearer to always use "0s" as
	// "next reshard will be scheduled immediately."
	if remaining < 0 {
		remaining = 0
	}

	level.Debug(w.log).Log("msg", "waiting for next reshard interval", "last_reshard", lastReshard, "next_reshard", nextReshard, "remaining", remaining)
	return time.After(remaining)
}

// RequestRefresh will queue a refresh. No more than one refresh can be queued at a time.
func (w *configWatcher) RequestRefresh() {
	select {
	case w.refreshCh <- struct{}{}:
		level.Debug(w.log).Log("msg", "successfully scheduled a refresh")
	default:
		level.Debug(w.log).Log("msg", "ignoring request refresh: refresh already scheduled")
	}
}

// refresh reloads all configs from the configstore. Deleted configs will be
// removed. refresh may not be called concurrently and must only be invoked from run.
// Call RequestRefresh to queue a call to refresh.
func (w *configWatcher) refresh(ctx context.Context) (err error) {
	w.mut.Lock()
	enabled := w.cfg.Enabled
	refreshTimeout := w.cfg.ReshardTimeout
	w.mut.Unlock()

	if !enabled {
		level.Debug(w.log).Log("msg", "refresh skipped because clustering is disabled")
		return nil
	}
	level.Info(w.log).Log("msg", "starting refresh")

	if refreshTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, refreshTimeout)
		defer cancel()
	}

	start := time.Now()
	defer func() {
		success := "1"
		if err != nil {
			success = "0"
		}
		duration := time.Since(start)
		level.Info(w.log).Log("msg", "refresh finished", "duration", duration, "success", success, "err", err)
		reshardDuration.WithLabelValues(success).Observe(duration.Seconds())
	}()

	// This is used to determine if the context was already exceeded before calling the kv provider.
	if err = ctx.Err(); err != nil {
		level.Error(w.log).Log("msg", "context deadline exceeded before calling store.all", "err", err)
		return err
	}
	deadline, _ := ctx.Deadline()
	level.Debug(w.log).Log("msg", "deadline before store.all", "deadline", deadline)
	configs, err := w.store.All(ctx, func(key string) bool {
		owns, err := w.owns(key)
		if err != nil {
			level.Error(w.log).Log("msg", "failed to check for ownership, instance will be deleted if it is running", "key", key, "err", err)
			return false
		}
		return owns
	})
	level.Debug(w.log).Log("msg", "count of configs from store.all", "count", len(configs))

	if err != nil {
		return fmt.Errorf("failed to get configs from store: %w", err)
	}

	var (
		keys       = make(map[string]struct{})
		firstError error
	)

Outer:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case cfg, ok := <-configs:
			// w.store.All will close configs when all of them have been read.
			if !ok {
				break Outer
			}

			if err := w.handleEvent(configstore.WatchEvent{Key: cfg.Name, Config: &cfg}); err != nil {
				level.Error(w.log).Log("msg", "failed to process changed config", "key", cfg.Name, "err", err)
				if firstError == nil {
					firstError = err
				}
			}

			keys[cfg.Name] = struct{}{}
		}
	}

	// Any config we used to be running that disappeared from this most recent
	// iteration should be deleted. We hold the lock just for the duration of
	// populating deleted because handleEvent also grabs a hold on the lock.
	var deleted []string
	w.instanceMut.Lock()
	for key := range w.instances {
		if _, exist := keys[key]; exist {
			continue
		}
		deleted = append(deleted, key)
	}
	w.instanceMut.Unlock()

	// Send a deleted event for any key that has gone away.
	for _, key := range deleted {
		if err := w.handleEvent(configstore.WatchEvent{Key: key, Config: nil}); err != nil {
			level.Error(w.log).Log("msg", "failed to process changed config", "key", key, "err", err)
		}
	}

	return firstError
}

func (w *configWatcher) handleEvent(ev configstore.WatchEvent) error {
	w.mut.Lock()
	defer w.mut.Unlock()

	if w.stopped {
		return fmt.Errorf("configWatcher stopped")
	}

	w.instanceMut.Lock()
	defer w.instanceMut.Unlock()

	owned, err := w.owns(ev.Key)
	if err != nil {
		level.Error(w.log).Log("msg", "failed to see if config is owned. instance will be deleted if it is running", "err", err)
	}

	var (
		_, isRunning = w.instances[ev.Key]
		isDeleted    = ev.Config == nil
	)

	switch {
	// Two deletion scenarios:
	// 1. A config we're running got moved to a new owner.
	// 2. A config we're running got deleted.
	case (isRunning && !owned) || (isDeleted && isRunning):
		if isDeleted {
			level.Info(w.log).Log("msg", "untracking deleted config", "key", ev.Key)
		} else {
			level.Info(w.log).Log("msg", "untracking config that changed owners", "key", ev.Key)
		}

		err := w.im.DeleteConfig(ev.Key)
		delete(w.instances, ev.Key)
		if err != nil {
			return fmt.Errorf("failed to delete: %w", err)
		}

	case !isDeleted && owned:
		if err := w.validate(ev.Config); err != nil {
			return fmt.Errorf(
				"failed to validate config. %[1]s cannot run until the global settings are adjusted or the config is adjusted to operate within the global constraints. error: %[2]w",
				ev.Key, err,
			)
		}

		if _, exist := w.instances[ev.Key]; !exist {
			level.Info(w.log).Log("msg", "tracking new config", "key", ev.Key)
		}

		if err := w.im.ApplyConfig(*ev.Config); err != nil {
			return fmt.Errorf("failed to apply config: %w", err)
		}
		w.instances[ev.Key] = struct{}{}
	}

	return nil
}

// Stop stops the configWatcher. Cannot be called more than once.
func (w *configWatcher) Stop() error {
	w.mut.Lock()
	defer w.mut.Unlock()

	if w.stopped {
		return fmt.Errorf("already stopped")
	}
	w.stop()
	w.stopped = true

	// Shut down all the instances that this configWatcher managed. It *MUST*
	// happen after w.stop() is called to prevent the run loop from applying any
	// new configs.
	w.instanceMut.Lock()
	defer w.instanceMut.Unlock()

	for key := range w.instances {
		if err := w.im.DeleteConfig(key); err != nil {
			level.Warn(w.log).Log("msg", "failed deleting config on shutdown", "key", key, "err", err)
		}
	}
	w.instances = make(map[string]struct{})

	return nil
}
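
For reference, a minimal sketch (not part of config_watcher.go) of how code in the same package might wire the watcher up. The logger, Config, configstore.Store, and instance.Manager values are assumed to come from the surrounding cluster code; the ownership and validation funcs below are hypothetical stand-ins, not the ones the agent actually uses.

// exampleWireUp is a hypothetical helper illustrating the configWatcher API.
func exampleWireUp(l log.Logger, cfg Config, store configstore.Store, im instance.Manager) error {
	owns := func(key string) (bool, error) { return true, nil } // hypothetical: claim ownership of every key
	validate := func(*instance.Config) error { return nil }     // hypothetical: accept every config as valid

	w, err := newConfigWatcher(l, cfg, store, im, owns, validate)
	if err != nil {
		return err
	}

	// Queue a manual refresh; at most one refresh is ever queued at a time.
	w.RequestRefresh()

	// Stop cancels the run loop and deletes every instance the watcher tracks.
	return w.Stop()
}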