Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/controller/scheduler.go
4095 views
1
package controller
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
)
8
9
// RunnableNode is any dag.Node which can also be run.
10
type RunnableNode interface {
11
NodeID() string
12
Run(ctx context.Context) error
13
}
14
15
// Scheduler runs components.
16
type Scheduler struct {
17
ctx context.Context
18
cancel context.CancelFunc
19
running sync.WaitGroup
20
21
tasksMut sync.Mutex
22
tasks map[string]*task
23
}
24
25
// NewScheduler creates a new Scheduler. Call Synchronize to manage the set of
26
// components which are running.
27
//
28
// Call Close to stop the Scheduler and all running components.
29
func NewScheduler() *Scheduler {
30
ctx, cancel := context.WithCancel(context.Background())
31
return &Scheduler{
32
ctx: ctx,
33
cancel: cancel,
34
35
tasks: make(map[string]*task),
36
}
37
}
38
39
// Synchronize synchronizes the running components to those defined by rr.
40
//
41
// New RunnableNodes will be launched as new goroutines. RunnableNodes already
42
// managed by Scheduler will be kept running, while running RunnableNodes that
43
// are not in rr will be shut down and removed.
44
//
45
// Existing components will be restarted if they stopped since the previous
46
// call to Synchronize.
47
func (s *Scheduler) Synchronize(rr []RunnableNode) error {
48
s.tasksMut.Lock()
49
defer s.tasksMut.Unlock()
50
51
if s.ctx.Err() != nil {
52
return fmt.Errorf("Scheduler is closed")
53
}
54
55
newRunnables := make(map[string]RunnableNode, len(rr))
56
for _, r := range rr {
57
newRunnables[r.NodeID()] = r
58
}
59
60
// Stop tasks that are not defined in rr.
61
var stopping sync.WaitGroup
62
for id, t := range s.tasks {
63
if _, keep := newRunnables[id]; keep {
64
continue
65
}
66
67
stopping.Add(1)
68
go func(t *task) {
69
defer stopping.Done()
70
t.Stop()
71
}(t)
72
}
73
74
// Launch new runnables that have appeared.
75
for id, r := range newRunnables {
76
if _, exist := s.tasks[id]; exist {
77
continue
78
}
79
80
var (
81
nodeID = id
82
newRunnable = r
83
)
84
85
opts := taskOptions{
86
Context: s.ctx,
87
Runnable: newRunnable,
88
OnDone: func() {
89
defer s.running.Done()
90
91
s.tasksMut.Lock()
92
defer s.tasksMut.Unlock()
93
delete(s.tasks, nodeID)
94
},
95
}
96
97
s.running.Add(1)
98
s.tasks[nodeID] = newTask(opts)
99
}
100
101
// Wait for all stopping runnables to exit.
102
stopping.Wait()
103
return nil
104
}
105
106
// Close stops the Scheduler and returns after all running goroutines have
107
// exited.
108
func (s *Scheduler) Close() error {
109
s.cancel()
110
s.running.Wait()
111
return nil
112
}
113
114
// task is a scheduled runnable.
115
type task struct {
116
ctx context.Context
117
cancel context.CancelFunc
118
exited chan struct{}
119
}
120
121
type taskOptions struct {
122
Context context.Context
123
Runnable RunnableNode
124
OnDone func()
125
}
126
127
// newTask creates and starts a new task.
128
func newTask(opts taskOptions) *task {
129
ctx, cancel := context.WithCancel(opts.Context)
130
131
t := &task{
132
ctx: ctx,
133
cancel: cancel,
134
exited: make(chan struct{}),
135
}
136
137
go func() {
138
defer opts.OnDone()
139
defer close(t.exited)
140
_ = opts.Runnable.Run(t.ctx)
141
}()
142
return t
143
}
144
145
func (t *task) Stop() {
146
t.cancel()
147
<-t.exited
148
}
149
150