Path: blob/main/pkg/metrics/instance/instance_integration_test.go
4094 views
package instance12import (3"context"4"fmt"5"net"6"net/http"7"os"8"strings"9"testing"10"time"1112"github.com/cortexproject/cortex/pkg/util/test"13"github.com/go-kit/log"14"github.com/gorilla/mux"15"github.com/prometheus/client_golang/prometheus"16"github.com/prometheus/client_golang/prometheus/promhttp"17"github.com/stretchr/testify/require"18"go.uber.org/atomic"19)2021// TestInstance_Update performs a full integration test by doing the following:22//23// 1. Launching an HTTP server which can be scraped and also mocks the remote_write24// endpoint.25// 2. Creating an instance config with no scrape_configs or remote_write configs.26// 3. Updates the instance with a scrape_config and remote_write.27// 4. Validates that after 15 seconds, the scrape endpoint and remote_write28// endpoint has been called.29func TestInstance_Update(t *testing.T) {30logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))3132walDir := t.TempDir()3334var (35scraped = atomic.NewBool(false)36pushed = atomic.NewBool(false)37)3839r := mux.NewRouter()40r.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {41scraped.Store(true)42promhttp.Handler().ServeHTTP(w, r)43})44r.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {45pushed.Store(true)46// We don't particularly care what was pushed to us so we'll ignore47// everything here; we just want to make sure the endpoint was invoked.48})4950// Start a server for exposing the router.51l, err := net.Listen("tcp", "127.0.0.1:0")52require.NoError(t, err)53defer l.Close()54go func() {55_ = http.Serve(l, r)56}()5758// Create a new instance where it's not scraping or writing anything by default.59initialConfig := loadConfig(t, `60name: integration_test61scrape_configs: []62remote_write: []63`)64inst, err := New(prometheus.NewRegistry(), initialConfig, walDir, logger)65require.NoError(t, err)6667instCtx, cancel := context.WithCancel(context.Background())68defer cancel()69go func() {70err := inst.Run(instCtx)71require.NoError(t, err)72}()7374// Update the config with a single scrape_config and remote_write.75newConfig := loadConfig(t, fmt.Sprintf(`76name: integration_test77scrape_configs:78- job_name: test_scrape79scrape_interval: 5s80static_configs:81- targets: ['%[1]s']82remote_write:83- url: http://%[1]s/push84`, l.Addr()))8586// Wait minute for the instance to update (it might not be ready yet and87// would return an error until everything is initialized), and then wait88// again for the configs to apply and set the scraped and pushed atomic89// variables, indicating that the Prometheus components successfully updated.90test.Poll(t, time.Second*15, nil, func() interface{} {91err := inst.Update(newConfig)92if err != nil {93logger.Log("msg", "failed to update instance", "err", err)94}95return err96})9798test.Poll(t, time.Second*15, true, func() interface{} {99return scraped.Load() && pushed.Load()100})101}102103func TestInstance_Update_Failed(t *testing.T) {104logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))105106walDir := t.TempDir()107108r := mux.NewRouter()109r.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {110promhttp.Handler().ServeHTTP(w, r)111})112r.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {})113114// Start a server for exposing the router.115l, err := net.Listen("tcp", "127.0.0.1:0")116require.NoError(t, err)117defer l.Close()118go func() {119_ = http.Serve(l, r)120}()121122// Create a new instance where it's not scraping or writing anything by default.123initialConfig := loadConfig(t, `124name: integration_test125scrape_configs: []126remote_write: []127`)128inst, err := New(prometheus.NewRegistry(), initialConfig, walDir, logger)129require.NoError(t, err)130131instCtx, cancel := context.WithCancel(context.Background())132defer cancel()133go func() {134err := inst.Run(instCtx)135require.NoError(t, err)136}()137138// Create a new config to use for updating139newConfig := loadConfig(t, fmt.Sprintf(`140name: integration_test141scrape_configs:142- job_name: test_scrape143scrape_interval: 5s144static_configs:145- targets: ['%[1]s']146remote_write:147- url: http://%[1]s/push148`, l.Addr()))149150// Make sure the instance can successfully update first151test.Poll(t, time.Second*15, nil, func() interface{} {152err := inst.Update(newConfig)153if err != nil {154logger.Log("msg", "failed to update instance", "err", err)155}156return err157})158159// Now force an update back to the original config to fail160inst.readyScrapeManager.Set(nil)161require.NotNil(t, inst.Update(initialConfig), "update should have failed")162require.Equal(t, newConfig, inst.cfg, "config did not roll back")163}164165// TestInstance_Update_InvalidChanges runs an instance with a blank initial166// config and performs various unacceptable updates that should return an167// error.168func TestInstance_Update_InvalidChanges(t *testing.T) {169logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))170171walDir := t.TempDir()172173// Create a new instance where it's not scraping or writing anything by default.174initialConfig := loadConfig(t, `175name: integration_test176scrape_configs: []177remote_write: []178`)179inst, err := New(prometheus.NewRegistry(), initialConfig, walDir, logger)180require.NoError(t, err)181182instCtx, cancel := context.WithCancel(context.Background())183defer cancel()184go func() {185err := inst.Run(instCtx)186require.NoError(t, err)187}()188189// Do a no-op update that succeeds to ensure that the instance is running.190test.Poll(t, time.Second*15, nil, func() interface{} {191err := inst.Update(initialConfig)192if err != nil {193logger.Log("msg", "failed to update instance", "err", err)194}195return err196})197198tt := []struct {199name string200mut func(c *Config)201expect string202}{203{204name: "name changed",205mut: func(c *Config) { c.Name = "changed name" },206expect: "name cannot be changed dynamically",207},208{209name: "host_filter changed",210mut: func(c *Config) { c.HostFilter = true },211expect: "host_filter cannot be changed dynamically",212},213{214name: "wal_truncate_frequency changed",215mut: func(c *Config) { c.WALTruncateFrequency *= 2 },216expect: "wal_truncate_frequency cannot be changed dynamically",217},218{219name: "remote_flush_deadline changed",220mut: func(c *Config) { c.RemoteFlushDeadline *= 2 },221expect: "remote_flush_deadline cannot be changed dynamically",222},223{224name: "write_stale_on_shutdown changed",225mut: func(c *Config) { c.WriteStaleOnShutdown = true },226expect: "write_stale_on_shutdown cannot be changed dynamically",227},228}229230for _, tc := range tt {231t.Run(tc.name, func(t *testing.T) {232mutatedConfig := initialConfig233tc.mut(&mutatedConfig)234235err := inst.Update(mutatedConfig)236require.EqualError(t, err, tc.expect)237})238}239}240241func loadConfig(t *testing.T, s string) Config {242cfg, err := UnmarshalConfig(strings.NewReader(s))243require.NoError(t, err)244require.NoError(t, cfg.ApplyDefaults(DefaultGlobalConfig))245return *cfg246}247248249