Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/controller/scheduler_test.go
4095 views
1
package controller_test
2
3
import (
4
"context"
5
"sync"
6
"testing"
7
8
"github.com/grafana/agent/component"
9
"github.com/grafana/agent/pkg/flow/internal/controller"
10
"github.com/stretchr/testify/require"
11
)
12
13
func TestScheduler_Synchronize(t *testing.T) {
14
t.Run("Can start new jobs", func(t *testing.T) {
15
var started, finished sync.WaitGroup
16
started.Add(3)
17
finished.Add(3)
18
19
runFunc := func(ctx context.Context) error {
20
defer finished.Done()
21
started.Done()
22
23
<-ctx.Done()
24
return nil
25
}
26
27
sched := controller.NewScheduler()
28
sched.Synchronize([]controller.RunnableNode{
29
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
30
fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: runFunc}},
31
fakeRunnable{ID: "component-c", Component: mockComponent{RunFunc: runFunc}},
32
})
33
34
started.Wait()
35
require.NoError(t, sched.Close())
36
finished.Wait()
37
})
38
39
t.Run("Ignores existing jobs", func(t *testing.T) {
40
var started sync.WaitGroup
41
started.Add(1)
42
43
runFunc := func(ctx context.Context) error {
44
started.Done()
45
<-ctx.Done()
46
return nil
47
}
48
49
sched := controller.NewScheduler()
50
51
for i := 0; i < 10; i++ {
52
// If a new runnable is created, runFunc will panic since the WaitGroup
53
// only supports 1 goroutine.
54
sched.Synchronize([]controller.RunnableNode{
55
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
56
})
57
}
58
59
started.Wait()
60
require.NoError(t, sched.Close())
61
})
62
63
t.Run("Removes stale jobs", func(t *testing.T) {
64
var started, finished sync.WaitGroup
65
started.Add(1)
66
finished.Add(1)
67
68
runFunc := func(ctx context.Context) error {
69
defer finished.Done()
70
started.Done()
71
<-ctx.Done()
72
return nil
73
}
74
75
sched := controller.NewScheduler()
76
77
sched.Synchronize([]controller.RunnableNode{
78
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
79
})
80
started.Wait()
81
82
sched.Synchronize([]controller.RunnableNode{})
83
84
finished.Wait()
85
require.NoError(t, sched.Close())
86
})
87
}
88
89
type fakeRunnable struct {
90
ID string
91
Component component.Component
92
}
93
94
var _ controller.RunnableNode = fakeRunnable{}
95
96
func (fr fakeRunnable) NodeID() string { return fr.ID }
97
func (fr fakeRunnable) Run(ctx context.Context) error { return fr.Component.Run(ctx) }
98
99
type mockComponent struct {
100
RunFunc func(ctx context.Context) error
101
UpdateFunc func(newConfig component.Arguments) error
102
}
103
104
var _ component.Component = (*mockComponent)(nil)
105
106
func (mc mockComponent) Run(ctx context.Context) error { return mc.RunFunc(ctx) }
107
func (mc mockComponent) Update(newConfig component.Arguments) error { return mc.UpdateFunc(newConfig) }
108
109