Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/instance/instance_test.go
4094 views
1
package instance
2
3
import (
4
"context"
5
"fmt"
6
"net/http/httptest"
7
"os"
8
"path"
9
"strings"
10
"sync"
11
"testing"
12
"time"
13
14
"github.com/cortexproject/cortex/pkg/util/test"
15
"github.com/go-kit/log"
16
"github.com/prometheus/client_golang/prometheus"
17
"github.com/prometheus/client_golang/prometheus/promhttp"
18
"github.com/prometheus/common/model"
19
"github.com/prometheus/prometheus/config"
20
"github.com/prometheus/prometheus/discovery"
21
"github.com/prometheus/prometheus/model/exemplar"
22
"github.com/prometheus/prometheus/model/histogram"
23
"github.com/prometheus/prometheus/model/labels"
24
"github.com/prometheus/prometheus/model/metadata"
25
"github.com/prometheus/prometheus/storage"
26
"github.com/stretchr/testify/require"
27
)
28
29
func TestConfig_Unmarshal_Defaults(t *testing.T) {
30
global := DefaultGlobalConfig
31
cfgText := `name: test
32
scrape_configs:
33
- job_name: local_scrape
34
static_configs:
35
- targets: ['127.0.0.1:12345']
36
labels:
37
cluster: 'localhost'
38
remote_write:
39
- url: http://localhost:9009/api/prom/push`
40
41
cfg, err := UnmarshalConfig(strings.NewReader(cfgText))
42
require.NoError(t, err)
43
44
err = cfg.ApplyDefaults(global)
45
require.NoError(t, err)
46
47
require.Equal(t, DefaultConfig.HostFilter, cfg.HostFilter)
48
require.Equal(t, DefaultConfig.WALTruncateFrequency, cfg.WALTruncateFrequency)
49
require.Equal(t, DefaultConfig.RemoteFlushDeadline, cfg.RemoteFlushDeadline)
50
require.Equal(t, DefaultConfig.WriteStaleOnShutdown, cfg.WriteStaleOnShutdown)
51
52
for _, sc := range cfg.ScrapeConfigs {
53
require.Equal(t, sc.ScrapeInterval, global.Prometheus.ScrapeInterval)
54
require.Equal(t, sc.ScrapeTimeout, global.Prometheus.ScrapeTimeout)
55
}
56
}
57
58
func TestConfig_ApplyDefaults_Validations(t *testing.T) {
59
global := DefaultGlobalConfig
60
cfg := DefaultConfig
61
cfg.Name = "instance"
62
cfg.ScrapeConfigs = []*config.ScrapeConfig{{
63
JobName: "scrape",
64
ServiceDiscoveryConfigs: discovery.Configs{
65
discovery.StaticConfig{{
66
Targets: []model.LabelSet{{
67
model.AddressLabel: model.LabelValue("127.0.0.1:12345"),
68
}},
69
Labels: model.LabelSet{"cluster": "localhost"},
70
}},
71
},
72
}}
73
cfg.RemoteWrite = []*config.RemoteWriteConfig{{Name: "write"}}
74
75
tt := []struct {
76
name string
77
mutation func(c *Config)
78
err error
79
}{
80
{
81
"valid config",
82
nil,
83
nil,
84
},
85
{
86
"requires name",
87
func(c *Config) { c.Name = "" },
88
fmt.Errorf("missing instance name"),
89
},
90
{
91
"missing scrape",
92
func(c *Config) { c.ScrapeConfigs[0] = nil },
93
fmt.Errorf("empty or null scrape config section"),
94
},
95
{
96
"missing wal truncate frequency",
97
func(c *Config) { c.WALTruncateFrequency = 0 },
98
fmt.Errorf("wal_truncate_frequency must be greater than 0s"),
99
},
100
{
101
"missing remote flush deadline",
102
func(c *Config) { c.RemoteFlushDeadline = 0 },
103
fmt.Errorf("remote_flush_deadline must be greater than 0s"),
104
},
105
{
106
"scrape timeout too high",
107
func(c *Config) { c.ScrapeConfigs[0].ScrapeTimeout = global.Prometheus.ScrapeInterval + 1 },
108
fmt.Errorf("scrape timeout greater than scrape interval for scrape config with job name \"scrape\""),
109
},
110
{
111
"scrape interval greater than truncate frequency",
112
func(c *Config) { c.ScrapeConfigs[0].ScrapeInterval = model.Duration(c.WALTruncateFrequency + 1) },
113
fmt.Errorf("scrape interval greater than wal_truncate_frequency for scrape config with job name \"scrape\""),
114
},
115
{
116
"multiple scrape configs with same name",
117
func(c *Config) {
118
c.ScrapeConfigs = append(c.ScrapeConfigs, &config.ScrapeConfig{
119
JobName: "scrape",
120
})
121
},
122
fmt.Errorf("found multiple scrape configs with job name \"scrape\""),
123
},
124
{
125
"empty remote write",
126
func(c *Config) { c.RemoteWrite = append(c.RemoteWrite, nil) },
127
fmt.Errorf("empty or null remote write config section"),
128
},
129
{
130
"multiple remote writes with same name",
131
func(c *Config) {
132
c.RemoteWrite = []*config.RemoteWriteConfig{
133
{Name: "foo"},
134
{Name: "foo"},
135
}
136
},
137
fmt.Errorf("found duplicate remote write configs with name \"foo\""),
138
},
139
}
140
141
for _, tc := range tt {
142
t.Run(tc.name, func(t *testing.T) {
143
// Copy the input and all of its slices
144
input := cfg
145
146
var scrapeConfigs []*config.ScrapeConfig
147
for _, sc := range input.ScrapeConfigs {
148
scCopy := *sc
149
scrapeConfigs = append(scrapeConfigs, &scCopy)
150
}
151
input.ScrapeConfigs = scrapeConfigs
152
153
var remoteWrites []*config.RemoteWriteConfig
154
for _, rw := range input.RemoteWrite {
155
rwCopy := *rw
156
remoteWrites = append(remoteWrites, &rwCopy)
157
}
158
input.RemoteWrite = remoteWrites
159
160
if tc.mutation != nil {
161
tc.mutation(&input)
162
}
163
164
err := input.ApplyDefaults(global)
165
if tc.err == nil {
166
require.NoError(t, err)
167
} else {
168
require.EqualError(t, err, tc.err.Error())
169
}
170
})
171
}
172
}
173
174
func TestConfig_ApplyDefaults_HashedName(t *testing.T) {
175
cfgText := `
176
name: default
177
host_filter: false
178
remote_write:
179
- url: http://localhost:9009/api/prom/push
180
sigv4: {}`
181
182
cfg, err := UnmarshalConfig(strings.NewReader(cfgText))
183
require.NoError(t, err)
184
require.NoError(t, cfg.ApplyDefaults(DefaultGlobalConfig))
185
require.NotEmpty(t, cfg.RemoteWrite[0].Name)
186
}
187
188
func TestInstance_Path(t *testing.T) {
189
scrapeAddr, closeSrv := getTestServer(t)
190
defer closeSrv()
191
192
walDir := t.TempDir()
193
194
globalConfig := getTestGlobalConfig(t)
195
196
cfg := getTestConfig(t, &globalConfig, scrapeAddr)
197
cfg.WALTruncateFrequency = time.Hour
198
cfg.RemoteFlushDeadline = time.Hour
199
200
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
201
inst, err := New(prometheus.NewRegistry(), cfg, walDir, logger)
202
require.NoError(t, err)
203
runInstance(t, inst)
204
205
// <walDir>/<inst.name> path should exist for WAL
206
test.Poll(t, time.Second*5, true, func() interface{} {
207
_, err := os.Stat(path.Join(walDir, "test"))
208
return err == nil
209
})
210
}
211
212
// TestInstance tests that discovery and scraping are working by using a mock
213
// instance of the WAL storage and testing that samples get written to it.
214
// This test touches most of Instance and is enough for a basic integration test.
215
func TestInstance(t *testing.T) {
216
scrapeAddr, closeSrv := getTestServer(t)
217
defer closeSrv()
218
219
walDir := t.TempDir()
220
221
globalConfig := getTestGlobalConfig(t)
222
cfg := getTestConfig(t, &globalConfig, scrapeAddr)
223
cfg.WALTruncateFrequency = time.Hour
224
cfg.RemoteFlushDeadline = time.Hour
225
226
mockStorage := mockWalStorage{
227
series: make(map[storage.SeriesRef]int),
228
directory: walDir,
229
}
230
newWal := func(_ prometheus.Registerer) (walStorage, error) { return &mockStorage, nil }
231
232
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
233
inst, err := newInstance(cfg, nil, logger, newWal)
234
require.NoError(t, err)
235
runInstance(t, inst)
236
237
// Wait until mockWalStorage has had a series added to it.
238
test.Poll(t, 30*time.Second, true, func() interface{} {
239
mockStorage.mut.Lock()
240
defer mockStorage.mut.Unlock()
241
return len(mockStorage.series) > 0
242
})
243
}
244
245
// TestInstance_Recreate ensures that creating an instance with the same name twice
246
// does not cause any duplicate metrics registration that leads to a panic.
247
func TestInstance_Recreate(t *testing.T) {
248
scrapeAddr, closeSrv := getTestServer(t)
249
defer closeSrv()
250
251
walDir := t.TempDir()
252
253
globalConfig := getTestGlobalConfig(t)
254
255
cfg := getTestConfig(t, &globalConfig, scrapeAddr)
256
cfg.Name = "recreate_test"
257
cfg.WALTruncateFrequency = time.Hour
258
cfg.RemoteFlushDeadline = time.Hour
259
260
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
261
inst, err := New(prometheus.NewRegistry(), cfg, walDir, logger)
262
require.NoError(t, err)
263
264
ctx, cancel := context.WithCancel(context.Background())
265
exited := make(chan bool)
266
go func() {
267
err := inst.Run(ctx)
268
close(exited)
269
270
if err != nil {
271
require.Equal(t, context.Canceled, err)
272
}
273
}()
274
275
time.Sleep(1 * time.Second)
276
cancel()
277
<-exited
278
279
// Recreate the instance, no panic should happen.
280
require.NotPanics(t, func() {
281
inst, err := New(prometheus.NewRegistry(), cfg, walDir, logger)
282
require.NoError(t, err)
283
runInstance(t, inst)
284
285
time.Sleep(1 * time.Second)
286
})
287
}
288
289
func getTestServer(t *testing.T) (addr string, closeFunc func()) {
290
t.Helper()
291
292
reg := prometheus.NewRegistry()
293
294
testCounter := prometheus.NewCounter(prometheus.CounterOpts{
295
Name: "test_metric_total",
296
})
297
testCounter.Inc()
298
reg.MustRegister(testCounter)
299
300
handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
301
httpSrv := httptest.NewServer(handler)
302
return httpSrv.Listener.Addr().String(), httpSrv.Close
303
}
304
305
func getTestGlobalConfig(t *testing.T) GlobalConfig {
306
t.Helper()
307
308
return GlobalConfig{
309
Prometheus: config.GlobalConfig{
310
ScrapeInterval: model.Duration(time.Millisecond * 50),
311
ScrapeTimeout: model.Duration(time.Millisecond * 25),
312
EvaluationInterval: model.Duration(time.Hour),
313
},
314
}
315
}
316
317
func getTestConfig(t *testing.T, global *GlobalConfig, scrapeAddr string) Config {
318
t.Helper()
319
320
scrapeCfg := config.DefaultScrapeConfig
321
scrapeCfg.JobName = "test"
322
scrapeCfg.ScrapeInterval = global.Prometheus.ScrapeInterval
323
scrapeCfg.ScrapeTimeout = global.Prometheus.ScrapeTimeout
324
scrapeCfg.ServiceDiscoveryConfigs = discovery.Configs{
325
discovery.StaticConfig{{
326
Targets: []model.LabelSet{{
327
model.AddressLabel: model.LabelValue(scrapeAddr),
328
}},
329
Labels: model.LabelSet{},
330
}},
331
}
332
333
cfg := DefaultConfig
334
cfg.Name = "test"
335
cfg.ScrapeConfigs = []*config.ScrapeConfig{&scrapeCfg}
336
cfg.global = *global
337
338
return cfg
339
}
340
341
type mockWalStorage struct {
342
storage.Queryable
343
storage.ChunkQueryable
344
345
directory string
346
mut sync.Mutex
347
series map[storage.SeriesRef]int
348
}
349
350
func (s *mockWalStorage) Directory() string { return s.directory }
351
func (s *mockWalStorage) StartTime() (int64, error) { return 0, nil }
352
func (s *mockWalStorage) WriteStalenessMarkers(f func() int64) error { return nil }
353
func (s *mockWalStorage) Close() error { return nil }
354
func (s *mockWalStorage) Truncate(mint int64) error { return nil }
355
356
func (s *mockWalStorage) Appender(context.Context) storage.Appender {
357
return &mockAppender{s: s}
358
}
359
360
type mockAppender struct {
361
s *mockWalStorage
362
}
363
364
func (a *mockAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
365
if ref == 0 {
366
return a.Add(l, t, v)
367
}
368
return ref, a.AddFast(ref, t, v)
369
}
370
371
// Add adds a new series and sets its written count to 1.
372
func (a *mockAppender) Add(l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
373
a.s.mut.Lock()
374
defer a.s.mut.Unlock()
375
376
hash := l.Hash()
377
a.s.series[storage.SeriesRef(hash)] = 1
378
return storage.SeriesRef(hash), nil
379
}
380
381
// AddFast increments the number of writes to an existing series.
382
func (a *mockAppender) AddFast(ref storage.SeriesRef, t int64, v float64) error {
383
a.s.mut.Lock()
384
defer a.s.mut.Unlock()
385
_, ok := a.s.series[ref]
386
if !ok {
387
return storage.ErrNotFound
388
}
389
390
a.s.series[ref]++
391
return nil
392
}
393
394
func (a *mockAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
395
return 0, nil
396
}
397
398
func (a *mockAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
399
return 0, nil
400
}
401
402
func (a *mockAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
403
return 0, nil
404
}
405
406
func (a *mockAppender) Commit() error {
407
return nil
408
}
409
410
func (a *mockAppender) Rollback() error {
411
return nil
412
}
413
414
// runInstance runs i in a background goroutine until the test finishes.
//
// A panic inside Run is recovered and reported with t.Errorf, which is safe to
// call from a non-test goroutine, unlike require/t.FailNow. The registered
// cleanup cancels Run's context and waits (up to 30s) for Run to return. This
// keeps the instance from outliving the test and racing later cleanups such as
// t.TempDir removal.
func runInstance(t *testing.T, i *Instance) {
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})

	go func() {
		defer close(done)
		defer func() {
			if r := recover(); r != nil {
				t.Errorf("instance panicked while running: %v", r)
			}
		}()
		// Run's error is intentionally ignored: callers only care that the
		// instance runs without panicking until the test ends.
		_ = i.Run(ctx)
	}()

	t.Cleanup(func() {
		cancel()
		select {
		case <-done:
		case <-time.After(30 * time.Second):
			t.Error("instance did not stop within 30s of context cancellation")
		}
	})
}
421
422