Path: blob/main/pkg/flow/internal/controller/scheduler_test.go
4095 views
package controller_test

import (
	"context"
	"sync"
	"testing"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/pkg/flow/internal/controller"
	"github.com/stretchr/testify/require"
)

// TestScheduler_Synchronize exercises controller.Scheduler.Synchronize with
// three scenarios: starting jobs for new nodes, leaving already-known nodes
// alone, and stopping nodes that are no longer part of the synchronized set.
//
// Each subtest coordinates with the goroutines running its runFunc through
// sync.WaitGroups, so the order of the Wait/Close calls is significant.
func TestScheduler_Synchronize(t *testing.T) {
	t.Run("Can start new jobs", func(t *testing.T) {
		// started is released once all three runFuncs have begun; finished is
		// released once all three have returned.
		var started, finished sync.WaitGroup
		started.Add(3)
		finished.Add(3)

		runFunc := func(ctx context.Context) error {
			defer finished.Done()
			started.Done()

			// Stay "running" until the context handed to Run is cancelled.
			<-ctx.Done()
			return nil
		}

		sched := controller.NewScheduler()
		sched.Synchronize([]controller.RunnableNode{
			fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
			fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: runFunc}},
			fakeRunnable{ID: "component-c", Component: mockComponent{RunFunc: runFunc}},
		})

		// All three jobs must have started before the scheduler is closed, and
		// all three must have returned afterwards.
		started.Wait()
		require.NoError(t, sched.Close())
		finished.Wait()
	})

	t.Run("Ignores existing jobs", func(t *testing.T) {
		var started sync.WaitGroup
		started.Add(1)

		runFunc := func(ctx context.Context) error {
			started.Done()
			<-ctx.Done()
			return nil
		}

		sched := controller.NewScheduler()

		for i := 0; i < 10; i++ {
			// If a new runnable is created, runFunc will panic since the
			// WaitGroup only supports 1 goroutine: a second started.Done()
			// would drive its counter negative.
			sched.Synchronize([]controller.RunnableNode{
				fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
			})
		}

		started.Wait()
		require.NoError(t, sched.Close())
	})

	t.Run("Removes stale jobs", func(t *testing.T) {
		var started, finished sync.WaitGroup
		started.Add(1)
		finished.Add(1)

		runFunc := func(ctx context.Context) error {
			defer finished.Done()
			started.Done()
			<-ctx.Done()
			return nil
		}

		sched := controller.NewScheduler()

		sched.Synchronize([]controller.RunnableNode{
			fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
		})
		started.Wait()

		// Synchronize with an empty set. runFunc only returns once its context
		// is cancelled, so finished.Wait() unblocking *before* Close is called
		// shows that the job was stopped by this Synchronize call.
		sched.Synchronize([]controller.RunnableNode{})

		finished.Wait()
		require.NoError(t, sched.Close())
	})
}

// fakeRunnable is a minimal controller.RunnableNode for tests: it reports a
// fixed node ID and delegates Run to the wrapped component.
type fakeRunnable struct {
	ID        string
	Component component.Component
}

var _ controller.RunnableNode = fakeRunnable{}

// NodeID returns the ID the fakeRunnable was constructed with.
func (fr fakeRunnable) NodeID() string { return fr.ID }

// Run forwards to the wrapped component's Run method.
func (fr fakeRunnable) Run(ctx context.Context) error { return fr.Component.Run(ctx) }

// mockComponent is a component.Component whose behavior is supplied by
// function fields.
type mockComponent struct {
	// RunFunc implements Run. It must be non-nil; every test in this file
	// sets it.
	RunFunc func(ctx context.Context) error
	// UpdateFunc implements Update. It is optional; when nil, Update is a
	// no-op that returns nil.
	UpdateFunc func(newConfig component.Arguments) error
}

// mockComponent is stored by value in fakeRunnable.Component, so assert that
// the value type (not only the pointer type) satisfies the interface.
var _ component.Component = mockComponent{}

// Run calls RunFunc with ctx and returns its result.
func (mc mockComponent) Run(ctx context.Context) error { return mc.RunFunc(ctx) }

// Update calls UpdateFunc with newConfig and returns its result. The tests in
// this file construct mockComponent with only RunFunc set, so a missing
// UpdateFunc is treated as "accept any update" rather than a nil-function
// panic.
func (mc mockComponent) Update(newConfig component.Arguments) error {
	if mc.UpdateFunc == nil {
		return nil
	}
	return mc.UpdateFunc(newConfig)
}