Path: blob/main/pkg/metrics/cluster/config_watcher_test.go
4094 views
package cluster

import (
	"context"
	"testing"
	"time"

	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/metrics/instance/configstore"
	"github.com/grafana/agent/pkg/util"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

// Test_configWatcher_Refresh runs two consecutive refreshes against a store
// whose contents change in between, then checks which configs were applied to
// and deleted from the mocked manager.
func Test_configWatcher_Refresh(t *testing.T) {
	watcherCfg := DefaultConfig
	watcherCfg.Enabled = true
	watcherCfg.ReshardInterval = time.Hour

	cfgStore := &configstore.Mock{
		WatchFunc: func() <-chan configstore.WatchEvent {
			return make(chan configstore.WatchEvent)
		},
	}
	manager := new(mockConfigManager)

	acceptAll := func(*instance.Config) error { return nil }
	ownsAll := func(key string) (bool, error) { return true, nil }

	watcher, err := newConfigWatcher(util.TestLogger(t), watcherCfg, cfgStore, manager, ownsAll, acceptAll)
	require.NoError(t, err)
	t.Cleanup(func() { _ = watcher.Stop() })

	manager.On("ApplyConfig", mock.Anything).Return(nil)
	manager.On("DeleteConfig", mock.Anything).Return(nil)

	// onlyConfig builds an AllFunc whose result channel (created per call with
	// the requested buffer size) yields one config with the given name and is
	// then closed.
	onlyConfig := func(name string, buffer int) func(context.Context, func(string) bool) (<-chan instance.Config, error) {
		return func(context.Context, func(string) bool) (<-chan instance.Config, error) {
			out := make(chan instance.Config, buffer)
			go func() {
				defer close(out)
				out <- instance.Config{Name: name}
			}()
			return out, nil
		}
	}

	// First refresh: the store only contains "hello".
	cfgStore.AllFunc = onlyConfig("hello", 0)
	require.NoError(t, watcher.refresh(context.Background()))

	// Second refresh: the store now only contains "new".
	cfgStore.AllFunc = onlyConfig("new", 1)
	require.NoError(t, watcher.refresh(context.Background()))

	// Both configs must have been applied, and "hello" must have been deleted
	// because it was missing from the second refresh.
	manager.AssertCalled(t, "ApplyConfig", instance.Config{Name: "hello"})
	manager.AssertCalled(t, "ApplyConfig", instance.Config{Name: "new"})
	manager.AssertCalled(t, "DeleteConfig", "hello")
}

// Test_configWatcher_handleEvent feeds store watch events directly into the
// watcher and counts the resulting ApplyConfig/DeleteConfig calls on the
// mocked manager.
func Test_configWatcher_handleEvent(t *testing.T) {
	cfg := DefaultConfig
	cfg.Enabled = true

	store := configstore.Mock{
		WatchFunc: func() <-chan configstore.WatchEvent {
			return make(chan configstore.WatchEvent)
		},
	}

	validate := func(*instance.Config) error { return nil }
	always := func(key string) (bool, error) { return true, nil }
	never := func(key string) (bool, error) { return false, nil }

	// start creates a watcher backed by a fresh mock manager that accepts any
	// ApplyConfig/DeleteConfig call. It returns the mock together with a
	// function that delivers one event to the watcher and fails the test if
	// handling it returns an error.
	start := func(t *testing.T, owns func(key string) (bool, error)) (*mockConfigManager, func(configstore.WatchEvent)) {
		manager := new(mockConfigManager)

		watcher, err := newConfigWatcher(util.TestLogger(t), cfg, &store, manager, owns, validate)
		require.NoError(t, err)
		t.Cleanup(func() { _ = watcher.Stop() })

		manager.On("ApplyConfig", mock.Anything).Return(nil)
		manager.On("DeleteConfig", mock.Anything).Return(nil)

		deliver := func(ev configstore.WatchEvent) {
			require.NoError(t, watcher.handleEvent(ev))
		}
		return manager, deliver
	}

	t.Run("new owned config", func(t *testing.T) {
		manager, deliver := start(t, always)

		deliver(configstore.WatchEvent{Key: "new", Config: &instance.Config{}})

		manager.AssertNumberOfCalls(t, "ApplyConfig", 1)
	})

	t.Run("updated owned config", func(t *testing.T) {
		manager, deliver := start(t, always)

		// The first event creates the config, the second one updates it.
		deliver(configstore.WatchEvent{Key: "update", Config: &instance.Config{}})
		deliver(configstore.WatchEvent{Key: "update", Config: &instance.Config{}})

		manager.AssertNumberOfCalls(t, "ApplyConfig", 2)
	})

	t.Run("new unowned config", func(t *testing.T) {
		manager, deliver := start(t, never)

		// The ownership check reports false, so nothing may be applied.
		deliver(configstore.WatchEvent{Key: "unowned", Config: &instance.Config{}})

		manager.AssertNumberOfCalls(t, "ApplyConfig", 0)
	})

	t.Run("lost ownership", func(t *testing.T) {
		stillOwned := true
		manager, deliver := start(t, func(key string) (bool, error) { return stillOwned, nil })

		// Applied while the key is still owned.
		deliver(configstore.WatchEvent{Key: "disappear", Config: &instance.Config{}})

		// Flip ownership; the same event should now cause a delete instead of
		// a second apply.
		stillOwned = false
		deliver(configstore.WatchEvent{Key: "disappear", Config: &instance.Config{}})

		manager.AssertNumberOfCalls(t, "ApplyConfig", 1)
		manager.AssertNumberOfCalls(t, "DeleteConfig", 1)
	})

	t.Run("deleted running config", func(t *testing.T) {
		manager, deliver := start(t, always)

		// One event creates the config; a follow-up event with a nil Config
		// signals its deletion.
		deliver(configstore.WatchEvent{Key: "new-key", Config: &instance.Config{}})
		deliver(configstore.WatchEvent{Key: "new-key", Config: nil})

		manager.AssertNumberOfCalls(t, "ApplyConfig", 1)
		manager.AssertNumberOfCalls(t, "DeleteConfig", 1)
	})
}

// Test_configWatcher_nextReshard checks that the channel returned by
// nextReshard fires within the expected time window for a last-reshard time
// in the distant past and for one taken right now.
func Test_configWatcher_nextReshard(t *testing.T) {
	watcher := &configWatcher{
		log: util.TestLogger(t),
		cfg: Config{ReshardInterval: time.Second},
	}

	cases := []struct {
		name    string
		last    func() time.Time // evaluated inside the subtest
		timeout time.Duration
		failure string
	}{
		{
			name:    "past time",
			last:    func() time.Time { return time.Time{} },
			timeout: 250 * time.Millisecond,
			failure: "nextReshard did not return an already ready channel",
		},
		{
			name:    "future time",
			last:    time.Now,
			timeout: 1500 * time.Millisecond,
			failure: "nextReshard took too long to return",
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			fired := watcher.nextReshard(tc.last())
			expired := time.After(tc.timeout)

			select {
			case <-fired:
			case <-expired:
				require.FailNow(t, tc.failure)
			}
		})
	}
}

// mockConfigManager is a testify-based stand-in for the instance manager
// handed to the config watcher in these tests.
type mockConfigManager struct {
	mock.Mock
}

// GetInstance returns the values configured on the mock. The name argument is
// not forwarded to the mock, so expectations cannot match on it.
func (m *mockConfigManager) GetInstance(name string) (instance.ManagedInstance, error) {
	ret := m.Called()
	return ret.Get(0).(instance.ManagedInstance), ret.Error(1)
}

// ListInstances returns the map configured on the mock.
func (m *mockConfigManager) ListInstances() map[string]instance.ManagedInstance {
	return m.Called().Get(0).(map[string]instance.ManagedInstance)
}

// ListConfigs implements Manager.
func (m *mockConfigManager) ListConfigs() map[string]instance.Config {
	return m.Called().Get(0).(map[string]instance.Config)
}

// ApplyConfig implements Manager.
func (m *mockConfigManager) ApplyConfig(c instance.Config) error {
	return m.Called(c).Error(0)
}

// DeleteConfig implements Manager.
func (m *mockConfigManager) DeleteConfig(name string) error {
	return m.Called(name).Error(0)
}

// Stop implements Manager.
func (m *mockConfigManager) Stop() {
	m.Called()
}