Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/runner/runner.go
4096 views
1
// Package runner provides an API for generic goroutine scheduling. It is
2
// broken up into three concepts:
3
//
4
// 1. Task: a unit of work to perform
5
// 2. Worker: a goroutine dedicated to doing a specific Task
6
// 3. Runner: manages the set of Workers, one per unique Task
7
//
8
// An example of a Task and Worker pair would be a Task which describes an
9
// endpoint to poll for health. The Task would then be assigned to a Worker to
10
// perform the polling.
11
package runner
12
13
import (
14
"context"
15
"fmt"
16
"sync"
17
)
18
19
// Task describes a unit of work that a Worker carries out. For instance, a
// Task could be a struct holding the address a Worker should poll.
//
// Tasks are identified by Hash and Equals together: Hash narrows down
// candidates, and Equals is only consulted between Tasks whose hashes match.
type Task interface {
	// Hash returns a hash value representing this Task.
	Hash() uint64

	// Equals reports whether this Task is the same as other. It is only
	// called for two Tasks that share the same Hash.
	Equals(other Task) bool
}
28
29
// Worker carries out the business logic for the one Task it was given. Each
// Worker runs in a goroutine dedicated to that Task.
type Worker interface {
	// Run executes the Worker and blocks until ctx is canceled or the Worker
	// encounters a fatal error. For any given Worker, Run is called exactly
	// once.
	Run(ctx context.Context)
}
37
38
// The Runner manages a set of running Workers based on an active set of tasks.
39
type Runner[TaskType Task] struct {
40
newWorker func(t TaskType) Worker
41
42
ctx context.Context
43
cancel context.CancelFunc
44
45
running sync.WaitGroup
46
workers *hashMap
47
}
48
49
// Internal types used to implement the Runner.
50
type (
51
// scheduledWorker is a representation of a running worker.
52
scheduledWorker struct {
53
Worker Worker // The underlying Worker instance.
54
55
// Function to call to request the worker to stop.
56
Cancel context.CancelFunc
57
58
// Exited will close once the worker has exited.
59
Exited chan struct{}
60
}
61
62
// workerTask represents a tuple of a scheduledWorker with its assigned Task.
63
// workerTask implements Task for it to be used in a hashMap; two workerTasks
64
// are equal if their underlying Tasks are equal.
65
workerTask struct {
66
Worker *scheduledWorker
67
Task Task
68
}
69
)
70
71
// Hash returns the hash of the Task the scheduledWorker owns.
72
func (sw *workerTask) Hash() uint64 {
73
return sw.Task.Hash()
74
}
75
76
// Equals returns true if the Task owned by this workerTask equals the Task
77
// owned by another workerTask.
78
func (sw *workerTask) Equals(other Task) bool {
79
return sw.Task.Equals(other.(*workerTask).Task)
80
}
81
82
// New creates a new Runner which manages workers for a given Task type. The
83
// newWorker function is called whenever a new Task is received that is not
84
// managed by any existing Worker.
85
func New[TaskType Task](newWorker func(t TaskType) Worker) *Runner[TaskType] {
86
ctx, cancel := context.WithCancel(context.Background())
87
88
return &Runner[TaskType]{
89
newWorker: newWorker,
90
91
ctx: ctx,
92
cancel: cancel,
93
94
workers: newHashMap(10),
95
}
96
}
97
98
// ApplyTasks updates the Tasks tracked by the Runner to the slice specified
99
// by t. t should be the entire set of tasks that workers should be operating
100
// against. ApplyTasks will launch new Workers for new tasks and terminate
101
// previous Workers for tasks which are no longer found in tt.
102
//
103
// ApplyTasks will block until Workers for stale Tasks have terminated. If the
104
// provided context is canceled, ApplyTasks will still finish synchronizing the
105
// set of Workers but will not wait for stale Workers to exit.
106
func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error {
107
if s.ctx.Err() != nil {
108
return fmt.Errorf("Runner is closed")
109
}
110
111
// Create a new hashMap of tasks we intend to run.
112
newTasks := newHashMap(len(tt))
113
for _, t := range tt {
114
newTasks.Add(t)
115
}
116
117
// Stop stale workers (i.e., Workers whose tasks are not in newTasks).
118
var stopping sync.WaitGroup
119
for w := range s.workers.Iterate() {
120
if newTasks.Has(w.(*workerTask).Task) {
121
// Task still exists.
122
continue
123
}
124
125
// Stop and remove the task from s.workers.
126
stopping.Add(1)
127
go func(w *workerTask) {
128
defer stopping.Done()
129
defer s.workers.Delete(w)
130
w.Worker.Cancel()
131
132
select {
133
case <-ctx.Done():
134
case <-w.Worker.Exited:
135
}
136
}(w.(*workerTask))
137
}
138
139
// Ensure that every defined task in newTasks has a worker associated with
140
// it.
141
for definedTask := range newTasks.Iterate() {
142
// Ignore tasks for workers that are already running.
143
//
144
// We use a temporary workerTask here where only the task field is used
145
// for comparison. This prevents unnecessarily creating a new worker when
146
// one isn't needed.
147
if s.workers.Has(&workerTask{Task: definedTask}) {
148
continue
149
}
150
151
workerCtx, workerCancel := context.WithCancel(s.ctx)
152
newWorker := &scheduledWorker{
153
Worker: s.newWorker(definedTask.(TaskType)),
154
Cancel: workerCancel,
155
Exited: make(chan struct{}),
156
}
157
newTask := &workerTask{
158
Worker: newWorker,
159
Task: definedTask,
160
}
161
162
s.running.Add(1)
163
go func() {
164
defer s.running.Done()
165
defer close(newWorker.Exited)
166
newWorker.Worker.Run(workerCtx)
167
}()
168
169
_ = s.workers.Add(newTask)
170
}
171
172
// Wait for all stopping workers to stop (or until the context to cancel,
173
// which will stop the WaitGroup early).
174
stopping.Wait()
175
return ctx.Err()
176
}
177
178
// Tasks returns the current set of Tasks. Tasks are included even if their
179
// associated Worker has terminated.
180
func (s *Runner[TaskType]) Tasks() []TaskType {
181
var res []TaskType
182
for task := range s.workers.Iterate() {
183
workerTask := task.(*workerTask)
184
res = append(res, workerTask.Task.(TaskType))
185
}
186
return res
187
}
188
189
// Workers returns the current set of Workers. Workers are included even if
190
// they have terminated.
191
func (s *Runner[TaskType]) Workers() []Worker {
192
var res []Worker
193
for task := range s.workers.Iterate() {
194
workerTask := task.(*workerTask)
195
res = append(res, workerTask.Worker.Worker)
196
}
197
return res
198
}
199
200
// Stop the Scheduler and all running Workers. Close blocks until all running
201
// Workers exit.
202
func (s *Runner[TaskType]) Stop() {
203
s.cancel()
204
s.running.Wait()
205
}
206
207