Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/internal/scheduler/scheduler_test.go
4096 views
1
package scheduler_test
2
3
import (
4
"context"
5
"testing"
6
"time"
7
8
"github.com/grafana/agent/component/otelcol/internal/scheduler"
9
"github.com/grafana/agent/pkg/flow/componenttest"
10
"github.com/grafana/agent/pkg/util"
11
"github.com/stretchr/testify/require"
12
otelcomponent "go.opentelemetry.io/collector/component"
13
)
14
15
func TestScheduler(t *testing.T) {
16
t.Run("Scheduled components get started", func(t *testing.T) {
17
var (
18
l = util.TestLogger(t)
19
cs = scheduler.New(l)
20
h = scheduler.NewHost(l)
21
)
22
23
// Run our scheduler in the background.
24
go func() {
25
err := cs.Run(componenttest.TestContext(t))
26
require.NoError(t, err)
27
}()
28
29
// Schedule our component, which should notify the started trigger once it is
30
// running.
31
component, started, _ := newTriggerComponent()
32
cs.Schedule(h, component)
33
require.NoError(t, started.Wait(5*time.Second), "component did not start")
34
})
35
36
t.Run("Unscheduled components get stopped", func(t *testing.T) {
37
var (
38
l = util.TestLogger(t)
39
cs = scheduler.New(l)
40
h = scheduler.NewHost(l)
41
)
42
43
// Run our scheduler in the background.
44
go func() {
45
err := cs.Run(componenttest.TestContext(t))
46
require.NoError(t, err)
47
}()
48
49
// Schedule our component, which should notify the started and stopped
50
// trigger once it starts and stops respectively.
51
component, started, stopped := newTriggerComponent()
52
cs.Schedule(h, component)
53
54
// Wait for the component to start, and then unschedule all components, which
55
// should cause our running component to terminate.
56
require.NoError(t, started.Wait(5*time.Second), "component did not start")
57
cs.Schedule(h)
58
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
59
})
60
61
t.Run("Running components get stopped on shutdown", func(t *testing.T) {
62
var (
63
l = util.TestLogger(t)
64
cs = scheduler.New(l)
65
h = scheduler.NewHost(l)
66
)
67
68
ctx, cancel := context.WithCancel(componenttest.TestContext(t))
69
defer cancel()
70
71
// Run our scheduler in the background.
72
go func() {
73
err := cs.Run(ctx)
74
require.NoError(t, err)
75
}()
76
77
// Schedule our component which will notify our trigger when Shutdown gets
78
// called.
79
component, started, stopped := newTriggerComponent()
80
cs.Schedule(h, component)
81
82
// Wait for the component to start, and then stop our scheduler, which
83
// should cause our running component to terminate.
84
require.NoError(t, started.Wait(5*time.Second), "component did not start")
85
cancel()
86
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
87
})
88
}
89
90
func newTriggerComponent() (component otelcomponent.Component, started, stopped *util.WaitTrigger) {
91
started = util.NewWaitTrigger()
92
stopped = util.NewWaitTrigger()
93
94
component = &fakeComponent{
95
StartFunc: func(_ context.Context, _ otelcomponent.Host) error {
96
started.Trigger()
97
return nil
98
},
99
ShutdownFunc: func(_ context.Context) error {
100
stopped.Trigger()
101
return nil
102
},
103
}
104
105
return
106
}
107
108
type fakeComponent struct {
109
StartFunc func(ctx context.Context, host otelcomponent.Host) error
110
ShutdownFunc func(ctx context.Context) error
111
}
112
113
var _ otelcomponent.Component = (*fakeComponent)(nil)
114
115
func (fc *fakeComponent) Start(ctx context.Context, host otelcomponent.Host) error {
116
if fc.StartFunc != nil {
117
fc.StartFunc(ctx, host)
118
}
119
return nil
120
}
121
122
func (fc *fakeComponent) Shutdown(ctx context.Context) error {
123
if fc.ShutdownFunc != nil {
124
return fc.ShutdownFunc(ctx)
125
}
126
return nil
127
}
128
129