Path: blob/main/component/prometheus/remotewrite/remote_write_test.go
4094 views
package remotewrite_test12import (3"context"4"fmt"5"net/http"6"net/http/httptest"7"testing"8"time"910"github.com/grafana/agent/component/prometheus/remotewrite"11"github.com/grafana/agent/pkg/flow/componenttest"12"github.com/grafana/agent/pkg/river"13"github.com/grafana/agent/pkg/util"14"github.com/prometheus/prometheus/model/labels"15"github.com/prometheus/prometheus/prompb"16"github.com/prometheus/prometheus/storage/remote"17"github.com/stretchr/testify/require"18)1920// Test is an integration-level test which ensures that metrics can get sent to21// a prometheus.remote_write component and forwarded to a22// remote_write-compatible server.23func Test(t *testing.T) {24writeResult := make(chan *prompb.WriteRequest)2526// Create a remote_write server which forwards any received payloads to the27// writeResult channel.28srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {29req, err := remote.DecodeWriteRequest(r.Body)30if err != nil {31http.Error(w, err.Error(), http.StatusBadRequest)32return33}3435select {36case writeResult <- req:37default:38require.Fail(t, "failed to send remote_write result over channel")39}40}))41defer srv.Close()4243// Configure our component to remote_write to the server we just created. We44// configure batch_send_deadline to 100ms so this test executes fairly45// quickly.46cfg := fmt.Sprintf(`47external_labels = {48cluster = "local",49}5051endpoint {52name = "test-url"53url = "%s/api/v1/write"54remote_timeout = "100ms"5556queue_config {57batch_send_deadline = "100ms"58}59}60`, srv.URL)6162var args remotewrite.Arguments63require.NoError(t, river.Unmarshal([]byte(cfg), &args))6465// Create our component and wait for it to start running so we can write66// metrics to the WAL.67tc, err := componenttest.NewControllerFromID(util.TestLogger(t), "prometheus.remote_write")68require.NoError(t, err)69go func() {70err = tc.Run(componenttest.TestContext(t), args)71require.NoError(t, err)72}()73require.NoError(t, tc.WaitRunning(time.Second))7475// We need to use a future timestamp since remote_write will ignore any76// sample which is earlier than the time when it started. Adding a minute77// ensures that our samples will never get ignored.78sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()7980// Send metrics to our component. These will be written to the WAL and81// subsequently written to our HTTP server.82rwExports := tc.Exports().(remotewrite.Exports)83appender := rwExports.Receiver.Appender(context.Background())84_, err = appender.Append(0, labels.FromStrings("foo", "bar"), sampleTimestamp, 12)85require.NoError(t, err)86_, err = appender.Append(0, labels.FromStrings("fizz", "buzz"), sampleTimestamp, 34)87require.NoError(t, err)88err = appender.Commit()89require.NoError(t, err)9091expect := []prompb.TimeSeries{{92Labels: []prompb.Label{93{Name: "cluster", Value: "local"},94{Name: "foo", Value: "bar"},95},96Samples: []prompb.Sample{97{Timestamp: sampleTimestamp, Value: 12},98},99}, {100Labels: []prompb.Label{101{Name: "cluster", Value: "local"},102{Name: "fizz", Value: "buzz"},103},104Samples: []prompb.Sample{105{Timestamp: sampleTimestamp, Value: 34},106},107}}108109select {110case <-time.After(time.Minute):111require.FailNow(t, "timed out waiting for metrics")112case res := <-writeResult:113require.Equal(t, expect, res.Timeseries)114}115}116117118