Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/runner/runner_test.go
4093 views
1
package runner_test
2
3
import (
4
"context"
5
"testing"
6
"time"
7
8
"github.com/cespare/xxhash/v2"
9
"github.com/grafana/agent/pkg/runner"
10
"github.com/grafana/agent/pkg/util"
11
"github.com/stretchr/testify/require"
12
"go.uber.org/atomic"
13
)
14
15
func TestRunner_ApplyPayloads(t *testing.T) {
16
t.Run("new Workers get scheduled for new tasks", func(t *testing.T) {
17
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
18
defer cancel()
19
20
workerCount := atomic.NewUint64(0)
21
22
r := runner.New(func(t stringTask) runner.Worker {
23
return &genericWorker{workerCount: workerCount}
24
})
25
defer r.Stop()
26
27
var tasks []stringTask
28
29
// Apply the first task and wait for it to run.
30
tasks = append(tasks, stringTask("task_a"))
31
require.NoError(t, r.ApplyTasks(ctx, tasks))
32
requireRunners(t, 1, workerCount)
33
34
// Append a more tasks and wait for it to run.
35
tasks = append(tasks, stringTask("task_b"), stringTask("task_c"))
36
require.NoError(t, r.ApplyTasks(ctx, tasks))
37
requireRunners(t, 3, workerCount)
38
})
39
40
t.Run("old Workers get terminated for removed tasks", func(t *testing.T) {
41
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
42
defer cancel()
43
44
workerCount := atomic.NewUint64(0)
45
46
r := runner.New(func(t stringTask) runner.Worker {
47
return &genericWorker{workerCount: workerCount}
48
})
49
defer r.Stop()
50
51
// Apply a set of initial tasks.
52
require.NoError(t, r.ApplyTasks(ctx, []stringTask{"task_a", "task_b", "task_c"}))
53
requireRunners(t, 3, workerCount)
54
55
// Apply a new set of tasks, removing tasks that were previously defined.
56
require.NoError(t, r.ApplyTasks(ctx, []stringTask{"task_b"}))
57
requireRunners(t, 1, workerCount)
58
})
59
}
60
61
func TestRunner_Stop(t *testing.T) {
62
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
63
defer cancel()
64
65
workerCount := atomic.NewUint64(0)
66
67
r := runner.New(func(t stringTask) runner.Worker {
68
return &genericWorker{workerCount: workerCount}
69
})
70
71
// Apply a set of initial tasks.
72
require.NoError(t, r.ApplyTasks(ctx, []stringTask{"task_a", "task_b", "task_c"}))
73
requireRunners(t, 3, workerCount)
74
75
// Stop the runner. No tasks should be running afterwards.
76
r.Stop()
77
requireRunners(t, 0, workerCount)
78
}
79
80
// requireRunners asserts, via util.Eventually, that the counter actual
// holds the value expect. The counter is re-read each time the check runs.
func requireRunners(t *testing.T, expect uint64, actual *atomic.Uint64) {
	check := func(tt require.TestingT) {
		current := actual.Load()
		require.Equal(tt, expect, current)
	}
	util.Eventually(t, check)
}
85
86
// stringTask is a string-backed task type used by the tests in this file;
// its Hash and Equals methods are both derived from the string value.
type stringTask string
87
88
// Compile-time check that stringTask implements runner.Task.
var _ runner.Task = stringTask("")
89
90
// Hash returns the 64-bit xxhash sum of the task's string value.
func (st stringTask) Hash() uint64 {
	s := string(st)
	return xxhash.Sum64String(s)
}
93
94
// Equals reports whether other is a stringTask with the same string value
// as st.
//
// A Task of any other dynamic type, or a nil Task, is reported as not equal.
// The comma-ok type assertion is used so that such a value yields false
// instead of the run-time panic a bare other.(stringTask) would raise.
func (st stringTask) Equals(other runner.Task) bool {
	o, ok := other.(stringTask)
	return ok && st == o
}
97
98
type genericWorker struct {
99
workerCount *atomic.Uint64
100
}
101
102
// Compile-time check that *genericWorker implements runner.Worker.
var _ runner.Worker = (*genericWorker)(nil)
103
104
// Run increments the worker counter, blocks until ctx is done, and
// decrements the counter again on return.
func (w *genericWorker) Run(ctx context.Context) {
	counter := w.workerCount

	counter.Inc()
	defer counter.Dec()

	// Nothing to do but wait for cancellation.
	done := ctx.Done()
	<-done
}
110
111