Path: blob/main/pkg/metrics/instance/instance_test.go
4094 views
package instance12import (3"context"4"fmt"5"net/http/httptest"6"os"7"path"8"strings"9"sync"10"testing"11"time"1213"github.com/cortexproject/cortex/pkg/util/test"14"github.com/go-kit/log"15"github.com/prometheus/client_golang/prometheus"16"github.com/prometheus/client_golang/prometheus/promhttp"17"github.com/prometheus/common/model"18"github.com/prometheus/prometheus/config"19"github.com/prometheus/prometheus/discovery"20"github.com/prometheus/prometheus/model/exemplar"21"github.com/prometheus/prometheus/model/histogram"22"github.com/prometheus/prometheus/model/labels"23"github.com/prometheus/prometheus/model/metadata"24"github.com/prometheus/prometheus/storage"25"github.com/stretchr/testify/require"26)2728func TestConfig_Unmarshal_Defaults(t *testing.T) {29global := DefaultGlobalConfig30cfgText := `name: test31scrape_configs:32- job_name: local_scrape33static_configs:34- targets: ['127.0.0.1:12345']35labels:36cluster: 'localhost'37remote_write:38- url: http://localhost:9009/api/prom/push`3940cfg, err := UnmarshalConfig(strings.NewReader(cfgText))41require.NoError(t, err)4243err = cfg.ApplyDefaults(global)44require.NoError(t, err)4546require.Equal(t, DefaultConfig.HostFilter, cfg.HostFilter)47require.Equal(t, DefaultConfig.WALTruncateFrequency, cfg.WALTruncateFrequency)48require.Equal(t, DefaultConfig.RemoteFlushDeadline, cfg.RemoteFlushDeadline)49require.Equal(t, DefaultConfig.WriteStaleOnShutdown, cfg.WriteStaleOnShutdown)5051for _, sc := range cfg.ScrapeConfigs {52require.Equal(t, sc.ScrapeInterval, global.Prometheus.ScrapeInterval)53require.Equal(t, sc.ScrapeTimeout, global.Prometheus.ScrapeTimeout)54}55}5657func TestConfig_ApplyDefaults_Validations(t *testing.T) {58global := DefaultGlobalConfig59cfg := DefaultConfig60cfg.Name = "instance"61cfg.ScrapeConfigs = []*config.ScrapeConfig{{62JobName: "scrape",63ServiceDiscoveryConfigs: discovery.Configs{64discovery.StaticConfig{{65Targets: []model.LabelSet{{66model.AddressLabel: model.LabelValue("127.0.0.1:12345"),67}},68Labels: model.LabelSet{"cluster": "localhost"},69}},70},71}}72cfg.RemoteWrite = []*config.RemoteWriteConfig{{Name: "write"}}7374tt := []struct {75name string76mutation func(c *Config)77err error78}{79{80"valid config",81nil,82nil,83},84{85"requires name",86func(c *Config) { c.Name = "" },87fmt.Errorf("missing instance name"),88},89{90"missing scrape",91func(c *Config) { c.ScrapeConfigs[0] = nil },92fmt.Errorf("empty or null scrape config section"),93},94{95"missing wal truncate frequency",96func(c *Config) { c.WALTruncateFrequency = 0 },97fmt.Errorf("wal_truncate_frequency must be greater than 0s"),98},99{100"missing remote flush deadline",101func(c *Config) { c.RemoteFlushDeadline = 0 },102fmt.Errorf("remote_flush_deadline must be greater than 0s"),103},104{105"scrape timeout too high",106func(c *Config) { c.ScrapeConfigs[0].ScrapeTimeout = global.Prometheus.ScrapeInterval + 1 },107fmt.Errorf("scrape timeout greater than scrape interval for scrape config with job name \"scrape\""),108},109{110"scrape interval greater than truncate frequency",111func(c *Config) { c.ScrapeConfigs[0].ScrapeInterval = model.Duration(c.WALTruncateFrequency + 1) },112fmt.Errorf("scrape interval greater than wal_truncate_frequency for scrape config with job name \"scrape\""),113},114{115"multiple scrape configs with same name",116func(c *Config) {117c.ScrapeConfigs = append(c.ScrapeConfigs, &config.ScrapeConfig{118JobName: "scrape",119})120},121fmt.Errorf("found multiple scrape configs with job name \"scrape\""),122},123{124"empty remote write",125func(c *Config) { c.RemoteWrite = append(c.RemoteWrite, nil) },126fmt.Errorf("empty or null remote write config section"),127},128{129"multiple remote writes with same name",130func(c *Config) {131c.RemoteWrite = []*config.RemoteWriteConfig{132{Name: "foo"},133{Name: "foo"},134}135},136fmt.Errorf("found duplicate remote write configs with name \"foo\""),137},138}139140for _, tc := range tt {141t.Run(tc.name, func(t *testing.T) {142// Copy the input and all of its slices143input := cfg144145var scrapeConfigs []*config.ScrapeConfig146for _, sc := range input.ScrapeConfigs {147scCopy := *sc148scrapeConfigs = append(scrapeConfigs, &scCopy)149}150input.ScrapeConfigs = scrapeConfigs151152var remoteWrites []*config.RemoteWriteConfig153for _, rw := range input.RemoteWrite {154rwCopy := *rw155remoteWrites = append(remoteWrites, &rwCopy)156}157input.RemoteWrite = remoteWrites158159if tc.mutation != nil {160tc.mutation(&input)161}162163err := input.ApplyDefaults(global)164if tc.err == nil {165require.NoError(t, err)166} else {167require.EqualError(t, err, tc.err.Error())168}169})170}171}172173func TestConfig_ApplyDefaults_HashedName(t *testing.T) {174cfgText := `175name: default176host_filter: false177remote_write:178- url: http://localhost:9009/api/prom/push179sigv4: {}`180181cfg, err := UnmarshalConfig(strings.NewReader(cfgText))182require.NoError(t, err)183require.NoError(t, cfg.ApplyDefaults(DefaultGlobalConfig))184require.NotEmpty(t, cfg.RemoteWrite[0].Name)185}186187func TestInstance_Path(t *testing.T) {188scrapeAddr, closeSrv := getTestServer(t)189defer closeSrv()190191walDir := t.TempDir()192193globalConfig := getTestGlobalConfig(t)194195cfg := getTestConfig(t, &globalConfig, scrapeAddr)196cfg.WALTruncateFrequency = time.Hour197cfg.RemoteFlushDeadline = time.Hour198199logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))200inst, err := New(prometheus.NewRegistry(), cfg, walDir, logger)201require.NoError(t, err)202runInstance(t, inst)203204// <walDir>/<inst.name> path should exist for WAL205test.Poll(t, time.Second*5, true, func() interface{} {206_, err := os.Stat(path.Join(walDir, "test"))207return err == nil208})209}210211// TestInstance tests that discovery and scraping are working by using a mock212// instance of the WAL storage and testing that samples get written to it.213// This test touches most of Instance and is enough for a basic integration test.214func TestInstance(t *testing.T) {215scrapeAddr, closeSrv := getTestServer(t)216defer closeSrv()217218walDir := t.TempDir()219220globalConfig := getTestGlobalConfig(t)221cfg := getTestConfig(t, &globalConfig, scrapeAddr)222cfg.WALTruncateFrequency = time.Hour223cfg.RemoteFlushDeadline = time.Hour224225mockStorage := mockWalStorage{226series: make(map[storage.SeriesRef]int),227directory: walDir,228}229newWal := func(_ prometheus.Registerer) (walStorage, error) { return &mockStorage, nil }230231logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))232inst, err := newInstance(cfg, nil, logger, newWal)233require.NoError(t, err)234runInstance(t, inst)235236// Wait until mockWalStorage has had a series added to it.237test.Poll(t, 30*time.Second, true, func() interface{} {238mockStorage.mut.Lock()239defer mockStorage.mut.Unlock()240return len(mockStorage.series) > 0241})242}243244// TestInstance_Recreate ensures that creating an instance with the same name twice245// does not cause any duplicate metrics registration that leads to a panic.246func TestInstance_Recreate(t *testing.T) {247scrapeAddr, closeSrv := getTestServer(t)248defer closeSrv()249250walDir := t.TempDir()251252globalConfig := getTestGlobalConfig(t)253254cfg := getTestConfig(t, &globalConfig, scrapeAddr)255cfg.Name = "recreate_test"256cfg.WALTruncateFrequency = time.Hour257cfg.RemoteFlushDeadline = time.Hour258259logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))260inst, err := New(prometheus.NewRegistry(), cfg, walDir, logger)261require.NoError(t, err)262263ctx, cancel := context.WithCancel(context.Background())264exited := make(chan bool)265go func() {266err := inst.Run(ctx)267close(exited)268269if err != nil {270require.Equal(t, context.Canceled, err)271}272}()273274time.Sleep(1 * time.Second)275cancel()276<-exited277278// Recreate the instance, no panic should happen.279require.NotPanics(t, func() {280inst, err := New(prometheus.NewRegistry(), cfg, walDir, logger)281require.NoError(t, err)282runInstance(t, inst)283284time.Sleep(1 * time.Second)285})286}287288func getTestServer(t *testing.T) (addr string, closeFunc func()) {289t.Helper()290291reg := prometheus.NewRegistry()292293testCounter := prometheus.NewCounter(prometheus.CounterOpts{294Name: "test_metric_total",295})296testCounter.Inc()297reg.MustRegister(testCounter)298299handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})300httpSrv := httptest.NewServer(handler)301return httpSrv.Listener.Addr().String(), httpSrv.Close302}303304func getTestGlobalConfig(t *testing.T) GlobalConfig {305t.Helper()306307return GlobalConfig{308Prometheus: config.GlobalConfig{309ScrapeInterval: model.Duration(time.Millisecond * 50),310ScrapeTimeout: model.Duration(time.Millisecond * 25),311EvaluationInterval: model.Duration(time.Hour),312},313}314}315316func getTestConfig(t *testing.T, global *GlobalConfig, scrapeAddr string) Config {317t.Helper()318319scrapeCfg := config.DefaultScrapeConfig320scrapeCfg.JobName = "test"321scrapeCfg.ScrapeInterval = global.Prometheus.ScrapeInterval322scrapeCfg.ScrapeTimeout = global.Prometheus.ScrapeTimeout323scrapeCfg.ServiceDiscoveryConfigs = discovery.Configs{324discovery.StaticConfig{{325Targets: []model.LabelSet{{326model.AddressLabel: model.LabelValue(scrapeAddr),327}},328Labels: model.LabelSet{},329}},330}331332cfg := DefaultConfig333cfg.Name = "test"334cfg.ScrapeConfigs = []*config.ScrapeConfig{&scrapeCfg}335cfg.global = *global336337return cfg338}339340type mockWalStorage struct {341storage.Queryable342storage.ChunkQueryable343344directory string345mut sync.Mutex346series map[storage.SeriesRef]int347}348349func (s *mockWalStorage) Directory() string { return s.directory }350func (s *mockWalStorage) StartTime() (int64, error) { return 0, nil }351func (s *mockWalStorage) WriteStalenessMarkers(f func() int64) error { return nil }352func (s *mockWalStorage) Close() error { return nil }353func (s *mockWalStorage) Truncate(mint int64) error { return nil }354355func (s *mockWalStorage) Appender(context.Context) storage.Appender {356return &mockAppender{s: s}357}358359type mockAppender struct {360s *mockWalStorage361}362363func (a *mockAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {364if ref == 0 {365return a.Add(l, t, v)366}367return ref, a.AddFast(ref, t, v)368}369370// Add adds a new series and sets its written count to 1.371func (a *mockAppender) Add(l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {372a.s.mut.Lock()373defer a.s.mut.Unlock()374375hash := l.Hash()376a.s.series[storage.SeriesRef(hash)] = 1377return storage.SeriesRef(hash), nil378}379380// AddFast increments the number of writes to an existing series.381func (a *mockAppender) AddFast(ref storage.SeriesRef, t int64, v float64) error {382a.s.mut.Lock()383defer a.s.mut.Unlock()384_, ok := a.s.series[ref]385if !ok {386return storage.ErrNotFound387}388389a.s.series[ref]++390return nil391}392393func (a *mockAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {394return 0, nil395}396397func (a *mockAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {398return 0, nil399}400401func (a *mockAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {402return 0, nil403}404405func (a *mockAppender) Commit() error {406return nil407}408409func (a *mockAppender) Rollback() error {410return nil411}412413func runInstance(t *testing.T, i *Instance) {414ctx, cancel := context.WithCancel(context.Background())415t.Cleanup(func() { cancel() })416go require.NotPanics(t, func() {417_ = i.Run(ctx)418})419}420421422