Path: blob/main/pkg/integrations/v2/controller_test.go
5283 views
package integrations

import (
	"context"
	"strings"
	"sync"
	"testing"

	"github.com/go-kit/log"
	"github.com/grafana/agent/pkg/util"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
)

//
// Tests for Controller's utilization of the core Integration interface.
//

// Test_controller_UniqueIdentifier ensures that integrations must not share a
// (name, id) tuple.
func Test_controller_UniqueIdentifier(t *testing.T) {
	// controllerFromConfigs builds a controller from the given configs using
	// zero-value Globals.
	controllerFromConfigs := func(t *testing.T, cc []Config) (*controller, error) {
		t.Helper()
		return newController(util.TestLogger(t), controllerConfig(cc), Globals{})
	}

	t.Run("different name, identifier", func(t *testing.T) {
		_, err := controllerFromConfigs(t, []Config{
			mockConfigNameTuple(t, "foo", "bar"),
			mockConfigNameTuple(t, "fizz", "buzz"),
		})
		require.NoError(t, err)
	})

	t.Run("same name, different identifier", func(t *testing.T) {
		_, err := controllerFromConfigs(t, []Config{
			mockConfigNameTuple(t, "foo", "bar"),
			mockConfigNameTuple(t, "foo", "buzz"),
		})
		require.NoError(t, err)
	})

	t.Run("same name, same identifier", func(t *testing.T) {
		_, err := controllerFromConfigs(t, []Config{
			mockConfigNameTuple(t, "foo", "bar"),
			mockConfigNameTuple(t, "foo", "bar"),
		})
		// NOTE(review): the string below is only the failure message printed
		// when err is nil; require.Error does not compare it against err.
		// Consider require.EqualError once the exact message is confirmed.
		require.Error(t, err, `multiple instance names "bar" in integration "foo"`)
	})
}

// Test_controller_RunsIntegration ensures that integrations
// run.
func Test_controller_RunsIntegration(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(1)

	ctx, cancel := context.WithCancel(context.Background())
	// The integration normally cancels ctx itself; this defer releases the
	// context if the test aborts before the integration ever runs.
	defer cancel()

	ctrl, err := newController(
		util.TestLogger(t),
		controllerConfig{
			mockConfigForIntegration(t, FuncIntegration(func(ctx context.Context) error {
				defer wg.Done()
				cancel()
				<-ctx.Done()
				return nil
			})),
		},
		Globals{},
	)
	require.NoError(t, err, "failed to create controller")

	// Run the controller. The controller should immediately run our fake
	// integration which will cancel ctx and cause ctrl to exit.
	ctrl.run(ctx)

	// Make sure that our integration exited too.
	wg.Wait()
}

// Test_controller_ConfigChanges ensures that integrations only get restarted
// when configs are no longer equal.
func Test_controller_ConfigChanges(t *testing.T) {
	// tc creates a controller, re-applies the same config once, and returns
	// how many times the mock integration was started. changed controls what
	// the mock config reports from ConfigEquals (it returns !changed).
	tc := func(t *testing.T, changed bool) (timesRan uint64) {
		t.Helper()

		var integrationsWg sync.WaitGroup
		var starts atomic.Uint64

		mockIntegration := FuncIntegration(func(ctx context.Context) error {
			// Record the start before signalling the WaitGroup so the count
			// is already visible once integrationsWg.Wait returns.
			starts.Inc()
			integrationsWg.Done()
			<-ctx.Done()
			return nil
		})

		cfg := controllerConfig{
			mockConfig{
				NameFunc:          func() string { return mockIntegrationName },
				ConfigEqualsFunc:  func(Config) bool { return !changed },
				ApplyDefaultsFunc: func(g Globals) error { return nil },
				IdentifierFunc: func(Globals) (string, error) {
					return mockIntegrationName, nil
				},
				NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
					// Each constructed integration is expected to be started
					// exactly once.
					integrationsWg.Add(1)
					return mockIntegration, nil
				},
			},
		}

		globals := Globals{}
		ctrl, err := newController(util.TestLogger(t), cfg, globals)
		require.NoError(t, err, "failed to create controller")

		sc := newSyncController(t, ctrl)
		require.NoError(t, sc.UpdateController(cfg, globals), "failed to re-apply config")

		// Wait for our integrations to have been started
		integrationsWg.Wait()

		sc.Stop()
		return starts.Load()
	}

	t.Run("Unchanged", func(t *testing.T) {
		starts := tc(t, false)
		require.Equal(t, uint64(1), starts, "integration should only have started exactly once")
	})

	t.Run("Changed", func(t *testing.T) {
		starts := tc(t, true)
		require.Equal(t, uint64(2), starts, "integration should have started exactly twice")
	})
}

// Test_controller_SingletonCheck ensures that creating a controller fails when
// a config registered as TypeSingleton appears more than once.
func Test_controller_SingletonCheck(t *testing.T) {
	var integrationsWg sync.WaitGroup
	var starts atomic.Uint64

	mockIntegration := FuncIntegration(func(ctx context.Context) error {
		// Same ordering as in Test_controller_ConfigChanges: count first,
		// then signal.
		starts.Inc()
		integrationsWg.Done()
		<-ctx.Done()
		return nil
	})
	c1 := mockConfig{
		NameFunc:          func() string { return mockIntegrationName },
		ConfigEqualsFunc:  func(Config) bool { return true },
		ApplyDefaultsFunc: func(g Globals) error { return nil },
		IdentifierFunc: func(Globals) (string, error) {
			return mockIntegrationName, nil
		},
		NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
			integrationsWg.Add(1)
			return mockIntegration, nil
		},
	}
	configMap := make(map[Config]Type)
	configMap[&c1] = TypeSingleton
	setRegistered(t, configMap)
	cfg := controllerConfig{
		c1,
		c1,
	}

	globals := Globals{}
	_, err := newController(util.TestLogger(t), cfg, globals)
	require.Error(t, err)
	require.True(
		t,
		strings.Contains(err.Error(), `integration "mock" may only be defined once`),
		"unexpected error: %v", err,
	)
}

// syncController wraps a controller together with a worker pool that the test
// drives by hand.
type syncController struct {
	inner *controller
	pool  *workerPool
}

// newSyncController pairs an unstarted controller with a manually managed
// worker pool to synchronously apply integrations.
func newSyncController(t *testing.T, inner *controller) *syncController {
	t.Helper()

	sc := &syncController{
		inner: inner,
		pool:  newWorkerPool(context.Background(), inner.logger),
	}

	// There's always immediately one queued integration set from any
	// successfully created controller.
	sc.refresh()
	return sc
}

// refresh receives the next integration set from the inner controller's
// runIntegrations channel, reloads the worker pool with it, and stores it as
// the controller's current integrations. It holds inner.mut for the duration
// and blocks until a set is available on the channel.
func (sc *syncController) refresh() {
	sc.inner.mut.Lock()
	defer sc.inner.mut.Unlock()

	newIntegrations := <-sc.inner.runIntegrations
	sc.pool.Reload(newIntegrations)
	sc.inner.integrations = newIntegrations
}

// UpdateController forwards the config to the inner controller and, if that
// succeeds, applies the resulting integration set via refresh.
func (sc *syncController) UpdateController(c controllerConfig, g Globals) error {
	err := sc.inner.UpdateController(c, g)
	if err != nil {
		return err
	}
	sc.refresh()
	return nil
}

// Stop closes the worker pool.
func (sc *syncController) Stop() {
	sc.pool.Close()
}

// mockIntegrationName is the name (and identifier) used by the mock configs in
// this file.
const mockIntegrationName = "mock"

// mockConfig is a Config whose behavior is supplied through function fields.
// Every field except ConfigEqualsFunc must be set before the corresponding
// method is called; calling a method backed by a nil func panics.
type mockConfig struct {
	NameFunc           func() string
	ApplyDefaultsFunc  func(Globals) error
	ConfigEqualsFunc   func(Config) bool
	IdentifierFunc     func(Globals) (string, error)
	NewIntegrationFunc func(log.Logger, Globals) (Integration, error)
}

// Name returns the result of NameFunc.
func (mc mockConfig) Name() string {
	return mc.NameFunc()
}

// ConfigEquals returns the result of ConfigEqualsFunc, or false when
// ConfigEqualsFunc is unset.
func (mc mockConfig) ConfigEquals(c Config) bool {
	if mc.ConfigEqualsFunc != nil {
		return mc.ConfigEqualsFunc(c)
	}
	return false
}

// ApplyDefaults returns the result of ApplyDefaultsFunc.
func (mc mockConfig) ApplyDefaults(g Globals) error {
	return mc.ApplyDefaultsFunc(g)
}

// Identifier returns the result of IdentifierFunc.
func (mc mockConfig) Identifier(g Globals) (string, error) {
	return mc.IdentifierFunc(g)
}

// NewIntegration returns the result of NewIntegrationFunc.
func (mc mockConfig) NewIntegration(l log.Logger, g Globals) (Integration, error) {
	return mc.NewIntegrationFunc(l, g)
}

// WithNewIntegrationFunc returns a copy of mc whose NewIntegrationFunc is f.
// The receiver is passed by value, so the caller's mockConfig is not modified.
func (mc mockConfig) WithNewIntegrationFunc(f func(log.Logger, Globals) (Integration, error)) mockConfig {
	mc.NewIntegrationFunc = f
	return mc
}

// mockConfigNameTuple returns a Config reporting the given name and identifier
// whose NewIntegration always returns NoOpIntegration.
func mockConfigNameTuple(t *testing.T, name, id string) mockConfig {
	t.Helper()

	return mockConfig{
		NameFunc:          func() string { return name },
		IdentifierFunc:    func(_ Globals) (string, error) { return id, nil },
		ApplyDefaultsFunc: func(g Globals) error { return nil },
		NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
			return NoOpIntegration, nil
		},
	}
}

// mockConfigForIntegration returns a Config that will always return i.
func mockConfigForIntegration(t *testing.T, i Integration) mockConfig {
	t.Helper()

	return mockConfig{
		NameFunc:          func() string { return mockIntegrationName },
		ApplyDefaultsFunc: func(g Globals) error { return nil },
		IdentifierFunc: func(Globals) (string, error) {
			return mockIntegrationName, nil
		},
		NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
			return i, nil
		},
	}
}