Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/agent_test.go
4093 views
1
package metrics
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"sync"
8
"testing"
9
"time"
10
11
"github.com/cortexproject/cortex/pkg/util/test"
12
"github.com/go-kit/log"
13
"github.com/grafana/agent/pkg/metrics/instance"
14
"github.com/prometheus/client_golang/prometheus"
15
"github.com/prometheus/prometheus/scrape"
16
"github.com/prometheus/prometheus/storage"
17
"github.com/stretchr/testify/require"
18
"go.uber.org/atomic"
19
"gopkg.in/yaml.v2"
20
)
21
22
func TestConfig_Validate(t *testing.T) {
23
valid := Config{
24
WALDir: "/tmp/data",
25
Configs: []instance.Config{
26
makeInstanceConfig("instance"),
27
},
28
InstanceMode: instance.DefaultMode,
29
}
30
31
tt := []struct {
32
name string
33
mutator func(c *Config)
34
expect error
35
}{
36
{
37
name: "complete config should be valid",
38
mutator: func(c *Config) {},
39
expect: nil,
40
},
41
{
42
name: "no wal dir",
43
mutator: func(c *Config) { c.WALDir = "" },
44
expect: errors.New("no wal_directory configured"),
45
},
46
{
47
name: "missing instance name",
48
mutator: func(c *Config) { c.Configs[0].Name = "" },
49
expect: errors.New("error validating instance at index 0: missing instance name"),
50
},
51
{
52
name: "duplicate config name",
53
mutator: func(c *Config) {
54
c.Configs = append(c.Configs,
55
makeInstanceConfig("newinstance"),
56
makeInstanceConfig("instance"),
57
)
58
},
59
expect: errors.New("prometheus instance names must be unique. found multiple instances with name instance"),
60
},
61
}
62
63
for _, tc := range tt {
64
t.Run(tc.name, func(t *testing.T) {
65
cfg := copyConfig(t, valid)
66
tc.mutator(&cfg)
67
68
err := cfg.ApplyDefaults()
69
if tc.expect == nil {
70
require.NoError(t, err)
71
} else {
72
require.EqualError(t, err, tc.expect.Error())
73
}
74
})
75
}
76
}
77
78
func copyConfig(t *testing.T, c Config) Config {
79
t.Helper()
80
81
bb, err := yaml.Marshal(c)
82
require.NoError(t, err)
83
84
var cp Config
85
err = yaml.Unmarshal(bb, &cp)
86
require.NoError(t, err)
87
return cp
88
}
89
90
func TestConfigNonzeroDefaultScrapeInterval(t *testing.T) {
91
cfgText := `
92
wal_directory: ./wal
93
configs:
94
- name: testconfig
95
scrape_configs:
96
- job_name: 'node'
97
static_configs:
98
- targets: ['localhost:9100']
99
`
100
101
var cfg Config
102
103
err := yaml.Unmarshal([]byte(cfgText), &cfg)
104
require.NoError(t, err)
105
err = cfg.ApplyDefaults()
106
require.NoError(t, err)
107
108
require.Equal(t, len(cfg.Configs), 1)
109
instanceConfig := cfg.Configs[0]
110
require.Equal(t, len(instanceConfig.ScrapeConfigs), 1)
111
scrapeConfig := instanceConfig.ScrapeConfigs[0]
112
require.Greater(t, int64(scrapeConfig.ScrapeInterval), int64(0))
113
}
114
115
func TestAgent(t *testing.T) {
116
// Launch two instances
117
cfg := Config{
118
WALDir: "/tmp/wal",
119
Configs: []instance.Config{
120
makeInstanceConfig("instance_a"),
121
makeInstanceConfig("instance_b"),
122
},
123
InstanceRestartBackoff: time.Duration(0),
124
InstanceMode: instance.ModeDistinct,
125
}
126
127
fact := newFakeInstanceFactory()
128
129
a, err := newAgent(prometheus.NewRegistry(), cfg, log.NewNopLogger(), fact.factory)
130
require.NoError(t, err)
131
132
test.Poll(t, time.Second*30, true, func() interface{} {
133
if fact.created == nil {
134
return false
135
}
136
return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2
137
})
138
139
t.Run("instances should be running", func(t *testing.T) {
140
for _, mi := range fact.Mocks() {
141
// Each instance should have wait called on it
142
test.Poll(t, time.Millisecond*500, true, func() interface{} {
143
return mi.running.Load()
144
})
145
}
146
})
147
148
t.Run("instances should be restarted when stopped", func(t *testing.T) {
149
for _, mi := range fact.Mocks() {
150
test.Poll(t, time.Millisecond*500, int64(1), func() interface{} {
151
return mi.startedCount.Load()
152
})
153
}
154
155
for _, mi := range fact.Mocks() {
156
mi.err <- fmt.Errorf("really bad error")
157
}
158
159
for _, mi := range fact.Mocks() {
160
test.Poll(t, time.Millisecond*500, int64(2), func() interface{} {
161
return mi.startedCount.Load()
162
})
163
}
164
})
165
}
166
167
func TestAgent_NormalInstanceExits(t *testing.T) {
168
tt := []struct {
169
name string
170
simulateError error
171
}{
172
{"no error", nil},
173
{"context cancelled", context.Canceled},
174
}
175
176
cfg := Config{
177
WALDir: "/tmp/wal",
178
Configs: []instance.Config{
179
makeInstanceConfig("instance_a"),
180
makeInstanceConfig("instance_b"),
181
},
182
InstanceRestartBackoff: time.Duration(0),
183
InstanceMode: instance.ModeDistinct,
184
}
185
186
for _, tc := range tt {
187
t.Run(tc.name, func(t *testing.T) {
188
fact := newFakeInstanceFactory()
189
190
a, err := newAgent(prometheus.NewRegistry(), cfg, log.NewNopLogger(), fact.factory)
191
require.NoError(t, err)
192
193
test.Poll(t, time.Second*30, true, func() interface{} {
194
if fact.created == nil {
195
return false
196
}
197
return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2
198
})
199
200
for _, mi := range fact.Mocks() {
201
mi.err <- tc.simulateError
202
}
203
204
time.Sleep(time.Millisecond * 100)
205
206
// Get the new total amount of instances starts; value should
207
// be unchanged.
208
var startedCount int64
209
for _, i := range fact.Mocks() {
210
startedCount += i.startedCount.Load()
211
}
212
213
// There should only be two instances that started. If there's more, something
214
// restarted despite our error.
215
require.Equal(t, int64(2), startedCount, "instances should not have restarted")
216
})
217
}
218
}
219
220
func TestAgent_Stop(t *testing.T) {
221
// Launch two instances
222
cfg := Config{
223
WALDir: "/tmp/wal",
224
Configs: []instance.Config{
225
makeInstanceConfig("instance_a"),
226
makeInstanceConfig("instance_b"),
227
},
228
InstanceRestartBackoff: time.Duration(0),
229
InstanceMode: instance.ModeDistinct,
230
}
231
232
fact := newFakeInstanceFactory()
233
234
a, err := newAgent(prometheus.NewRegistry(), cfg, log.NewNopLogger(), fact.factory)
235
require.NoError(t, err)
236
237
test.Poll(t, time.Second*30, true, func() interface{} {
238
if fact.created == nil {
239
return false
240
}
241
return fact.created.Load() == 2 && len(a.mm.ListInstances()) == 2
242
})
243
244
a.Stop()
245
246
time.Sleep(time.Millisecond * 100)
247
248
for _, mi := range fact.Mocks() {
249
require.False(t, mi.running.Load(), "instance should not have been restarted")
250
}
251
}
252
253
type fakeInstance struct {
254
cfg instance.Config
255
256
err chan error
257
startedCount *atomic.Int64
258
running *atomic.Bool
259
}
260
261
func (i *fakeInstance) Run(ctx context.Context) error {
262
i.startedCount.Inc()
263
i.running.Store(true)
264
defer i.running.Store(false)
265
266
select {
267
case <-ctx.Done():
268
return ctx.Err()
269
case err := <-i.err:
270
return err
271
}
272
}
273
274
func (i *fakeInstance) Ready() bool {
275
return true
276
}
277
278
func (i *fakeInstance) Update(_ instance.Config) error {
279
return instance.ErrInvalidUpdate{
280
Inner: fmt.Errorf("can't dynamically update fakeInstance"),
281
}
282
}
283
284
func (i *fakeInstance) TargetsActive() map[string][]*scrape.Target {
285
return nil
286
}
287
288
func (i *fakeInstance) StorageDirectory() string {
289
return ""
290
}
291
292
func (i *fakeInstance) Appender(ctx context.Context) storage.Appender {
293
return nil
294
}
295
296
type fakeInstanceFactory struct {
297
mut sync.Mutex
298
mocks []*fakeInstance
299
300
created *atomic.Int64
301
}
302
303
func newFakeInstanceFactory() *fakeInstanceFactory {
304
return &fakeInstanceFactory{created: atomic.NewInt64(0)}
305
}
306
307
func (f *fakeInstanceFactory) Mocks() []*fakeInstance {
308
f.mut.Lock()
309
defer f.mut.Unlock()
310
return f.mocks
311
}
312
313
func (f *fakeInstanceFactory) factory(_ prometheus.Registerer, cfg instance.Config, _ string, _ log.Logger) (instance.ManagedInstance, error) {
314
f.created.Add(1)
315
316
f.mut.Lock()
317
defer f.mut.Unlock()
318
319
inst := &fakeInstance{
320
cfg: cfg,
321
running: atomic.NewBool(false),
322
startedCount: atomic.NewInt64(0),
323
err: make(chan error),
324
}
325
326
f.mocks = append(f.mocks, inst)
327
return inst, nil
328
}
329
330
func makeInstanceConfig(name string) instance.Config {
331
cfg := instance.DefaultConfig
332
cfg.Name = name
333
return cfg
334
}
335
336
func TestAgent_MarshalYAMLOmitDefaultConfigFields(t *testing.T) {
337
cfg := DefaultConfig
338
yml, err := yaml.Marshal(&cfg)
339
require.NoError(t, err)
340
require.NotContains(t, string(yml), "scraping_service_client")
341
require.NotContains(t, string(yml), "scraping_service")
342
require.NotContains(t, string(yml), "global")
343
}
344
345