Path: blob/main/component/loki/source/heroku/internal/herokutarget/target_test.go
4096 views
package herokutarget

// This code is copied from Promtail. The herokutarget package is used to
// configure and run the targets that can read heroku entries and forward them
// to other loki components.

import (
	"fmt"
	"net"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/grafana/agent/component/common/loki/client/fake"

	"github.com/go-kit/log"
	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/stretchr/testify/require"

	fnet "github.com/grafana/agent/component/common/net"
)

// localhost is the loopback address the test servers bind to.
const localhost = "127.0.0.1"

// testPayload is a single framed drain message emitted by the heroku router.
const testPayload = `270 <158>1 2022-06-13T14:52:23.622778+00:00 host heroku router - at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
`

// testLogLine1 and testLogLine2 are framed drain messages emitted by an app
// dyno; testLogLine1Timestamp is the timestamp embedded in testLogLine1.
const testLogLine1 = `140 <190>1 2022-06-13T14:52:23.621815+00:00 host app web.1 - [GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
`
const testLogLine1Timestamp = "2022-06-13T14:52:23.621815+00:00"
const testLogLine2 = `156 <190>1 2022-06-13T14:52:23.827271+00:00 host app web.1 - [GIN] 2022/06/13 - 14:52:23 | 200 | 163.92µs | 181.167.87.140 | GET "/static/main.css"
`

// makeDrainRequest builds a POST request against the drain endpoint of host.
// The request body is the concatenation of bodies, params are encoded as the
// URL query string, and the Logplex headers are filled in with freshly
// generated identifiers and the number of bodies.
func makeDrainRequest(host string, params map[string][]string, bodies ...string) (*http.Request, error) {
	payload := strings.NewReader(strings.Join(bodies, ""))
	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/heroku/api/v1/drain", host), payload)
	if err != nil {
		return nil, err
	}

	// url.Values shares its underlying type with params, so the map can be
	// encoded directly instead of being copied entry by entry.
	req.URL.RawQuery = url.Values(params).Encode()

	drainToken := uuid.New().String()
	frameID := uuid.New().String()

	req.Header.Set("Content-Type", "application/heroku_drain-1")
	req.Header.Set("Logplex-Drain-Token", "d."+drainToken)
	req.Header.Set("Logplex-Frame-Id", frameID)
	req.Header.Set("Logplex-Msg-Count", strconv.Itoa(len(bodies)))

	return req, nil
}

// TestHerokuDrainTarget posts drain requests to a running target and checks
// the lines, labels and timestamps of the entries the fake client receives.
func TestHerokuDrainTarget(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	type expectedEntry struct {
		labels model.LabelSet
		line   string
	}
	type args struct {
		RequestBodies  []string
		RequestParams  map[string][]string
		RelabelConfigs []*relabel.Config
		Labels         model.LabelSet
	}

	cases := map[string]struct {
		args            args
		expectedEntries []expectedEntry
	}{
		"heroku request with a single log line, internal labels dropped, and fixed are propagated": {
			args: args{
				RequestBodies: []string{testPayload},
				RequestParams: map[string][]string{},
				Labels: model.LabelSet{
					"job": "some_job_name",
				},
			},
			expectedEntries: []expectedEntry{
				{
					labels: model.LabelSet{
						"job": "some_job_name",
					},
					line: `at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
`,
				},
			},
		},
		"heroku request with a single log line and query parameters, internal labels dropped, and fixed are propagated": {
			args: args{
				RequestBodies: []string{testPayload},
				RequestParams: map[string][]string{
					"some_query_param": {"app_123", "app_456"},
				},
				Labels: model.LabelSet{
					"job": "some_job_name",
				},
			},
			expectedEntries: []expectedEntry{
				{
					labels: model.LabelSet{
						"job": "some_job_name",
					},
					line: `at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
`,
				},
			},
		},
		"heroku request with two log lines, internal labels dropped, and fixed are propagated": {
			args: args{
				RequestBodies: []string{testLogLine1, testLogLine2},
				RequestParams: map[string][]string{},
				Labels: model.LabelSet{
					"job": "multiple_line_job",
				},
			},
			expectedEntries: []expectedEntry{
				{
					labels: model.LabelSet{
						"job": "multiple_line_job",
					},
					line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
`,
				},
				{
					labels: model.LabelSet{
						"job": "multiple_line_job",
					},
					line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 163.92µs | 181.167.87.140 | GET "/static/main.css"
`,
				},
			},
		},
		"heroku request with two log lines and query parameters, internal labels dropped, and fixed are propagated": {
			args: args{
				RequestBodies: []string{testLogLine1, testLogLine2},
				RequestParams: map[string][]string{
					"some_query_param": {"app_123", "app_456"},
				},
				Labels: model.LabelSet{
					"job": "multiple_line_job",
				},
			},
			expectedEntries: []expectedEntry{
				{
					labels: model.LabelSet{
						"job": "multiple_line_job",
					},
					line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
`,
				},
				{
					labels: model.LabelSet{
						"job": "multiple_line_job",
					},
					line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 163.92µs | 181.167.87.140 | GET "/static/main.css"
`,
				},
			},
		},
		"heroku request with a single log line, with internal labels relabeled, and fixed labels": {
			args: args{
				RequestBodies: []string{testLogLine1},
				RequestParams: map[string][]string{},
				Labels: model.LabelSet{
					"job": "relabeling_job",
				},
				RelabelConfigs: []*relabel.Config{
					{
						SourceLabels: model.LabelNames{"__heroku_drain_host"},
						TargetLabel:  "host",
						Replacement:  "$1",
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
					},
					{
						SourceLabels: model.LabelNames{"__heroku_drain_app"},
						TargetLabel:  "app",
						Replacement:  "$1",
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
					},
					{
						SourceLabels: model.LabelNames{"__heroku_drain_proc"},
						TargetLabel:  "procID",
						Replacement:  "$1",
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
					},
				},
			},
			expectedEntries: []expectedEntry{
				{
					line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
`,
					labels: model.LabelSet{
						"host":   "host",
						"app":    "app",
						"procID": "web.1",
					},
				},
			},
		},
		"heroku request with a single log line and query parameters, with internal labels relabeled, and fixed labels": {
			args: args{
				RequestBodies: []string{testLogLine1},
				RequestParams: map[string][]string{
					"some_query_param": {"app_123", "app_456"},
				},
				Labels: model.LabelSet{
					"job": "relabeling_job",
				},
				RelabelConfigs: []*relabel.Config{
					{
						SourceLabels: model.LabelNames{"__heroku_drain_host"},
						TargetLabel:  "host",
						Replacement:  "$1",
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
					},
					{
						SourceLabels: model.LabelNames{"__heroku_drain_app"},
						TargetLabel:  "app",
						Replacement:  "$1",
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
					},
					{
						SourceLabels: model.LabelNames{"__heroku_drain_proc"},
						TargetLabel:  "procID",
						Replacement:  "$1",
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
					},
					{
						SourceLabels: model.LabelNames{"__heroku_drain_param_some_query_param"},
						TargetLabel:  "query_param",
						Replacement:  "$1",
						Action:       relabel.Replace,
						Regex:        relabel.MustNewRegexp("(.*)"),
					},
				},
			},
			expectedEntries: []expectedEntry{
				{
					line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
`,
					labels: model.LabelSet{
						"host":        "host",
						"app":         "app",
						"procID":      "web.1",
						"query_param": "app_123,app_456",
					},
				},
			},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			// Create fake promtail client
			eh := fake.NewClient(func() {})
			defer eh.Stop()

			serverConfig, port, err := getServerConfigWithAvailablePort()
			require.NoError(t, err, "error generating server config or finding open port")
			config := &HerokuDrainTargetConfig{
				Server:               serverConfig,
				Labels:               tc.args.Labels,
				UseIncomingTimestamp: false,
			}

			prometheus.DefaultRegisterer = prometheus.NewRegistry()
			metrics := NewMetrics(prometheus.DefaultRegisterer)
			pt, err := NewHerokuTarget(metrics, logger, eh, tc.args.RelabelConfigs, config, prometheus.DefaultRegisterer)
			require.NoError(t, err)
			defer func() {
				_ = pt.Stop()
			}()

			// Clear received lines after test case is ran
			defer eh.Clear()

			// Send some logs
			ts := time.Now()

			req, err := makeDrainRequest(fmt.Sprintf("http://%s:%d", localhost, port), tc.args.RequestParams, tc.args.RequestBodies...)
			require.NoError(t, err, "expected test drain request to be successfully created")
			res, err := http.DefaultClient.Do(req)
			require.NoError(t, err)
			// Release the connection even when a later assertion aborts the test.
			defer func() {
				_ = res.Body.Close()
			}()
			require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

			// Wait for every entry of the request, not just the first one,
			// so multi-line cases neither race nor run into the timeout.
			waitForMessageCount(eh, len(tc.args.RequestBodies))
			received := eh.Received()

			// Make sure we didn't time out
			require.Equal(t, len(tc.args.RequestBodies), len(received))

			require.Equal(t, len(tc.expectedEntries), len(received), "expected to receive equal amount of expected label sets")
			for i, expectedEntry := range tc.expectedEntries {
				// TODO: Add assertion over propagated timestamp
				actualEntry := received[i]

				require.Equal(t, expectedEntry.line, actualEntry.Line, "expected line to be equal for %d-th entry", i)

				// Only the expected labels are compared; additional labels
				// on the received entry are not inspected here.
				actualLS := actualEntry.Labels
				for label, value := range expectedEntry.labels {
					require.Equal(t, value, actualLS[label], "expected label %s to be equal to %s in %d-th entry", label, value, i)
				}

				// Timestamp is always set in the handler, we expect received timestamps to be slightly higher than the timestamp when we started sending logs.
				require.GreaterOrEqual(t, actualEntry.Timestamp.Unix(), ts.Unix(), "expected %d-th entry to have a received timestamp greater than publish time", i)
			}
		})
	}
}

// TestHerokuDrainTarget_UseIncomingTimestamp checks that, with
// UseIncomingTimestamp enabled, the received entry carries the timestamp
// embedded in the drained log line.
func TestHerokuDrainTarget_UseIncomingTimestamp(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	// Create fake promtail client
	eh := fake.NewClient(func() {})
	defer eh.Stop()

	serverConfig, port, err := getServerConfigWithAvailablePort()
	require.NoError(t, err, "error generating server config or finding open port")
	config := &HerokuDrainTargetConfig{
		Server:               serverConfig,
		Labels:               nil,
		UseIncomingTimestamp: true,
	}

	prometheus.DefaultRegisterer = prometheus.NewRegistry()
	metrics := NewMetrics(prometheus.DefaultRegisterer)
	pt, err := NewHerokuTarget(metrics, logger, eh, nil, config, prometheus.DefaultRegisterer)
	require.NoError(t, err)
	defer func() {
		_ = pt.Stop()
	}()

	// Clear received lines after test case is ran
	defer eh.Clear()

	req, err := makeDrainRequest(fmt.Sprintf("http://%s:%d", localhost, port), make(map[string][]string), testLogLine1)
	require.NoError(t, err, "expected test drain request to be successfully created")
	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	// Release the connection even when a later assertion aborts the test.
	defer func() {
		_ = res.Body.Close()
	}()
	require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

	waitForMessages(eh)
	received := eh.Received()

	// Make sure we didn't time out
	require.Equal(t, 1, len(received))

	expectedTs, err := time.Parse(time.RFC3339Nano, testLogLine1Timestamp)
	require.NoError(t, err, "expected expected timestamp to be parse correctly")
	require.Equal(t, expectedTs, received[0].Timestamp, "expected entry timestamp to be overridden by received one")
}

// TestHerokuDrainTarget_UseTenantIDHeaderIfPresent checks that the value of
// the X-Scope-OrgID request header ends up in the reserved tenant ID label
// and can be copied to another label through relabeling.
func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	// Create fake promtail client
	eh := fake.NewClient(func() {})
	defer eh.Stop()

	serverConfig, port, err := getServerConfigWithAvailablePort()
	require.NoError(t, err, "error generating server config or finding open port")
	config := &HerokuDrainTargetConfig{
		Server:               serverConfig,
		Labels:               nil,
		UseIncomingTimestamp: true,
	}

	prometheus.DefaultRegisterer = prometheus.NewRegistry()
	metrics := NewMetrics(prometheus.DefaultRegisterer)
	tenantIDRelabelConfig := []*relabel.Config{
		{
			SourceLabels: model.LabelNames{"__tenant_id__"},
			TargetLabel:  "tenant_id",
			Replacement:  "$1",
			Action:       relabel.Replace,
			Regex:        relabel.MustNewRegexp("(.*)"),
		},
	}
	pt, err := NewHerokuTarget(metrics, logger, eh, tenantIDRelabelConfig, config, prometheus.DefaultRegisterer)
	require.NoError(t, err)
	defer func() {
		_ = pt.Stop()
	}()

	// Clear received lines after test case is ran
	defer eh.Clear()

	req, err := makeDrainRequest(fmt.Sprintf("http://%s:%d", localhost, port), make(map[string][]string), testLogLine1)
	require.NoError(t, err, "expected test drain request to be successfully created")
	req.Header.Set("X-Scope-OrgID", "42")
	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	// Release the connection even when a later assertion aborts the test.
	defer func() {
		_ = res.Body.Close()
	}()
	require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

	waitForMessages(eh)
	received := eh.Received()

	// Make sure we didn't time out
	require.Equal(t, 1, len(received))

	require.Equal(t, model.LabelValue("42"), received[0].Labels[ReservedLabelTenantID])
	require.Equal(t, model.LabelValue("42"), received[0].Labels["tenant_id"])
}

// waitForMessages blocks until eh has received at least one entry or the
// polling budget of waitForMessageCount is exhausted.
func waitForMessages(eh *fake.Client) {
	waitForMessageCount(eh, 1)
}

// waitForMessageCount polls eh until it has received at least expected
// entries, sleeping one millisecond between polls and giving up after 1000
// polls. It does not report whether the count was reached; callers assert on
// eh.Received() afterwards.
func waitForMessageCount(eh *fake.Client, expected int) {
	const (
		pollInterval = time.Millisecond
		maxPolls     = 1000
	)
	for i := 0; i < maxPolls && len(eh.Received()) < expected; i++ {
		time.Sleep(pollInterval)
	}
}

// getServerConfigWithAvailablePort returns a server configuration whose HTTP
// listener is bound to localhost on a port that was free when probed, along
// with that port. On failure cfg is nil and port is 0.
func getServerConfigWithAvailablePort() (cfg *fnet.ServerConfig, port int, err error) {
	// Get a randomly available port by open and closing a TCP socket. The
	// port is released before the server binds it, so another process could
	// in principle grab it in between.
	addr, err := net.ResolveTCPAddr("tcp", localhost+":0")
	if err != nil {
		return nil, 0, fmt.Errorf("resolving loopback address: %w", err)
	}
	l, err := net.ListenTCP("tcp", addr)
	if err != nil {
		return nil, 0, fmt.Errorf("probing for an available port: %w", err)
	}
	port = l.Addr().(*net.TCPAddr).Port
	if err = l.Close(); err != nil {
		return nil, 0, fmt.Errorf("releasing probed port %d: %w", port, err)
	}

	cfg = &fnet.ServerConfig{
		HTTP: &fnet.HTTPConfig{
			ListenAddress: localhost,
			ListenPort:    port,
		},
		// assign random grpc port
		GRPC: &fnet.GRPCConfig{ListenPort: 0},
	}

	return cfg, port, nil
}