Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/remotewrite/remote_write_test.go
4094 views
1
package remotewrite_test
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"net/http/httptest"
8
"testing"
9
"time"
10
11
"github.com/grafana/agent/component/prometheus/remotewrite"
12
"github.com/grafana/agent/pkg/flow/componenttest"
13
"github.com/grafana/agent/pkg/river"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/prometheus/prometheus/model/labels"
16
"github.com/prometheus/prometheus/prompb"
17
"github.com/prometheus/prometheus/storage/remote"
18
"github.com/stretchr/testify/require"
19
)
20
21
// Test is an integration-level test which ensures that metrics can get sent to
22
// a prometheus.remote_write component and forwarded to a
23
// remote_write-compatible server.
24
func Test(t *testing.T) {
25
writeResult := make(chan *prompb.WriteRequest)
26
27
// Create a remote_write server which forwards any received payloads to the
28
// writeResult channel.
29
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
30
req, err := remote.DecodeWriteRequest(r.Body)
31
if err != nil {
32
http.Error(w, err.Error(), http.StatusBadRequest)
33
return
34
}
35
36
select {
37
case writeResult <- req:
38
default:
39
require.Fail(t, "failed to send remote_write result over channel")
40
}
41
}))
42
defer srv.Close()
43
44
// Configure our component to remote_write to the server we just created. We
45
// configure batch_send_deadline to 100ms so this test executes fairly
46
// quickly.
47
cfg := fmt.Sprintf(`
48
external_labels = {
49
cluster = "local",
50
}
51
52
endpoint {
53
name = "test-url"
54
url = "%s/api/v1/write"
55
remote_timeout = "100ms"
56
57
queue_config {
58
batch_send_deadline = "100ms"
59
}
60
}
61
`, srv.URL)
62
63
var args remotewrite.Arguments
64
require.NoError(t, river.Unmarshal([]byte(cfg), &args))
65
66
// Create our component and wait for it to start running so we can write
67
// metrics to the WAL.
68
tc, err := componenttest.NewControllerFromID(util.TestLogger(t), "prometheus.remote_write")
69
require.NoError(t, err)
70
go func() {
71
err = tc.Run(componenttest.TestContext(t), args)
72
require.NoError(t, err)
73
}()
74
require.NoError(t, tc.WaitRunning(time.Second))
75
76
// We need to use a future timestamp since remote_write will ignore any
77
// sample which is earlier than the time when it started. Adding a minute
78
// ensures that our samples will never get ignored.
79
sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()
80
81
// Send metrics to our component. These will be written to the WAL and
82
// subsequently written to our HTTP server.
83
rwExports := tc.Exports().(remotewrite.Exports)
84
appender := rwExports.Receiver.Appender(context.Background())
85
_, err = appender.Append(0, labels.FromStrings("foo", "bar"), sampleTimestamp, 12)
86
require.NoError(t, err)
87
_, err = appender.Append(0, labels.FromStrings("fizz", "buzz"), sampleTimestamp, 34)
88
require.NoError(t, err)
89
err = appender.Commit()
90
require.NoError(t, err)
91
92
expect := []prompb.TimeSeries{{
93
Labels: []prompb.Label{
94
{Name: "cluster", Value: "local"},
95
{Name: "foo", Value: "bar"},
96
},
97
Samples: []prompb.Sample{
98
{Timestamp: sampleTimestamp, Value: 12},
99
},
100
}, {
101
Labels: []prompb.Label{
102
{Name: "cluster", Value: "local"},
103
{Name: "fizz", Value: "buzz"},
104
},
105
Samples: []prompb.Sample{
106
{Timestamp: sampleTimestamp, Value: 34},
107
},
108
}}
109
110
select {
111
case <-time.After(time.Minute):
112
require.FailNow(t, "timed out waiting for metrics")
113
case res := <-writeResult:
114
require.Equal(t, expect, res.Timeseries)
115
}
116
}
117
118