Path: blob/main/component/loki/source/heroku/heroku_test.go
4096 views
package heroku12import (3"context"4"fmt"5"net/http"6"strings"7"testing"8"time"910"github.com/grafana/agent/component"11"github.com/grafana/agent/component/common/loki"12fnet "github.com/grafana/agent/component/common/net"13flow_relabel "github.com/grafana/agent/component/common/relabel"14"github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget"15"github.com/grafana/agent/pkg/util"16"github.com/grafana/regexp"17"github.com/phayes/freeport"18"github.com/prometheus/client_golang/prometheus"19"github.com/prometheus/common/model"20"github.com/stretchr/testify/require"21)2223func TestPush(t *testing.T) {24opts := defaultOptions(t)2526ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)27args := testArgsWith(t, func(args *Arguments) {28args.ForwardTo = []loki.LogsReceiver{ch1, ch2}29args.RelabelRules = rulesExport30args.Labels = map[string]string{"foo": "bar"}31})32// Create and run the component.33c, err := New(opts, args)34require.NoError(t, err)3536go func() { require.NoError(t, c.Run(context.Background())) }()37waitForServerToBeReady(t, c)3839// Create a Heroku Drain Request and send it to the launched server.40req, err := http.NewRequest(http.MethodPost, getEndpoint(c.target), strings.NewReader(testPayload))41require.NoError(t, err)4243res, err := http.DefaultClient.Do(req)44require.NoError(t, err)45require.Equal(t, http.StatusNoContent, res.StatusCode)4647// Check the received log entries48wantLabelSet := model.LabelSet{"foo": "bar", "host": "host", "app": "heroku", "proc": "router", "log_id": "-"}49wantLogLine := "at=info method=GET path=\"/\" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd=\"181.167.87.140\" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https\n"5051for i := 0; i < 2; i++ {52select {53case logEntry := <-ch1:54require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)55require.Equal(t, wantLogLine, logEntry.Line)56require.Equal(t, wantLabelSet, logEntry.Labels)57case logEntry := <-ch2:58require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)59require.Equal(t, wantLogLine, logEntry.Line)60require.Equal(t, wantLabelSet, logEntry.Labels)61case <-time.After(5 * time.Second):62require.FailNow(t, "failed waiting for log line")63}64}65}6667func TestUpdate_detectsWhenTargetRequiresARestart(t *testing.T) {68httpPort := getFreePort(t)69grpcPort := getFreePort(t)70tests := []struct {71name string72args Arguments73newArgs Arguments74restartRequired bool75}{76{77name: "identical args don't require server restart",78args: testArgsWithPorts(httpPort, grpcPort),79newArgs: testArgsWithPorts(httpPort, grpcPort),80restartRequired: false,81},82{83name: "change in address requires server restart",84args: testArgsWithPorts(httpPort, grpcPort),85newArgs: testArgsWith(t, func(args *Arguments) {86args.Server.HTTP.ListenAddress = "127.0.0.1"87args.Server.HTTP.ListenPort = httpPort88args.Server.GRPC.ListenPort = grpcPort89}),90restartRequired: true,91},92{93name: "change in port requires server restart",94args: testArgsWithPorts(httpPort, grpcPort),95newArgs: testArgsWithPorts(getFreePort(t), grpcPort),96restartRequired: true,97},98{99name: "change in forwardTo does not require server restart",100args: testArgsWithPorts(httpPort, grpcPort),101newArgs: testArgsWith(t, func(args *Arguments) {102args.ForwardTo = []loki.LogsReceiver{}103args.Server.HTTP.ListenPort = httpPort104args.Server.GRPC.ListenPort = grpcPort105}),106restartRequired: false,107},108{109name: "change in labels requires server restart",110args: testArgsWithPorts(httpPort, grpcPort),111newArgs: testArgsWith(t, func(args *Arguments) {112args.Labels = map[string]string{"some": "label"}113args.Server.HTTP.ListenPort = httpPort114args.Server.GRPC.ListenPort = grpcPort115}),116restartRequired: true,117},118{119name: "change in relabel rules requires server restart",120args: testArgsWithPorts(httpPort, grpcPort),121newArgs: testArgsWith(t, func(args *Arguments) {122args.RelabelRules = flow_relabel.Rules{}123args.Server.HTTP.ListenPort = httpPort124args.Server.GRPC.ListenPort = grpcPort125}),126restartRequired: true,127},128{129name: "change in use incoming timestamp requires server restart",130args: testArgsWithPorts(httpPort, grpcPort),131newArgs: testArgsWith(t, func(args *Arguments) {132args.UseIncomingTimestamp = !args.UseIncomingTimestamp133args.Server.HTTP.ListenPort = httpPort134args.Server.GRPC.ListenPort = grpcPort135}),136restartRequired: true,137},138}139for _, tc := range tests {140t.Run(tc.name, func(t *testing.T) {141comp, err := New(142defaultOptions(t),143tc.args,144)145require.NoError(t, err)146defer func() {147// in order to cleanly shutdown, we want to make sure the server is running first.148waitForServerToBeReady(t, comp)149require.NoError(t, comp.target.Stop())150}()151152// in order to cleanly update, we want to make sure the server is running first.153waitForServerToBeReady(t, comp)154155targetBefore := comp.target156err = comp.Update(tc.newArgs)157require.NoError(t, err)158159restarted := targetBefore != comp.target160require.Equal(t, restarted, tc.restartRequired)161})162}163}164165const testPayload = `270 <158>1 2022-06-13T14:52:23.622778+00:00 host heroku router - at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https166`167168var rulesExport = flow_relabel.Rules{169{170SourceLabels: []string{"__heroku_drain_host"},171Regex: newRegexp(),172Action: flow_relabel.Replace,173Replacement: "$1",174TargetLabel: "host",175},176{177SourceLabels: []string{"__heroku_drain_app"},178Regex: newRegexp(),179Action: flow_relabel.Replace,180Replacement: "$1",181TargetLabel: "app",182},183{184SourceLabels: []string{"__heroku_drain_proc"},185Regex: newRegexp(),186Action: flow_relabel.Replace,187Replacement: "$1",188TargetLabel: "proc",189},190{191SourceLabels: []string{"__heroku_drain_log_id"},192Regex: newRegexp(),193Action: flow_relabel.Replace,194Replacement: "$1",195TargetLabel: "log_id",196},197}198199func defaultOptions(t *testing.T) component.Options {200return component.Options{201Logger: util.TestFlowLogger(t),202Registerer: prometheus.NewRegistry(),203OnStateChange: func(e component.Exports) {},204}205}206207func testArgsWithPorts(httpPort int, grpcPort int) Arguments {208return Arguments{209Server: &fnet.ServerConfig{210HTTP: &fnet.HTTPConfig{211ListenAddress: "localhost",212ListenPort: httpPort,213},214GRPC: &fnet.GRPCConfig{215ListenAddress: "localhost",216ListenPort: grpcPort,217},218},219ForwardTo: []loki.LogsReceiver{make(chan loki.Entry), make(chan loki.Entry)},220Labels: map[string]string{"foo": "bar", "fizz": "buzz"},221RelabelRules: flow_relabel.Rules{222{223SourceLabels: []string{"tag"},224Regex: flow_relabel.Regexp{Regexp: regexp.MustCompile("ignore")},225Action: flow_relabel.Drop,226},227},228UseIncomingTimestamp: false,229}230}231232func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments {233a := testArgsWithPorts(getFreePort(t), getFreePort(t))234mutator(&a)235return a236}237238func waitForServerToBeReady(t *testing.T, comp *Component) {239require.Eventuallyf(t, func() bool {240resp, err := http.Get(fmt.Sprintf(241"http://%v/wrong/url",242comp.target.HTTPListenAddress(),243))244return err == nil && resp.StatusCode == 404245}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")246}247248func getFreePort(t *testing.T) int {249port, err := freeport.GetFreePort()250require.NoError(t, err)251return port252}253254func newRegexp() flow_relabel.Regexp {255re, err := regexp.Compile("^(?:(.*))$")256if err != nil {257panic(err)258}259return flow_relabel.Regexp{Regexp: re}260}261262func getEndpoint(target *herokutarget.HerokuTarget) string {263return fmt.Sprintf("http://%s%s", target.HTTPListenAddress(), target.DrainEndpoint())264}265266267