Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/controller_test.go
5283 views
1
package integrations
2
3
import (
4
"context"
5
"strings"
6
"sync"
7
"testing"
8
9
"github.com/go-kit/log"
10
"github.com/grafana/agent/pkg/util"
11
"github.com/stretchr/testify/require"
12
"go.uber.org/atomic"
13
)
14
15
//
16
// Tests for Controller's utilization of the core Integration interface.
17
//
18
19
// Test_controller_UniqueIdentifier ensures that integrations must not share a (name, id) tuple.
20
func Test_controller_UniqueIdentifier(t *testing.T) {
21
controllerFromConfigs := func(t *testing.T, cc []Config) (*controller, error) {
22
t.Helper()
23
return newController(util.TestLogger(t), controllerConfig(cc), Globals{})
24
}
25
26
t.Run("different name, identifier", func(t *testing.T) {
27
_, err := controllerFromConfigs(t, []Config{
28
mockConfigNameTuple(t, "foo", "bar"),
29
mockConfigNameTuple(t, "fizz", "buzz"),
30
})
31
require.NoError(t, err)
32
})
33
34
t.Run("same name, different identifier", func(t *testing.T) {
35
_, err := controllerFromConfigs(t, []Config{
36
mockConfigNameTuple(t, "foo", "bar"),
37
mockConfigNameTuple(t, "foo", "buzz"),
38
})
39
require.NoError(t, err)
40
})
41
42
t.Run("same name, same identifier", func(t *testing.T) {
43
_, err := controllerFromConfigs(t, []Config{
44
mockConfigNameTuple(t, "foo", "bar"),
45
mockConfigNameTuple(t, "foo", "bar"),
46
})
47
require.Error(t, err, `multiple instance names "bar" in integration "foo"`)
48
})
49
}
50
51
// Test_controller_RunsIntegration ensures that integrations
52
// run.
53
func Test_controller_RunsIntegration(t *testing.T) {
54
var wg sync.WaitGroup
55
wg.Add(1)
56
57
ctx, cancel := context.WithCancel(context.Background())
58
59
ctrl, err := newController(
60
util.TestLogger(t),
61
controllerConfig{
62
mockConfigForIntegration(t, FuncIntegration(func(ctx context.Context) error {
63
defer wg.Done()
64
cancel()
65
<-ctx.Done()
66
return nil
67
})),
68
},
69
Globals{},
70
)
71
require.NoError(t, err, "failed to create controller")
72
73
// Run the controller. The controller should immediately run our fake integration
74
// which will cancel ctx and cause ctrl to exit.
75
ctrl.run(ctx)
76
77
// Make sure that our integration exited too.
78
wg.Wait()
79
}
80
81
// Test_controller_ConfigChanges ensures that integrations only get restarted
82
// when configs are no longer equal.
83
func Test_controller_ConfigChanges(t *testing.T) {
84
tc := func(t *testing.T, changed bool) (timesRan uint64) {
85
t.Helper()
86
87
var integrationsWg sync.WaitGroup
88
var starts atomic.Uint64
89
90
mockIntegration := FuncIntegration(func(ctx context.Context) error {
91
integrationsWg.Done()
92
starts.Inc()
93
<-ctx.Done()
94
return nil
95
})
96
97
cfg := controllerConfig{
98
mockConfig{
99
NameFunc: func() string { return mockIntegrationName },
100
ConfigEqualsFunc: func(Config) bool { return !changed },
101
ApplyDefaultsFunc: func(g Globals) error { return nil },
102
IdentifierFunc: func(Globals) (string, error) {
103
return mockIntegrationName, nil
104
},
105
NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
106
integrationsWg.Add(1)
107
return mockIntegration, nil
108
},
109
},
110
}
111
112
globals := Globals{}
113
ctrl, err := newController(util.TestLogger(t), cfg, globals)
114
require.NoError(t, err, "failed to create controller")
115
116
sc := newSyncController(t, ctrl)
117
require.NoError(t, sc.UpdateController(cfg, globals), "failed to re-apply config")
118
119
// Wait for our integrations to have been started
120
integrationsWg.Wait()
121
122
sc.Stop()
123
return starts.Load()
124
}
125
126
t.Run("Unchanged", func(t *testing.T) {
127
starts := tc(t, false)
128
require.Equal(t, uint64(1), starts, "integration should only have started exactly once")
129
})
130
131
t.Run("Changed", func(t *testing.T) {
132
starts := tc(t, true)
133
require.Equal(t, uint64(2), starts, "integration should have started exactly twice")
134
})
135
}
136
137
func Test_controller_SingletonCheck(t *testing.T) {
138
var integrationsWg sync.WaitGroup
139
var starts atomic.Uint64
140
141
mockIntegration := FuncIntegration(func(ctx context.Context) error {
142
integrationsWg.Done()
143
starts.Inc()
144
<-ctx.Done()
145
return nil
146
})
147
c1 := mockConfig{
148
NameFunc: func() string { return mockIntegrationName },
149
ConfigEqualsFunc: func(Config) bool { return true },
150
ApplyDefaultsFunc: func(g Globals) error { return nil },
151
IdentifierFunc: func(Globals) (string, error) {
152
return mockIntegrationName, nil
153
},
154
NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
155
integrationsWg.Add(1)
156
return mockIntegration, nil
157
},
158
}
159
configMap := make(map[Config]Type)
160
configMap[&c1] = TypeSingleton
161
setRegistered(t, configMap)
162
cfg := controllerConfig{
163
c1,
164
c1,
165
}
166
167
globals := Globals{}
168
_, err := newController(util.TestLogger(t), cfg, globals)
169
require.Error(t, err)
170
require.True(t, strings.Contains(err.Error(), `integration "mock" may only be defined once`))
171
}
172
173
type syncController struct {
174
inner *controller
175
pool *workerPool
176
}
177
178
// newSyncController pairs an unstarted controller with a manually managed
179
// worker pool to synchronously apply integrations.
180
func newSyncController(t *testing.T, inner *controller) *syncController {
181
t.Helper()
182
183
sc := &syncController{
184
inner: inner,
185
pool: newWorkerPool(context.Background(), inner.logger),
186
}
187
188
// There's always immediately one queued integration set from any
189
// successfully created controller.
190
sc.refresh()
191
return sc
192
}
193
194
// refresh receives the next integration set from the inner controller's
// runIntegrations channel, reloads the worker pool with it, and stores it as
// the controller's current integrations. The controller mutex is held for the
// whole operation, including the channel receive.
func (sc *syncController) refresh() {
	sc.inner.mut.Lock()
	defer sc.inner.mut.Unlock()

	queued := <-sc.inner.runIntegrations
	sc.pool.Reload(queued)
	sc.inner.integrations = queued
}
202
203
// UpdateController applies c and g to the inner controller and, if that
// succeeds, synchronously applies the resulting integration set via refresh.
// An error from the inner controller is returned unchanged and no refresh
// happens.
func (sc *syncController) UpdateController(c controllerConfig, g Globals) error {
	if err := sc.inner.UpdateController(c, g); err != nil {
		return err
	}

	sc.refresh()
	return nil
}
211
212
// Stop closes the manually managed worker pool.
func (sc *syncController) Stop() {
	sc.pool.Close()
}
215
216
// mockIntegrationName is the name (and, in several helpers, also the
// identifier) reported by the mock configs in this file.
const mockIntegrationName = "mock"
217
218
type mockConfig struct {
219
NameFunc func() string
220
ApplyDefaultsFunc func(Globals) error
221
ConfigEqualsFunc func(Config) bool
222
IdentifierFunc func(Globals) (string, error)
223
NewIntegrationFunc func(log.Logger, Globals) (Integration, error)
224
}
225
226
func (mc mockConfig) Name() string {
227
return mc.NameFunc()
228
}
229
230
func (mc mockConfig) ConfigEquals(c Config) bool {
231
if mc.ConfigEqualsFunc != nil {
232
return mc.ConfigEqualsFunc(c)
233
}
234
return false
235
}
236
237
func (mc mockConfig) ApplyDefaults(g Globals) error {
238
return mc.ApplyDefaultsFunc(g)
239
}
240
241
func (mc mockConfig) Identifier(g Globals) (string, error) {
242
return mc.IdentifierFunc(g)
243
}
244
245
func (mc mockConfig) NewIntegration(l log.Logger, g Globals) (Integration, error) {
246
return mc.NewIntegrationFunc(l, g)
247
}
248
249
func (mc mockConfig) WithNewIntegrationFunc(f func(log.Logger, Globals) (Integration, error)) mockConfig {
250
return mockConfig{
251
NameFunc: mc.NameFunc,
252
ApplyDefaultsFunc: mc.ApplyDefaultsFunc,
253
ConfigEqualsFunc: mc.ConfigEqualsFunc,
254
IdentifierFunc: mc.IdentifierFunc,
255
NewIntegrationFunc: f,
256
}
257
}
258
259
func mockConfigNameTuple(t *testing.T, name, id string) mockConfig {
260
t.Helper()
261
262
return mockConfig{
263
NameFunc: func() string { return name },
264
IdentifierFunc: func(_ Globals) (string, error) { return id, nil },
265
ApplyDefaultsFunc: func(g Globals) error { return nil },
266
NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
267
return NoOpIntegration, nil
268
},
269
}
270
}
271
272
// mockConfigForIntegration returns a Config that will always return i.
273
func mockConfigForIntegration(t *testing.T, i Integration) mockConfig {
274
t.Helper()
275
276
return mockConfig{
277
NameFunc: func() string { return mockIntegrationName },
278
ApplyDefaultsFunc: func(g Globals) error { return nil },
279
IdentifierFunc: func(Globals) (string, error) {
280
return mockIntegrationName, nil
281
},
282
NewIntegrationFunc: func(log.Logger, Globals) (Integration, error) {
283
return i, nil
284
},
285
}
286
}
287
288