Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/receive_http/receive_http_test.go
4096 views
1
package receive_http
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"net/url"
8
"testing"
9
"time"
10
11
"github.com/golang/snappy"
12
"github.com/grafana/agent/component"
13
fnet "github.com/grafana/agent/component/common/net"
14
agentprom "github.com/grafana/agent/component/prometheus"
15
"github.com/grafana/agent/pkg/util"
16
"github.com/phayes/freeport"
17
"github.com/prometheus/client_golang/prometheus"
18
"github.com/prometheus/common/config"
19
"github.com/prometheus/common/model"
20
"github.com/prometheus/prometheus/model/labels"
21
"github.com/prometheus/prometheus/prompb"
22
"github.com/prometheus/prometheus/storage"
23
"github.com/prometheus/prometheus/storage/remote"
24
"github.com/stretchr/testify/require"
25
"google.golang.org/protobuf/proto"
26
"google.golang.org/protobuf/protoadapt"
27
)
28
29
func TestForwardsMetrics(t *testing.T) {
30
timestamp := time.Now().Add(time.Second).UnixMilli()
31
input := []prompb.TimeSeries{{
32
Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}},
33
Samples: []prompb.Sample{
34
{Timestamp: timestamp, Value: 12},
35
{Timestamp: timestamp + 1, Value: 24},
36
{Timestamp: timestamp + 2, Value: 48},
37
},
38
}, {
39
Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}},
40
Samples: []prompb.Sample{
41
{Timestamp: timestamp, Value: 191},
42
{Timestamp: timestamp + 1, Value: 1337},
43
},
44
}}
45
46
expected := []testSample{
47
{ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")},
48
{ts: timestamp + 1, val: 24, l: labels.FromStrings("cluster", "local", "foo", "bar")},
49
{ts: timestamp + 2, val: 48, l: labels.FromStrings("cluster", "local", "foo", "bar")},
50
{ts: timestamp, val: 191, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},
51
{ts: timestamp + 1, val: 1337, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},
52
}
53
54
actualSamples := make(chan testSample, 100)
55
56
// Start the component
57
port, err := freeport.GetFreePort()
58
require.NoError(t, err)
59
args := Arguments{
60
Server: &fnet.ServerConfig{
61
HTTP: &fnet.HTTPConfig{
62
ListenAddress: "localhost",
63
ListenPort: port,
64
},
65
GRPC: testGRPCConfig(t),
66
},
67
ForwardTo: testAppendable(actualSamples),
68
}
69
comp, err := New(testOptions(t), args)
70
require.NoError(t, err)
71
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
72
defer cancel()
73
go func() {
74
require.NoError(t, comp.Run(ctx))
75
}()
76
77
verifyExpectations(t, input, expected, actualSamples, args, ctx)
78
}
79
80
func TestUpdate(t *testing.T) {
81
timestamp := time.Now().Add(time.Second).UnixMilli()
82
input01 := []prompb.TimeSeries{{
83
Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}},
84
Samples: []prompb.Sample{
85
{Timestamp: timestamp, Value: 12},
86
},
87
}, {
88
Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}},
89
Samples: []prompb.Sample{
90
{Timestamp: timestamp, Value: 191},
91
},
92
}}
93
expected01 := []testSample{
94
{ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")},
95
{ts: timestamp, val: 191, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},
96
}
97
98
input02 := []prompb.TimeSeries{{
99
Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}},
100
Samples: []prompb.Sample{
101
{Timestamp: timestamp + 1, Value: 24},
102
{Timestamp: timestamp + 2, Value: 48},
103
},
104
}, {
105
Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}},
106
Samples: []prompb.Sample{
107
{Timestamp: timestamp + 1, Value: 1337},
108
},
109
}}
110
expected02 := []testSample{
111
{ts: timestamp + 1, val: 24, l: labels.FromStrings("cluster", "local", "foo", "bar")},
112
{ts: timestamp + 2, val: 48, l: labels.FromStrings("cluster", "local", "foo", "bar")},
113
{ts: timestamp + 1, val: 1337, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},
114
}
115
116
actualSamples := make(chan testSample, 100)
117
118
// Start the component
119
port, err := freeport.GetFreePort()
120
require.NoError(t, err)
121
args := Arguments{
122
Server: &fnet.ServerConfig{
123
HTTP: &fnet.HTTPConfig{
124
ListenAddress: "localhost",
125
ListenPort: port,
126
},
127
GRPC: testGRPCConfig(t),
128
},
129
ForwardTo: testAppendable(actualSamples),
130
}
131
comp, err := New(testOptions(t), args)
132
require.NoError(t, err)
133
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
134
defer cancel()
135
go func() {
136
require.NoError(t, comp.Run(ctx))
137
}()
138
139
verifyExpectations(t, input01, expected01, actualSamples, args, ctx)
140
141
otherPort, err := freeport.GetFreePort()
142
require.NoError(t, err)
143
args = Arguments{
144
Server: &fnet.ServerConfig{
145
HTTP: &fnet.HTTPConfig{
146
ListenAddress: "localhost",
147
ListenPort: otherPort,
148
},
149
GRPC: testGRPCConfig(t),
150
},
151
ForwardTo: testAppendable(actualSamples),
152
}
153
err = comp.Update(args)
154
require.NoError(t, err)
155
156
verifyExpectations(t, input02, expected02, actualSamples, args, ctx)
157
}
158
159
// testGRPCConfig builds a gRPC listener configuration bound to 127.0.0.1 on
// a port obtained from getFreePort.
func testGRPCConfig(t *testing.T) *fnet.GRPCConfig {
	cfg := new(fnet.GRPCConfig)
	cfg.ListenAddress = "127.0.0.1"
	cfg.ListenPort = getFreePort(t)
	return cfg
}
162
163
func TestServerRestarts(t *testing.T) {
164
port, err := freeport.GetFreePort()
165
require.NoError(t, err)
166
167
otherPort, err := freeport.GetFreePort()
168
require.NoError(t, err)
169
170
testCases := []struct {
171
name string
172
initialArgs Arguments
173
newArgs Arguments
174
shouldRestart bool
175
}{
176
{
177
name: "identical args require no restart",
178
initialArgs: Arguments{
179
Server: &fnet.ServerConfig{
180
HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},
181
},
182
ForwardTo: []storage.Appendable{},
183
},
184
newArgs: Arguments{
185
Server: &fnet.ServerConfig{
186
HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},
187
},
188
ForwardTo: []storage.Appendable{},
189
},
190
shouldRestart: false,
191
},
192
{
193
name: "forward_to update does not require restart",
194
initialArgs: Arguments{
195
Server: &fnet.ServerConfig{
196
HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},
197
},
198
ForwardTo: []storage.Appendable{},
199
},
200
newArgs: Arguments{
201
Server: &fnet.ServerConfig{
202
HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},
203
},
204
ForwardTo: testAppendable(nil),
205
},
206
shouldRestart: false,
207
},
208
{
209
name: "hostname change requires restart",
210
initialArgs: Arguments{
211
Server: &fnet.ServerConfig{
212
HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},
213
},
214
ForwardTo: []storage.Appendable{},
215
},
216
newArgs: Arguments{
217
Server: &fnet.ServerConfig{
218
HTTP: &fnet.HTTPConfig{ListenAddress: "127.0.0.1", ListenPort: port},
219
},
220
ForwardTo: testAppendable(nil),
221
},
222
shouldRestart: true,
223
},
224
{
225
name: "port change requires restart",
226
initialArgs: Arguments{
227
Server: &fnet.ServerConfig{
228
HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},
229
},
230
ForwardTo: []storage.Appendable{},
231
},
232
newArgs: Arguments{
233
Server: &fnet.ServerConfig{
234
HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: otherPort},
235
},
236
ForwardTo: testAppendable(nil),
237
},
238
shouldRestart: true,
239
},
240
}
241
242
for _, tc := range testCases {
243
t.Run(tc.name, func(t *testing.T) {
244
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
245
defer cancel()
246
247
c, err := New(testOptions(t), tc.initialArgs)
248
require.NoError(t, err)
249
250
serverExit := make(chan error)
251
go func() {
252
serverExit <- c.Run(ctx)
253
}()
254
255
comp := c.(*Component)
256
waitForServerToBeReady(t, comp.args)
257
258
initialServer := comp.server
259
require.NotNil(t, initialServer)
260
261
err = c.Update(tc.newArgs)
262
require.NoError(t, err)
263
264
waitForServerToBeReady(t, comp.args)
265
266
require.NotNil(t, comp.server)
267
restarted := initialServer != comp.server
268
269
require.Equal(t, tc.shouldRestart, restarted)
270
271
// shut down cleanly to release ports for other tests
272
cancel()
273
select {
274
case err := <-serverExit:
275
require.NoError(t, err, "unexpected error on server exit")
276
case <-time.After(5 * time.Second):
277
t.Fatalf("timed out waiting for server to shut down")
278
}
279
})
280
}
281
}
282
283
type testSample struct {
284
ts int64
285
val float64
286
l labels.Labels
287
}
288
289
func waitForServerToBeReady(t *testing.T, args Arguments) {
290
require.Eventuallyf(t, func() bool {
291
resp, err := http.Get(fmt.Sprintf(
292
"http://%v:%d/wrong/path",
293
args.Server.HTTP.ListenAddress,
294
args.Server.HTTP.ListenPort,
295
))
296
t.Logf("err: %v, resp: %v", err, resp)
297
return err == nil && resp.StatusCode == 404
298
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
299
}
300
301
func verifyExpectations(
302
t *testing.T,
303
input []prompb.TimeSeries,
304
expected []testSample,
305
actualSamples chan testSample,
306
args Arguments,
307
ctx context.Context,
308
) {
309
// In case server didn't start yet
310
waitForServerToBeReady(t, args)
311
312
// Send the input time series to the component
313
endpoint := fmt.Sprintf(
314
"http://%s:%d/api/v1/metrics/write",
315
args.Server.HTTP.ListenAddress,
316
args.Server.HTTP.ListenPort,
317
)
318
err := request(ctx, endpoint, &prompb.WriteRequest{Timeseries: input})
319
require.NoError(t, err)
320
321
// Verify we receive expected metrics
322
for _, exp := range expected {
323
select {
324
case actual := <-actualSamples:
325
require.Equal(t, exp, actual)
326
case <-ctx.Done():
327
t.Fatalf("test timed out")
328
}
329
}
330
331
select {
332
case unexpected := <-actualSamples:
333
t.Fatalf("unexpected extra sample received: %v", unexpected)
334
default:
335
}
336
}
337
338
func testAppendable(actualSamples chan testSample) []storage.Appendable {
339
hookFn := func(
340
ref storage.SeriesRef,
341
l labels.Labels,
342
ts int64,
343
val float64,
344
next storage.Appender,
345
) (storage.SeriesRef, error) {
346
347
actualSamples <- testSample{ts: ts, val: val, l: l}
348
return ref, nil
349
}
350
351
return []storage.Appendable{agentprom.NewInterceptor(
352
nil,
353
agentprom.WithAppendHook(
354
hookFn))}
355
}
356
357
func request(ctx context.Context, rawRemoteWriteURL string, req *prompb.WriteRequest) error {
358
remoteWriteURL, err := url.Parse(rawRemoteWriteURL)
359
if err != nil {
360
return err
361
}
362
363
client, err := remote.NewWriteClient("remote-write-client", &remote.ClientConfig{
364
URL: &config.URL{URL: remoteWriteURL},
365
Timeout: model.Duration(30 * time.Second),
366
})
367
if err != nil {
368
return err
369
}
370
371
buf, err := proto.Marshal(protoadapt.MessageV2Of(req))
372
if err != nil {
373
return err
374
}
375
376
compressed := snappy.Encode(buf, buf)
377
return client.Store(ctx, compressed)
378
}
379
380
func testOptions(t *testing.T) component.Options {
381
return component.Options{
382
ID: "prometheus.receive_http.test",
383
Logger: util.TestFlowLogger(t),
384
Registerer: prometheus.NewRegistry(),
385
}
386
}
387
388
func getFreePort(t *testing.T) int {
389
p, err := freeport.GetFreePort()
390
require.NoError(t, err)
391
return p
392
}
393
394