Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/manager_test.go
5295 views
1
package integrations
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"testing"
8
"time"
9
10
"github.com/cortexproject/cortex/pkg/util/test"
11
"github.com/go-kit/log"
12
"github.com/grafana/agent/pkg/integrations/config"
13
"github.com/grafana/agent/pkg/metrics/instance"
14
"github.com/prometheus/client_golang/prometheus/promhttp"
15
"github.com/prometheus/common/model"
16
promConfig "github.com/prometheus/prometheus/config"
17
"github.com/prometheus/prometheus/model/labels"
18
"github.com/prometheus/prometheus/model/relabel"
19
"github.com/stretchr/testify/require"
20
"go.uber.org/atomic"
21
"gopkg.in/yaml.v2"
22
)
23
24
// mockIntegrationName is the instance name the reload tests pass to
// GetInstance when checking whether the mock integration's instance exists.
const mockIntegrationName = "integration/mock"
25
26
// noOpValidator is a validation callback that accepts every instance config:
// it ignores its argument and always returns nil.
func noOpValidator(_ *instance.Config) error {
	return nil
}
27
28
// TestConfig_MarshalEmptyIntegrations ensures that an empty set of integrations
29
// can be marshaled correctly.
30
func TestConfig_MarshalEmptyIntegrations(t *testing.T) {
31
cfgText := `
32
scrape_integrations: true
33
replace_instance_label: true
34
integration_restart_backoff: 5s
35
use_hostname_label: true
36
`
37
var (
38
cfg ManagerConfig
39
listenPort = 12345
40
listenHost = "127.0.0.1"
41
)
42
require.NoError(t, yaml.Unmarshal([]byte(cfgText), &cfg))
43
44
// Listen port must be set before applying defaults. Normally applied by the
45
// config package.
46
cfg.ListenPort = listenPort
47
cfg.ListenHost = listenHost
48
49
outBytes, err := yaml.Marshal(cfg)
50
require.NoError(t, err, "Failed creating integration")
51
require.YAMLEq(t, cfgText, string(outBytes))
52
}
53
54
// Test that embedded integration fields in the struct can be unmarshaled and
55
// remarshaled back out to text.
56
func TestConfig_Remarshal(t *testing.T) {
57
RegisterIntegration(&testIntegrationA{})
58
cfgText := `
59
scrape_integrations: true
60
replace_instance_label: true
61
integration_restart_backoff: 5s
62
use_hostname_label: true
63
test:
64
text: Hello, world!
65
truth: true
66
`
67
var (
68
cfg ManagerConfig
69
listenPort = 12345
70
listenHost = "127.0.0.1"
71
)
72
require.NoError(t, yaml.Unmarshal([]byte(cfgText), &cfg))
73
74
// Listen port must be set before applying defaults. Normally applied by the
75
// config package.
76
cfg.ListenPort = listenPort
77
cfg.ListenHost = listenHost
78
79
outBytes, err := yaml.Marshal(cfg)
80
require.NoError(t, err, "Failed creating integration")
81
require.YAMLEq(t, cfgText, string(outBytes))
82
}
83
84
func TestConfig_AddressRelabels(t *testing.T) {
85
cfgText := `
86
agent:
87
enabled: true
88
`
89
90
var (
91
cfg ManagerConfig
92
listenPort = 12345
93
listenHost = "127.0.0.1"
94
)
95
require.NoError(t, yaml.Unmarshal([]byte(cfgText), &cfg))
96
97
// Listen port must be set before applying defaults. Normally applied by the
98
// config package.
99
cfg.ListenPort = listenPort
100
cfg.ListenHost = listenHost
101
102
expectHostname, _ := instance.Hostname()
103
relabels := cfg.DefaultRelabelConfigs(expectHostname + ":12345")
104
105
// Ensure that the relabel configs are functional
106
require.Len(t, relabels, 1)
107
result, _ := relabel.Process(labels.FromStrings("__address__", "127.0.0.1"), relabels...)
108
109
require.Equal(t, result.Get("instance"), expectHostname+":12345")
110
}
111
112
func TestManager_instanceConfigForIntegration(t *testing.T) {
113
mock := newMockIntegration()
114
icfg := mockConfig{Integration: mock}
115
116
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
117
m, err := NewManager(mockManagerConfig(), log.NewNopLogger(), im, noOpValidator)
118
require.NoError(t, err)
119
defer m.Stop()
120
121
p := &integrationProcess{instanceKey: "key", cfg: makeUnmarshaledConfig(icfg, true), i: mock}
122
cfg := m.instanceConfigForIntegration(p, mockManagerConfig())
123
124
// Validate that the generated MetricsPath is a valid URL path
125
require.Len(t, cfg.ScrapeConfigs, 1)
126
require.Equal(t, "/integrations/mock/metrics", cfg.ScrapeConfigs[0].MetricsPath)
127
}
128
129
// makeUnmarshaledConfig wraps cfg in an UnmarshaledConfig whose common
// settings carry only the given enabled flag.
func makeUnmarshaledConfig(cfg Config, enabled bool) UnmarshaledConfig {
	common := config.Common{Enabled: enabled}
	return UnmarshaledConfig{
		Config: cfg,
		Common: common,
	}
}
132
133
// TestManager_NoIntegrationsScrape ensures that configs don't get generates
134
// when the ScrapeIntegrations flag is disabled.
135
func TestManager_NoIntegrationsScrape(t *testing.T) {
136
mock := newMockIntegration()
137
icfg := mockConfig{Integration: mock}
138
139
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
140
141
cfg := mockManagerConfig()
142
cfg.ScrapeIntegrations = false
143
cfg.Integrations = append(cfg.Integrations, makeUnmarshaledConfig(&icfg, true))
144
145
m, err := NewManager(cfg, log.NewNopLogger(), im, noOpValidator)
146
require.NoError(t, err)
147
defer m.Stop()
148
149
// Normally we'd use test.Poll here, but since im.ListConfigs starts out with a
150
// length of zero, test.Poll would immediately pass. Instead we want to wait for a
151
// bit to make sure that the length of ListConfigs doesn't become non-zero.
152
time.Sleep(time.Second)
153
require.Zero(t, len(im.ListConfigs()))
154
}
155
156
// TestManager_NoIntegrationScrape ensures that configs don't get generates
157
// when the ScrapeIntegration flag is disabled on the integration.
158
func TestManager_NoIntegrationScrape(t *testing.T) {
159
mock := newMockIntegration()
160
icfg := mockConfig{Integration: mock}
161
noScrape := false
162
163
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
164
165
cfg := mockManagerConfig()
166
cfg.Integrations = append(cfg.Integrations, UnmarshaledConfig{
167
Config: icfg,
168
Common: config.Common{ScrapeIntegration: &noScrape},
169
})
170
171
m, err := NewManager(cfg, log.NewNopLogger(), im, noOpValidator)
172
require.NoError(t, err)
173
defer m.Stop()
174
175
time.Sleep(time.Second)
176
require.Zero(t, len(im.ListConfigs()))
177
}
178
179
// TestManager_StartsIntegrations tests that, when given an integration to
180
// launch, TestManager applies a config and runs the integration.
181
func TestManager_StartsIntegrations(t *testing.T) {
182
mock := newMockIntegration()
183
icfg := mockConfig{Integration: mock}
184
185
cfg := mockManagerConfig()
186
cfg.Integrations = append(cfg.Integrations, makeUnmarshaledConfig(icfg, true))
187
188
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
189
m, err := NewManager(cfg, log.NewNopLogger(), im, noOpValidator)
190
require.NoError(t, err)
191
defer m.Stop()
192
193
test.Poll(t, time.Second, 1, func() interface{} {
194
return len(im.ListConfigs())
195
})
196
197
// Check that the instance was set to run
198
test.Poll(t, time.Second, 1, func() interface{} {
199
return int(mock.startedCount.Load())
200
})
201
}
202
203
func TestManager_RestartsIntegrations(t *testing.T) {
204
mock := newMockIntegration()
205
icfg := mockConfig{Integration: mock}
206
207
cfg := mockManagerConfig()
208
cfg.Integrations = append(cfg.Integrations, makeUnmarshaledConfig(icfg, true))
209
210
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
211
m, err := NewManager(cfg, log.NewNopLogger(), im, noOpValidator)
212
require.NoError(t, err)
213
defer m.Stop()
214
215
mock.err <- fmt.Errorf("I can't believe this horrible error happened")
216
217
test.Poll(t, time.Second, 2, func() interface{} {
218
return int(mock.startedCount.Load())
219
})
220
}
221
222
func TestManager_GracefulStop(t *testing.T) {
223
mock := newMockIntegration()
224
icfg := mockConfig{Integration: mock}
225
226
cfg := mockManagerConfig()
227
cfg.Integrations = append(cfg.Integrations, makeUnmarshaledConfig(icfg, true))
228
229
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
230
m, err := NewManager(cfg, log.NewNopLogger(), im, noOpValidator)
231
require.NoError(t, err)
232
233
test.Poll(t, time.Second, 1, func() interface{} {
234
return int(mock.startedCount.Load())
235
})
236
237
m.Stop()
238
239
time.Sleep(500 * time.Millisecond)
240
require.Equal(t, 1, int(mock.startedCount.Load()), "graceful shutdown should not have restarted the Integration")
241
242
test.Poll(t, time.Second, false, func() interface{} {
243
return mock.running.Load()
244
})
245
}
246
247
func TestManager_IntegrationEnabledToDisabledReload(t *testing.T) {
248
mock := newMockIntegration()
249
icfg := mockConfig{Integration: mock}
250
cfg := mockManagerConfig()
251
cfg.Integrations = append(cfg.Integrations, makeUnmarshaledConfig(icfg, true))
252
253
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
254
m, err := NewManager(cfg, log.NewNopLogger(), im, noOpValidator)
255
require.NoError(t, err)
256
257
// Test for Enabled -> Disabled
258
_ = m.ApplyConfig(generateMockConfigWithEnabledFlag(false))
259
require.Len(t, m.integrations, 0, "Integration was disabled so should be removed from map")
260
_, err = m.im.GetInstance(mockIntegrationName)
261
require.Error(t, err, "This mock should not exist")
262
263
// test for Disabled -> Enabled
264
_ = m.ApplyConfig(generateMockConfigWithEnabledFlag(true))
265
require.Len(t, m.integrations, 1, "Integration was enabled so should be here")
266
_, err = m.im.GetInstance(mockIntegrationName)
267
require.NoError(t, err, "This mock should exist")
268
require.Len(t, m.im.ListInstances(), 1, "This instance should exist")
269
}
270
271
func TestManager_IntegrationDisabledToEnabledReload(t *testing.T) {
272
mock := newMockIntegration()
273
icfg := mockConfig{Integration: mock}
274
275
cfg := mockManagerConfig()
276
cfg.Integrations = append(cfg.Integrations, UnmarshaledConfig{
277
Config: icfg,
278
Common: config.Common{Enabled: false},
279
})
280
281
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
282
m, err := NewManager(cfg, log.NewNopLogger(), im, noOpValidator)
283
require.NoError(t, err)
284
require.Len(t, m.integrations, 0, "Integration was disabled so should be removed from map")
285
_, err = m.im.GetInstance(mockIntegrationName)
286
require.Error(t, err, "This mock should not exist")
287
288
// test for Disabled -> Enabled
289
290
_ = m.ApplyConfig(generateMockConfigWithEnabledFlag(true))
291
require.Len(t, m.integrations, 1, "Integration was enabled so should be here")
292
_, err = m.im.GetInstance(mockIntegrationName)
293
require.NoError(t, err, "This mock should exist")
294
require.Len(t, m.im.ListInstances(), 1, "This instance should exist")
295
}
296
297
// PromDefaultsValidator is a test validator that applies Prometheus global
// defaults to instance configs via its validate method.
type PromDefaultsValidator struct {
	// PrometheusGlobalConfig is the global config that validate passes to
	// instance.Config.ApplyDefaults as the Prometheus section.
	PrometheusGlobalConfig promConfig.GlobalConfig
}
300
301
func (i *PromDefaultsValidator) validate(c *instance.Config) error {
302
instanceConfig := instance.GlobalConfig{
303
Prometheus: i.PrometheusGlobalConfig,
304
}
305
return c.ApplyDefaults(instanceConfig)
306
}
307
308
func TestManager_PromConfigChangeReloads(t *testing.T) {
309
mock := newMockIntegration()
310
icfg := mockConfig{Integration: mock}
311
312
cfg := mockManagerConfig()
313
cfg.Integrations = append(cfg.Integrations, makeUnmarshaledConfig(icfg, true))
314
315
im := instance.NewBasicManager(instance.DefaultBasicManagerConfig, log.NewNopLogger(), mockInstanceFactory)
316
317
startingPromConfig := mockPromConfigWithValues(model.Duration(30*time.Second), model.Duration(25*time.Second))
318
cfg.PrometheusGlobalConfig = startingPromConfig
319
validator := PromDefaultsValidator{startingPromConfig}
320
321
m, err := NewManager(cfg, log.NewNopLogger(), im, validator.validate)
322
require.NoError(t, err)
323
require.Len(t, m.im.ListConfigs(), 1, "Integration was enabled so should be here")
324
//The integration never has the prom config overrides happen so go after the running instance config instead
325
for _, c := range m.im.ListConfigs() {
326
for _, scrape := range c.ScrapeConfigs {
327
require.Equal(t, startingPromConfig.ScrapeInterval, scrape.ScrapeInterval)
328
require.Equal(t, startingPromConfig.ScrapeTimeout, scrape.ScrapeTimeout)
329
}
330
}
331
332
newPromConfig := mockPromConfigWithValues(model.Duration(60*time.Second), model.Duration(55*time.Second))
333
cfg.PrometheusGlobalConfig = newPromConfig
334
validator.PrometheusGlobalConfig = newPromConfig
335
336
err = m.ApplyConfig(cfg)
337
require.NoError(t, err)
338
339
require.Len(t, m.im.ListConfigs(), 1, "Integration was enabled so should be here")
340
//The integration never has the prom config overrides happen so go after the running instance config instead
341
for _, c := range m.im.ListConfigs() {
342
for _, scrape := range c.ScrapeConfigs {
343
require.Equal(t, newPromConfig.ScrapeInterval, scrape.ScrapeInterval)
344
require.Equal(t, newPromConfig.ScrapeTimeout, scrape.ScrapeTimeout)
345
}
346
}
347
}
348
349
func generateMockConfigWithEnabledFlag(enabled bool) ManagerConfig {
350
enabledMock := newMockIntegration()
351
enabledConfig := mockConfig{Integration: enabledMock}
352
enabledManagerConfig := mockManagerConfig()
353
enabledManagerConfig.Integrations = append(
354
enabledManagerConfig.Integrations,
355
makeUnmarshaledConfig(enabledConfig, enabled),
356
)
357
return enabledManagerConfig
358
}
359
360
// mockConfig is a test integration config that hands out a pre-built
// mockIntegration.
type mockConfig struct {
	// Integration is the value returned by NewIntegration.
	Integration *mockIntegration `yaml:"mock"`
}
363
364
// Equal is used for cmp.Equal, since otherwise mockConfig can't be compared to itself.
365
func (c mockConfig) Equal(other mockConfig) bool { return c.Integration == other.Integration }
366
367
// Name returns the fixed integration name "mock".
func (c mockConfig) Name() string {
	return "mock"
}
368
// InstanceKey returns agentKey unchanged and never fails.
func (c mockConfig) InstanceKey(agentKey string) (string, error) {
	return agentKey, nil
}
369
370
// NewIntegration ignores the logger and returns the mockIntegration stored in
// the config, so every call yields the same integration.
func (c mockConfig) NewIntegration(_ log.Logger) (Integration, error) {
	integration := c.Integration
	return integration, nil
}
373
374
// mockIntegration is a test integration whose Run method records its own
// lifecycle so tests can observe starts, stops and injected failures.
type mockIntegration struct {
	// startedCount is incremented every time Run is called.
	startedCount *atomic.Uint32
	// running is set to true when Run starts and to false when Run returns.
	running *atomic.Bool
	// err is the channel Run receives from; a value sent on it becomes Run's
	// return value.
	err chan error
}
379
380
func newMockIntegration() *mockIntegration {
381
return &mockIntegration{
382
running: atomic.NewBool(true),
383
startedCount: atomic.NewUint32(0),
384
err: make(chan error),
385
}
386
}
387
388
// MetricsHandler returns the handler produced by promhttp.Handler and a nil
// error.
func (i *mockIntegration) MetricsHandler() (http.Handler, error) {
	handler := promhttp.Handler()
	return handler, nil
}
391
392
func (i *mockIntegration) ScrapeConfigs() []config.ScrapeConfig {
393
return []config.ScrapeConfig{{
394
JobName: "mock",
395
MetricsPath: "/metrics",
396
}}
397
}
398
399
func (i *mockIntegration) Run(ctx context.Context) error {
400
i.startedCount.Inc()
401
i.running.Store(true)
402
defer i.running.Store(false)
403
404
select {
405
case <-ctx.Done():
406
return ctx.Err()
407
case err := <-i.err:
408
return err
409
}
410
}
411
412
// mockInstanceFactory ignores the supplied config and always returns a
// zero-valued instance.NoOpInstance with a nil error.
func mockInstanceFactory(_ instance.Config) (instance.ManagedInstance, error) {
	var noop instance.NoOpInstance
	return noop, nil
}
415
416
func mockManagerConfig() ManagerConfig {
417
listenPort := 0
418
listenHost := "127.0.0.1"
419
return ManagerConfig{
420
ScrapeIntegrations: true,
421
IntegrationRestartBackoff: 0,
422
ListenPort: listenPort,
423
ListenHost: listenHost,
424
}
425
}
426
427
func mockPromConfigWithValues(scrapeInterval model.Duration, scrapeTimeout model.Duration) promConfig.GlobalConfig {
428
return promConfig.GlobalConfig{
429
ScrapeInterval: scrapeInterval,
430
ScrapeTimeout: scrapeTimeout,
431
}
432
}
433
434