Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/push_target_test.go
4096 views
package gcplogtarget

import (
	"fmt"
	"net/http"
	"os"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/grafana/agent/component/common/loki/client/fake"

	"github.com/grafana/agent/component/common/loki"
	fnet "github.com/grafana/agent/component/common/net"

	"github.com/go-kit/log"
	"github.com/phayes/freeport"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/stretchr/testify/require"
)

// localhost is the address the tests use to reach the push target's HTTP server.
const localhost = "127.0.0.1"

// expectedMessageData is the log line expected for testPayload: the decoded
// form of its base64 "data" field, including the trailing newline.
const expectedMessageData = `{"insertId":"4affa858-e5f2-47f7-9254-e609b5c014d0","labels":{},"logName":"projects/test-project/logs/cloudaudit.googleapis.com%2Fdata_access","receiveTimestamp":"2022-09-06T18:07:43.417714046Z","resource":{"labels":{"cluster_name":"dev-us-central-42","location":"us-central1","project_id":"test-project"},"type":"k8s_cluster"},"timestamp":"2022-09-06T18:07:42.363113Z"}
`

// testPayload is a well-formed push request body carrying a single message
// for the subscription "projects/test-project/subscriptions/test".
const testPayload = `
{
"message": {
"attributes": {
"logging.googleapis.com/timestamp": "2022-07-25T22:19:09.903683708Z"
},
"data": "eyJpbnNlcnRJZCI6IjRhZmZhODU4LWU1ZjItNDdmNy05MjU0LWU2MDliNWMwMTRkMCIsImxhYmVscyI6e30sImxvZ05hbWUiOiJwcm9qZWN0cy90ZXN0LXByb2plY3QvbG9ncy9jbG91ZGF1ZGl0Lmdvb2dsZWFwaXMuY29tJTJGZGF0YV9hY2Nlc3MiLCJyZWNlaXZlVGltZXN0YW1wIjoiMjAyMi0wOS0wNlQxODowNzo0My40MTc3MTQwNDZaIiwicmVzb3VyY2UiOnsibGFiZWxzIjp7ImNsdXN0ZXJfbmFtZSI6ImRldi11cy1jZW50cmFsLTQyIiwibG9jYXRpb24iOiJ1cy1jZW50cmFsMSIsInByb2plY3RfaWQiOiJ0ZXN0LXByb2plY3QifSwidHlwZSI6Ims4c19jbHVzdGVyIn0sInRpbWVzdGFtcCI6IjIwMjItMDktMDZUMTg6MDc6NDIuMzYzMTEzWiJ9Cg==",
"messageId": "5187581549398349",
"message_id": "5187581549398349",
"publishTime": "2022-07-25T22:19:15.56Z",
"publish_time": "2022-07-25T22:19:15.56Z"
},
"subscription": "projects/test-project/subscriptions/test"
}`

// makeGCPPushRequest builds a POST request against the push endpoint
// ("<host>/gcp/api/v1/push") with body as the request payload. host must
// include the scheme, e.g. "http://127.0.0.1:8080".
func makeGCPPushRequest(host string, body string) (*http.Request, error) {
	return http.NewRequest(http.MethodPost, fmt.Sprintf("%s/gcp/api/v1/push", host), strings.NewReader(body))
}

// TestPushTarget pushes a valid payload to a freshly started push target and
// checks the line, labels and receive timestamp of the forwarded entry, both
// with and without relabel rules.
func TestPushTarget(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	type expectedEntry struct {
		labels model.LabelSet
		line   string
	}
	type args struct {
		RequestBody    string
		RelabelConfigs []*relabel.Config
		Labels         model.LabelSet
	}

	cases := map[string]struct {
		args            args
		expectedEntries []expectedEntry
	}{
		"simplified cloud functions log line": {
			args: args{
				RequestBody: testPayload,
				Labels: model.LabelSet{
					"job": "some_job_name",
				},
			},
			expectedEntries: []expectedEntry{
				{
					labels: model.LabelSet{
						"job": "some_job_name",
					},
					line: expectedMessageData,
				},
			},
		},
		"simplified cloud functions log line, with relabeling custom attribute and message id": {
			args: args{
				RequestBody: testPayload,
				Labels: model.LabelSet{
					"job": "some_job_name",
				},
				RelabelConfigs: []*relabel.Config{
					{
						SourceLabels: model.LabelNames{"__gcp_attributes_logging_googleapis_com_timestamp"},
						Regex:        relabel.MustNewRegexp("(.*)"),
						Replacement:  "$1",
						TargetLabel:  "google_timestamp",
						Action:       relabel.Replace,
					},
					{
						SourceLabels: model.LabelNames{"__gcp_message_id"},
						Regex:        relabel.MustNewRegexp("(.*)"),
						Replacement:  "$1",
						TargetLabel:  "message_id",
						Action:       relabel.Replace,
					},
					{
						SourceLabels: model.LabelNames{"__gcp_subscription_name"},
						Regex:        relabel.MustNewRegexp("(.*)"),
						Replacement:  "$1",
						TargetLabel:  "subscription",
						Action:       relabel.Replace,
					},
					// Internal GCP Log entry attributes and labels
					{
						SourceLabels: model.LabelNames{"__gcp_logname"},
						Regex:        relabel.MustNewRegexp("(.*)"),
						Replacement:  "$1",
						TargetLabel:  "log_name",
						Action:       relabel.Replace,
					},
					{
						SourceLabels: model.LabelNames{"__gcp_resource_type"},
						Regex:        relabel.MustNewRegexp("(.*)"),
						Replacement:  "$1",
						TargetLabel:  "resource_type",
						Action:       relabel.Replace,
					},
					{
						SourceLabels: model.LabelNames{"__gcp_resource_labels_cluster_name"},
						Regex:        relabel.MustNewRegexp("(.*)"),
						Replacement:  "$1",
						TargetLabel:  "cluster",
						Action:       relabel.Replace,
					},
				},
			},
			expectedEntries: []expectedEntry{
				{
					labels: model.LabelSet{
						"job":              "some_job_name",
						"google_timestamp": "2022-07-25T22:19:09.903683708Z",
						"message_id":       "5187581549398349",
						"subscription":     "projects/test-project/subscriptions/test",
						"log_name":         "projects/test-project/logs/cloudaudit.googleapis.com%2Fdata_access",
						"resource_type":    "k8s_cluster",
						"cluster":          "dev-us-central-42",
					},
					line: expectedMessageData,
				},
			},
		},
	}
	for name, tc := range cases {
		outerName := t.Name()
		t.Run(name, func(t *testing.T) {
			// Create fake promtail client
			eh := fake.NewClient(func() {})
			defer eh.Stop()

			port, err := freeport.GetFreePort()
			require.NoError(t, err)
			lbls := make(map[string]string, len(tc.args.Labels))
			for k, v := range tc.args.Labels {
				lbls[string(k)] = string(v)
			}
			config := &PushConfig{
				Labels:               lbls,
				UseIncomingTimestamp: false,
				Server: &fnet.ServerConfig{
					HTTP: &fnet.HTTPConfig{
						ListenAddress: "localhost",
						ListenPort:    port,
					},
					// assign random grpc port
					GRPC: &fnet.GRPCConfig{ListenPort: 0},
				},
			}

			// A fresh default registry is installed for every target.
			// NOTE(review): presumably this avoids duplicate metric
			// registration across test cases; confirm against the server code.
			prometheus.DefaultRegisterer = prometheus.NewRegistry()
			metrics := NewMetrics(prometheus.DefaultRegisterer)
			pt, err := NewPushTarget(metrics, logger, eh, outerName+"_test_job", config, tc.args.RelabelConfigs, nil)
			require.NoError(t, err)
			defer func() {
				_ = pt.Stop() // best-effort shutdown; the test outcome does not depend on it
			}()

			// Clear received lines after the test case has run
			defer eh.Clear()

			// Send some logs
			ts := time.Now()

			req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), tc.args.RequestBody)
			require.NoError(t, err, "expected request to be created successfully")
			res, err := http.DefaultClient.Do(req)
			require.NoError(t, err)
			defer res.Body.Close()
			require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

			waitForMessages(eh)

			// Snapshot once so every assertion below looks at the same data.
			received := eh.Received()

			// Make sure we didn't time out
			require.Len(t, received, 1)

			require.Equal(t, len(tc.expectedEntries), len(received), "expected to receive equal amount of expected label sets")
			for i, expectedEntry := range tc.expectedEntries {
				// TODO: Add assertion over propagated timestamp
				actualEntry := received[i]

				require.Equal(t, expectedEntry.line, actualEntry.Line, "expected line to be equal for %d-th entry", i)

				// Only the expected labels are checked; extra labels on the
				// received entry are tolerated.
				actualLS := actualEntry.Labels
				for label, value := range expectedEntry.labels {
					require.Equal(t, value, actualLS[label], "expected label %s to be equal to %s in %d-th entry", label, value, i)
				}

				// Timestamp is always set in the handler, we expect received timestamps to be slightly higher than the timestamp when we started sending logs.
				require.GreaterOrEqual(t, actualEntry.Timestamp.Unix(), ts.Unix(), "expected %d-th entry to have a received timestamp greater than publish time", i)
			}
		})
	}
}

// TestPushTarget_UseIncomingTimestamp checks that, with UseIncomingTimestamp
// enabled, the forwarded entry carries the timestamp embedded in the log
// entry instead of the time it was received.
func TestPushTarget_UseIncomingTimestamp(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	// Create fake promtail client
	eh := fake.NewClient(func() {})
	defer eh.Stop()

	port, err := freeport.GetFreePort()
	require.NoError(t, err, "error generating server config or finding open port")
	config := &PushConfig{
		Labels:               nil,
		UseIncomingTimestamp: true,
		Server: &fnet.ServerConfig{
			HTTP: &fnet.HTTPConfig{
				ListenAddress: "localhost",
				ListenPort:    port,
			},
			// assign random grpc port
			GRPC: &fnet.GRPCConfig{ListenPort: 0},
		},
	}

	prometheus.DefaultRegisterer = prometheus.NewRegistry()
	metrics := NewMetrics(prometheus.DefaultRegisterer)
	pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, nil, nil)
	require.NoError(t, err)
	defer func() {
		_ = pt.Stop() // best-effort shutdown
	}()

	// Clear received lines after the test case has run
	defer eh.Clear()

	req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
	require.NoError(t, err, "expected request to be created successfully")
	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	defer res.Body.Close()
	require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

	waitForMessages(eh)

	received := eh.Received()

	// Make sure we didn't time out
	require.Len(t, received, 1)

	expectedTs, err := time.Parse(time.RFC3339Nano, "2022-09-06T18:07:42.363113Z")
	require.NoError(t, err, "expected expected timestamp to be parse correctly")
	// Compare instants with Equal rather than struct equality, so a differing
	// *time.Location for the same instant does not fail the test.
	actualTs := received[0].Timestamp
	require.True(t, expectedTs.Equal(actualTs), "expected entry timestamp to be overridden by received one: want %s, got %s", expectedTs, actualTs)
}

// TestPushTarget_UseTenantIDHeaderIfPresent checks that the X-Scope-OrgID
// request header ends up in the reserved tenant ID label and is available to
// relabel rules through "__tenant_id__".
func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	// Create fake promtail client
	eh := fake.NewClient(func() {})
	defer eh.Stop()

	port, err := freeport.GetFreePort()
	require.NoError(t, err)
	config := &PushConfig{
		Labels:               nil,
		UseIncomingTimestamp: true,
		Server: &fnet.ServerConfig{
			HTTP: &fnet.HTTPConfig{
				ListenAddress: "localhost",
				ListenPort:    port,
			},
			// assign random grpc port
			GRPC: &fnet.GRPCConfig{ListenPort: 0},
		},
	}

	prometheus.DefaultRegisterer = prometheus.NewRegistry()
	metrics := NewMetrics(prometheus.DefaultRegisterer)
	tenantIDRelabelConfig := []*relabel.Config{
		{
			SourceLabels: model.LabelNames{"__tenant_id__"},
			Regex:        relabel.MustNewRegexp("(.*)"),
			Replacement:  "$1",
			TargetLabel:  "tenant_id",
			Action:       relabel.Replace,
		},
	}
	pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, tenantIDRelabelConfig, nil)
	require.NoError(t, err)
	defer func() {
		_ = pt.Stop() // best-effort shutdown
	}()

	// Clear received lines after the test case has run
	defer eh.Clear()

	req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
	require.NoError(t, err, "expected request to be created successfully")
	req.Header.Set("X-Scope-OrgID", "42")
	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	defer res.Body.Close()
	require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

	waitForMessages(eh)

	received := eh.Received()

	// Make sure we didn't time out
	require.Len(t, received, 1)

	require.Equal(t, model.LabelValue("42"), received[0].Labels[ReservedLabelTenantID])
	require.Equal(t, model.LabelValue("42"), received[0].Labels["tenant_id"])
}

// TestPushTarget_ErroneousPayloadsAreRejected checks that malformed or
// incomplete push payloads are answered with 400 Bad Request.
func TestPushTarget_ErroneousPayloadsAreRejected(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	// Create fake promtail client
	eh := fake.NewClient(func() {})
	defer eh.Stop()

	port, err := freeport.GetFreePort()
	require.NoError(t, err)
	config := &PushConfig{
		Labels: nil,
		Server: &fnet.ServerConfig{
			HTTP: &fnet.HTTPConfig{
				ListenAddress: "localhost",
				ListenPort:    port,
			},
			// assign random grpc port
			GRPC: &fnet.GRPCConfig{ListenPort: 0},
		},
	}

	prometheus.DefaultRegisterer = prometheus.NewRegistry()
	metrics := NewMetrics(prometheus.DefaultRegisterer)
	pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, nil, nil)
	require.NoError(t, err)
	defer func() {
		_ = pt.Stop() // best-effort shutdown
	}()

	// Clear received lines after the test case has run
	defer eh.Clear()

	// The loop variable is named payload so it does not shadow the
	// package-level testPayload constant.
	for caseName, payload := range map[string]string{
		"invalid JSON": "{",
		"empty":        "{}",
		"missing subscription": `{
"message": {
"message_id": "123",
"data": "some data"
}
}`,
		"missing message ID": `{
"subscription": "sub",
"message": {
"data": "data"
}
}`,
		"missing data": `{
"subscription": "sub",
"message": {
"data": "",
"message_id":"123"
}
}`,
	} {
		t.Run(caseName, func(t *testing.T) {
			req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), payload)
			require.NoError(t, err, "expected request to be created successfully")
			res, err := http.DefaultClient.Do(req)
			// Check the error before touching res: it is nil when Do fails.
			require.NoError(t, err)
			defer res.Body.Close()
			require.Equal(t, http.StatusBadRequest, res.StatusCode, "expected bad request status code")
		})
	}
}

// blockingEntryHandler implements a loki.EntryHandler that has no space in
// its receive channel, blocking when a loki.Entry is sent down the pipe.
type blockingEntryHandler struct {
	ch   chan loki.Entry
	once sync.Once
}

// newBlockingEntryHandler returns a handler backed by an unbuffered channel
// that nothing reads from, so every send on Chan() blocks.
func newBlockingEntryHandler() *blockingEntryHandler {
	return &blockingEntryHandler{ch: make(chan loki.Entry)}
}

// Chan returns the send-only side of the handler's channel.
func (h *blockingEntryHandler) Chan() chan<- loki.Entry {
	return h.ch
}

// Stop closes the channel. It is safe to call more than once.
func (h *blockingEntryHandler) Stop() {
	h.once.Do(func() { close(h.ch) })
}

// TestPushTarget_UsePushTimeout checks that a push is answered with
// 503 Service Unavailable when the entry handler cannot accept the entry
// with PushTimeout configured.
func TestPushTarget_UsePushTimeout(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	eh := newBlockingEntryHandler()
	defer eh.Stop()

	port, err := freeport.GetFreePort()
	require.NoError(t, err)
	config := &PushConfig{
		Labels:               nil,
		UseIncomingTimestamp: true,
		PushTimeout:          time.Second,
		Server: &fnet.ServerConfig{
			HTTP: &fnet.HTTPConfig{
				ListenAddress: "localhost",
				ListenPort:    port,
			},
			// assign random grpc port
			GRPC: &fnet.GRPCConfig{ListenPort: 0},
		},
	}

	prometheus.DefaultRegisterer = prometheus.NewRegistry()
	metrics := NewMetrics(prometheus.DefaultRegisterer)
	tenantIDRelabelConfig := []*relabel.Config{
		{
			SourceLabels: model.LabelNames{"__tenant_id__"},
			Regex:        relabel.MustNewRegexp("(.*)"),
			Replacement:  "$1",
			TargetLabel:  "tenant_id",
			Action:       relabel.Replace,
		},
	}
	pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, tenantIDRelabelConfig, nil)
	require.NoError(t, err)
	defer func() {
		_ = pt.Stop() // best-effort shutdown
	}()

	req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
	require.NoError(t, err, "expected request to be created successfully")
	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	defer res.Body.Close()
	require.Equal(t, http.StatusServiceUnavailable, res.StatusCode, "expected timeout response")
}

// waitForMessages polls eh until it has received exactly one entry, sleeping
// 1ms between checks and giving up after 1000 checks. It does not report
// whether the entry arrived; callers must assert on eh.Received() afterwards.
func waitForMessages(eh *fake.Client) {
	countdown := 1000
	for len(eh.Received()) != 1 && countdown > 0 {
		time.Sleep(1 * time.Millisecond)
		countdown--
	}
}