Path: blob/main/component/prometheus/receive_http/receive_http_test.go
4096 views
package receive_http12import (3"context"4"fmt"5"net/http"6"net/url"7"testing"8"time"910"github.com/golang/snappy"11"github.com/grafana/agent/component"12fnet "github.com/grafana/agent/component/common/net"13agentprom "github.com/grafana/agent/component/prometheus"14"github.com/grafana/agent/pkg/util"15"github.com/phayes/freeport"16"github.com/prometheus/client_golang/prometheus"17"github.com/prometheus/common/config"18"github.com/prometheus/common/model"19"github.com/prometheus/prometheus/model/labels"20"github.com/prometheus/prometheus/prompb"21"github.com/prometheus/prometheus/storage"22"github.com/prometheus/prometheus/storage/remote"23"github.com/stretchr/testify/require"24"google.golang.org/protobuf/proto"25"google.golang.org/protobuf/protoadapt"26)2728func TestForwardsMetrics(t *testing.T) {29timestamp := time.Now().Add(time.Second).UnixMilli()30input := []prompb.TimeSeries{{31Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}},32Samples: []prompb.Sample{33{Timestamp: timestamp, Value: 12},34{Timestamp: timestamp + 1, Value: 24},35{Timestamp: timestamp + 2, Value: 48},36},37}, {38Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}},39Samples: []prompb.Sample{40{Timestamp: timestamp, Value: 191},41{Timestamp: timestamp + 1, Value: 1337},42},43}}4445expected := []testSample{46{ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")},47{ts: timestamp + 1, val: 24, l: labels.FromStrings("cluster", "local", "foo", "bar")},48{ts: timestamp + 2, val: 48, l: labels.FromStrings("cluster", "local", "foo", "bar")},49{ts: timestamp, val: 191, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},50{ts: timestamp + 1, val: 1337, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},51}5253actualSamples := make(chan testSample, 100)5455// Start the component56port, err := freeport.GetFreePort()57require.NoError(t, err)58args := Arguments{59Server: &fnet.ServerConfig{60HTTP: &fnet.HTTPConfig{61ListenAddress: "localhost",62ListenPort: port,63},64GRPC: testGRPCConfig(t),65},66ForwardTo: testAppendable(actualSamples),67}68comp, err := New(testOptions(t), args)69require.NoError(t, err)70ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)71defer cancel()72go func() {73require.NoError(t, comp.Run(ctx))74}()7576verifyExpectations(t, input, expected, actualSamples, args, ctx)77}7879func TestUpdate(t *testing.T) {80timestamp := time.Now().Add(time.Second).UnixMilli()81input01 := []prompb.TimeSeries{{82Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}},83Samples: []prompb.Sample{84{Timestamp: timestamp, Value: 12},85},86}, {87Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}},88Samples: []prompb.Sample{89{Timestamp: timestamp, Value: 191},90},91}}92expected01 := []testSample{93{ts: timestamp, val: 12, l: labels.FromStrings("cluster", "local", "foo", "bar")},94{ts: timestamp, val: 191, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},95}9697input02 := []prompb.TimeSeries{{98Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "foo", Value: "bar"}},99Samples: []prompb.Sample{100{Timestamp: timestamp + 1, Value: 24},101{Timestamp: timestamp + 2, Value: 48},102},103}, {104Labels: []prompb.Label{{Name: "cluster", Value: "local"}, {Name: "fizz", Value: "buzz"}},105Samples: []prompb.Sample{106{Timestamp: timestamp + 1, Value: 1337},107},108}}109expected02 := []testSample{110{ts: timestamp + 1, val: 24, l: labels.FromStrings("cluster", "local", "foo", "bar")},111{ts: timestamp + 2, val: 48, l: labels.FromStrings("cluster", "local", "foo", "bar")},112{ts: timestamp + 1, val: 1337, l: labels.FromStrings("cluster", "local", "fizz", "buzz")},113}114115actualSamples := make(chan testSample, 100)116117// Start the component118port, err := freeport.GetFreePort()119require.NoError(t, err)120args := Arguments{121Server: &fnet.ServerConfig{122HTTP: &fnet.HTTPConfig{123ListenAddress: "localhost",124ListenPort: port,125},126GRPC: testGRPCConfig(t),127},128ForwardTo: testAppendable(actualSamples),129}130comp, err := New(testOptions(t), args)131require.NoError(t, err)132ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)133defer cancel()134go func() {135require.NoError(t, comp.Run(ctx))136}()137138verifyExpectations(t, input01, expected01, actualSamples, args, ctx)139140otherPort, err := freeport.GetFreePort()141require.NoError(t, err)142args = Arguments{143Server: &fnet.ServerConfig{144HTTP: &fnet.HTTPConfig{145ListenAddress: "localhost",146ListenPort: otherPort,147},148GRPC: testGRPCConfig(t),149},150ForwardTo: testAppendable(actualSamples),151}152err = comp.Update(args)153require.NoError(t, err)154155verifyExpectations(t, input02, expected02, actualSamples, args, ctx)156}157158func testGRPCConfig(t *testing.T) *fnet.GRPCConfig {159return &fnet.GRPCConfig{ListenAddress: "127.0.0.1", ListenPort: getFreePort(t)}160}161162func TestServerRestarts(t *testing.T) {163port, err := freeport.GetFreePort()164require.NoError(t, err)165166otherPort, err := freeport.GetFreePort()167require.NoError(t, err)168169testCases := []struct {170name string171initialArgs Arguments172newArgs Arguments173shouldRestart bool174}{175{176name: "identical args require no restart",177initialArgs: Arguments{178Server: &fnet.ServerConfig{179HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},180},181ForwardTo: []storage.Appendable{},182},183newArgs: Arguments{184Server: &fnet.ServerConfig{185HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},186},187ForwardTo: []storage.Appendable{},188},189shouldRestart: false,190},191{192name: "forward_to update does not require restart",193initialArgs: Arguments{194Server: &fnet.ServerConfig{195HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},196},197ForwardTo: []storage.Appendable{},198},199newArgs: Arguments{200Server: &fnet.ServerConfig{201HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},202},203ForwardTo: testAppendable(nil),204},205shouldRestart: false,206},207{208name: "hostname change requires restart",209initialArgs: Arguments{210Server: &fnet.ServerConfig{211HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},212},213ForwardTo: []storage.Appendable{},214},215newArgs: Arguments{216Server: &fnet.ServerConfig{217HTTP: &fnet.HTTPConfig{ListenAddress: "127.0.0.1", ListenPort: port},218},219ForwardTo: testAppendable(nil),220},221shouldRestart: true,222},223{224name: "port change requires restart",225initialArgs: Arguments{226Server: &fnet.ServerConfig{227HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: port},228},229ForwardTo: []storage.Appendable{},230},231newArgs: Arguments{232Server: &fnet.ServerConfig{233HTTP: &fnet.HTTPConfig{ListenAddress: "localhost", ListenPort: otherPort},234},235ForwardTo: testAppendable(nil),236},237shouldRestart: true,238},239}240241for _, tc := range testCases {242t.Run(tc.name, func(t *testing.T) {243ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)244defer cancel()245246c, err := New(testOptions(t), tc.initialArgs)247require.NoError(t, err)248249serverExit := make(chan error)250go func() {251serverExit <- c.Run(ctx)252}()253254comp := c.(*Component)255waitForServerToBeReady(t, comp.args)256257initialServer := comp.server258require.NotNil(t, initialServer)259260err = c.Update(tc.newArgs)261require.NoError(t, err)262263waitForServerToBeReady(t, comp.args)264265require.NotNil(t, comp.server)266restarted := initialServer != comp.server267268require.Equal(t, tc.shouldRestart, restarted)269270// shut down cleanly to release ports for other tests271cancel()272select {273case err := <-serverExit:274require.NoError(t, err, "unexpected error on server exit")275case <-time.After(5 * time.Second):276t.Fatalf("timed out waiting for server to shut down")277}278})279}280}281282type testSample struct {283ts int64284val float64285l labels.Labels286}287288func waitForServerToBeReady(t *testing.T, args Arguments) {289require.Eventuallyf(t, func() bool {290resp, err := http.Get(fmt.Sprintf(291"http://%v:%d/wrong/path",292args.Server.HTTP.ListenAddress,293args.Server.HTTP.ListenPort,294))295t.Logf("err: %v, resp: %v", err, resp)296return err == nil && resp.StatusCode == 404297}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")298}299300func verifyExpectations(301t *testing.T,302input []prompb.TimeSeries,303expected []testSample,304actualSamples chan testSample,305args Arguments,306ctx context.Context,307) {308// In case server didn't start yet309waitForServerToBeReady(t, args)310311// Send the input time series to the component312endpoint := fmt.Sprintf(313"http://%s:%d/api/v1/metrics/write",314args.Server.HTTP.ListenAddress,315args.Server.HTTP.ListenPort,316)317err := request(ctx, endpoint, &prompb.WriteRequest{Timeseries: input})318require.NoError(t, err)319320// Verify we receive expected metrics321for _, exp := range expected {322select {323case actual := <-actualSamples:324require.Equal(t, exp, actual)325case <-ctx.Done():326t.Fatalf("test timed out")327}328}329330select {331case unexpected := <-actualSamples:332t.Fatalf("unexpected extra sample received: %v", unexpected)333default:334}335}336337func testAppendable(actualSamples chan testSample) []storage.Appendable {338hookFn := func(339ref storage.SeriesRef,340l labels.Labels,341ts int64,342val float64,343next storage.Appender,344) (storage.SeriesRef, error) {345346actualSamples <- testSample{ts: ts, val: val, l: l}347return ref, nil348}349350return []storage.Appendable{agentprom.NewInterceptor(351nil,352agentprom.WithAppendHook(353hookFn))}354}355356func request(ctx context.Context, rawRemoteWriteURL string, req *prompb.WriteRequest) error {357remoteWriteURL, err := url.Parse(rawRemoteWriteURL)358if err != nil {359return err360}361362client, err := remote.NewWriteClient("remote-write-client", &remote.ClientConfig{363URL: &config.URL{URL: remoteWriteURL},364Timeout: model.Duration(30 * time.Second),365})366if err != nil {367return err368}369370buf, err := proto.Marshal(protoadapt.MessageV2Of(req))371if err != nil {372return err373}374375compressed := snappy.Encode(buf, buf)376return client.Store(ctx, compressed)377}378379func testOptions(t *testing.T) component.Options {380return component.Options{381ID: "prometheus.receive_http.test",382Logger: util.TestFlowLogger(t),383Registerer: prometheus.NewRegistry(),384}385}386387func getFreePort(t *testing.T) int {388p, err := freeport.GetFreePort()389require.NoError(t, err)390return p391}392393394