Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/instance/instance_integration_test.go
4094 views
1
package instance
2
3
import (
4
"context"
5
"fmt"
6
"net"
7
"net/http"
8
"os"
9
"strings"
10
"testing"
11
"time"
12
13
"github.com/cortexproject/cortex/pkg/util/test"
14
"github.com/go-kit/log"
15
"github.com/gorilla/mux"
16
"github.com/prometheus/client_golang/prometheus"
17
"github.com/prometheus/client_golang/prometheus/promhttp"
18
"github.com/stretchr/testify/require"
19
"go.uber.org/atomic"
20
)
21
22
// TestInstance_Update performs a full integration test by doing the following:
23
//
24
// 1. Launching an HTTP server which can be scraped and also mocks the remote_write
25
// endpoint.
26
// 2. Creating an instance config with no scrape_configs or remote_write configs.
27
// 3. Updates the instance with a scrape_config and remote_write.
28
// 4. Validates that after 15 seconds, the scrape endpoint and remote_write
29
// endpoint has been called.
30
func TestInstance_Update(t *testing.T) {
31
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
32
33
walDir := t.TempDir()
34
35
var (
36
scraped = atomic.NewBool(false)
37
pushed = atomic.NewBool(false)
38
)
39
40
r := mux.NewRouter()
41
r.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
42
scraped.Store(true)
43
promhttp.Handler().ServeHTTP(w, r)
44
})
45
r.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {
46
pushed.Store(true)
47
// We don't particularly care what was pushed to us so we'll ignore
48
// everything here; we just want to make sure the endpoint was invoked.
49
})
50
51
// Start a server for exposing the router.
52
l, err := net.Listen("tcp", "127.0.0.1:0")
53
require.NoError(t, err)
54
defer l.Close()
55
go func() {
56
_ = http.Serve(l, r)
57
}()
58
59
// Create a new instance where it's not scraping or writing anything by default.
60
initialConfig := loadConfig(t, `
61
name: integration_test
62
scrape_configs: []
63
remote_write: []
64
`)
65
inst, err := New(prometheus.NewRegistry(), initialConfig, walDir, logger)
66
require.NoError(t, err)
67
68
instCtx, cancel := context.WithCancel(context.Background())
69
defer cancel()
70
go func() {
71
err := inst.Run(instCtx)
72
require.NoError(t, err)
73
}()
74
75
// Update the config with a single scrape_config and remote_write.
76
newConfig := loadConfig(t, fmt.Sprintf(`
77
name: integration_test
78
scrape_configs:
79
- job_name: test_scrape
80
scrape_interval: 5s
81
static_configs:
82
- targets: ['%[1]s']
83
remote_write:
84
- url: http://%[1]s/push
85
`, l.Addr()))
86
87
// Wait minute for the instance to update (it might not be ready yet and
88
// would return an error until everything is initialized), and then wait
89
// again for the configs to apply and set the scraped and pushed atomic
90
// variables, indicating that the Prometheus components successfully updated.
91
test.Poll(t, time.Second*15, nil, func() interface{} {
92
err := inst.Update(newConfig)
93
if err != nil {
94
logger.Log("msg", "failed to update instance", "err", err)
95
}
96
return err
97
})
98
99
test.Poll(t, time.Second*15, true, func() interface{} {
100
return scraped.Load() && pushed.Load()
101
})
102
}
103
104
func TestInstance_Update_Failed(t *testing.T) {
105
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
106
107
walDir := t.TempDir()
108
109
r := mux.NewRouter()
110
r.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
111
promhttp.Handler().ServeHTTP(w, r)
112
})
113
r.HandleFunc("/push", func(w http.ResponseWriter, r *http.Request) {})
114
115
// Start a server for exposing the router.
116
l, err := net.Listen("tcp", "127.0.0.1:0")
117
require.NoError(t, err)
118
defer l.Close()
119
go func() {
120
_ = http.Serve(l, r)
121
}()
122
123
// Create a new instance where it's not scraping or writing anything by default.
124
initialConfig := loadConfig(t, `
125
name: integration_test
126
scrape_configs: []
127
remote_write: []
128
`)
129
inst, err := New(prometheus.NewRegistry(), initialConfig, walDir, logger)
130
require.NoError(t, err)
131
132
instCtx, cancel := context.WithCancel(context.Background())
133
defer cancel()
134
go func() {
135
err := inst.Run(instCtx)
136
require.NoError(t, err)
137
}()
138
139
// Create a new config to use for updating
140
newConfig := loadConfig(t, fmt.Sprintf(`
141
name: integration_test
142
scrape_configs:
143
- job_name: test_scrape
144
scrape_interval: 5s
145
static_configs:
146
- targets: ['%[1]s']
147
remote_write:
148
- url: http://%[1]s/push
149
`, l.Addr()))
150
151
// Make sure the instance can successfully update first
152
test.Poll(t, time.Second*15, nil, func() interface{} {
153
err := inst.Update(newConfig)
154
if err != nil {
155
logger.Log("msg", "failed to update instance", "err", err)
156
}
157
return err
158
})
159
160
// Now force an update back to the original config to fail
161
inst.readyScrapeManager.Set(nil)
162
require.NotNil(t, inst.Update(initialConfig), "update should have failed")
163
require.Equal(t, newConfig, inst.cfg, "config did not roll back")
164
}
165
166
// TestInstance_Update_InvalidChanges runs an instance with a blank initial
167
// config and performs various unacceptable updates that should return an
168
// error.
169
func TestInstance_Update_InvalidChanges(t *testing.T) {
170
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
171
172
walDir := t.TempDir()
173
174
// Create a new instance where it's not scraping or writing anything by default.
175
initialConfig := loadConfig(t, `
176
name: integration_test
177
scrape_configs: []
178
remote_write: []
179
`)
180
inst, err := New(prometheus.NewRegistry(), initialConfig, walDir, logger)
181
require.NoError(t, err)
182
183
instCtx, cancel := context.WithCancel(context.Background())
184
defer cancel()
185
go func() {
186
err := inst.Run(instCtx)
187
require.NoError(t, err)
188
}()
189
190
// Do a no-op update that succeeds to ensure that the instance is running.
191
test.Poll(t, time.Second*15, nil, func() interface{} {
192
err := inst.Update(initialConfig)
193
if err != nil {
194
logger.Log("msg", "failed to update instance", "err", err)
195
}
196
return err
197
})
198
199
tt := []struct {
200
name string
201
mut func(c *Config)
202
expect string
203
}{
204
{
205
name: "name changed",
206
mut: func(c *Config) { c.Name = "changed name" },
207
expect: "name cannot be changed dynamically",
208
},
209
{
210
name: "host_filter changed",
211
mut: func(c *Config) { c.HostFilter = true },
212
expect: "host_filter cannot be changed dynamically",
213
},
214
{
215
name: "wal_truncate_frequency changed",
216
mut: func(c *Config) { c.WALTruncateFrequency *= 2 },
217
expect: "wal_truncate_frequency cannot be changed dynamically",
218
},
219
{
220
name: "remote_flush_deadline changed",
221
mut: func(c *Config) { c.RemoteFlushDeadline *= 2 },
222
expect: "remote_flush_deadline cannot be changed dynamically",
223
},
224
{
225
name: "write_stale_on_shutdown changed",
226
mut: func(c *Config) { c.WriteStaleOnShutdown = true },
227
expect: "write_stale_on_shutdown cannot be changed dynamically",
228
},
229
}
230
231
for _, tc := range tt {
232
t.Run(tc.name, func(t *testing.T) {
233
mutatedConfig := initialConfig
234
tc.mut(&mutatedConfig)
235
236
err := inst.Update(mutatedConfig)
237
require.EqualError(t, err, tc.expect)
238
})
239
}
240
}
241
242
// loadConfig parses s as an instance config, applies DefaultGlobalConfig as
// its defaults, and returns the result by value. Any parse or defaulting
// error fails the test immediately.
func loadConfig(t *testing.T, s string) Config {
	// Mark this as a helper so failures are reported at the calling test's
	// line rather than inside loadConfig.
	t.Helper()

	cfg, err := UnmarshalConfig(strings.NewReader(s))
	require.NoError(t, err)
	require.NoError(t, cfg.ApplyDefaults(DefaultGlobalConfig))
	return *cfg
}
248
249