Path: blob/main/component/loki/source/gcplog/gcplog_test.go
4096 views
package gcplog

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"testing"
	"time"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/common/loki"
	fnet "github.com/grafana/agent/component/common/net"
	flow_relabel "github.com/grafana/agent/component/common/relabel"
	gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"

	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/regexp"
	"github.com/phayes/freeport"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
)

// TODO (@tpaschalis) We can't test this easily as there's no way to inject
// the mock PubSub client inside the component, but we'll find a workaround.
func TestPull(t *testing.T) {}

// TestPush starts the component with a push target listening on a free local
// port, POSTs testPushPayload to /gcp/api/v1/push, and checks that both
// receivers in ForwardTo get one entry carrying the expected line and labels.
func TestPush(t *testing.T) {
	opts := component.Options{
		Logger:        util.TestFlowLogger(t),
		Registerer:    prometheus.NewRegistry(),
		OnStateChange: func(e component.Exports) {},
	}

	ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)
	args := Arguments{}

	port, err := freeport.GetFreePort()
	require.NoError(t, err)
	args.PushTarget = &gt.PushConfig{
		Server: &fnet.ServerConfig{
			HTTP: &fnet.HTTPConfig{
				ListenAddress: "localhost",
				ListenPort:    port,
			},
			// assign random grpc port
			GRPC: &fnet.GRPCConfig{ListenPort: 0},
		},
		Labels: map[string]string{
			"foo": "bar",
		},
	}
	args.ForwardTo = []loki.LogsReceiver{ch1, ch2}
	args.RelabelRules = exportedRules

	// Create and run the component.
	c, err := New(opts, args)
	require.NoError(t, err)

	// Run under a cancellable context so the component goroutine is asked to
	// stop when the test returns instead of running for the rest of the
	// test binary's lifetime.
	// NOTE(review): assumes Run returns once ctx is cancelled — confirm.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go c.Run(ctx)
	// Give the HTTP server a moment to start listening before sending.
	time.Sleep(200 * time.Millisecond)

	// Create a GCP PushRequest and send it to the launched server.
	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%d/gcp/api/v1/push", port), strings.NewReader(testPushPayload))
	require.NoError(t, err)

	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	// The body must be closed so the underlying connection is released.
	defer res.Body.Close()
	require.Equal(t, http.StatusNoContent, res.StatusCode)

	// Check the received log entries
	wantLabelSet := model.LabelSet{"foo": "bar", "message_id": "5187581549398349", "resource_type": "k8s_cluster"}
	wantLogLine := "{\"insertId\":\"4affa858-e5f2-47f7-9254-e609b5c014d0\",\"labels\":{},\"logName\":\"projects/test-project/logs/cloudaudit.googleapis.com%2Fdata_access\",\"receiveTimestamp\":\"2022-09-06T18:07:43.417714046Z\",\"resource\":{\"labels\":{\"cluster_name\":\"dev-us-central-42\",\"location\":\"us-central1\",\"project_id\":\"test-project\"},\"type\":\"k8s_cluster\"},\"timestamp\":\"2022-09-06T18:07:42.363113Z\"}\n"

	// assertEntry applies the same expectations to an entry from either
	// receiver: a timestamp within one second of now, the expected line and
	// the expected label set.
	assertEntry := func(got loki.Entry) {
		require.WithinDuration(t, time.Now(), got.Timestamp, time.Second)
		require.Equal(t, wantLogLine, got.Line)
		require.Equal(t, wantLabelSet, got.Labels)
	}

	// Two receives are expected in total; each one may arrive on either
	// channel, and a 5s timeout fails the test instead of hanging.
	for i := 0; i < 2; i++ {
		select {
		case logEntry := <-ch1:
			assertEntry(logEntry)
		case logEntry := <-ch2:
			assertEntry(logEntry)
		case <-time.After(5 * time.Second):
			require.FailNow(t, "failed waiting for log line")
		}
	}
}

// testPushPayload is the JSON request body sent to the push endpoint in
// TestPush. Its "data" field is base64-encoded.
const testPushPayload = `
{
"message": {
"attributes": {
"logging.googleapis.com/timestamp": "2022-07-25T22:19:09.903683708Z"
},
"data": "eyJpbnNlcnRJZCI6IjRhZmZhODU4LWU1ZjItNDdmNy05MjU0LWU2MDliNWMwMTRkMCIsImxhYmVscyI6e30sImxvZ05hbWUiOiJwcm9qZWN0cy90ZXN0LXByb2plY3QvbG9ncy9jbG91ZGF1ZGl0Lmdvb2dsZWFwaXMuY29tJTJGZGF0YV9hY2Nlc3MiLCJyZWNlaXZlVGltZXN0YW1wIjoiMjAyMi0wOS0wNlQxODowNzo0My40MTc3MTQwNDZaIiwicmVzb3VyY2UiOnsibGFiZWxzIjp7ImNsdXN0ZXJfbmFtZSI6ImRldi11cy1jZW50cmFsLTQyIiwibG9jYXRpb24iOiJ1cy1jZW50cmFsMSIsInByb2plY3RfaWQiOiJ0ZXN0LXByb2plY3QifSwidHlwZSI6Ims4c19jbHVzdGVyIn0sInRpbWVzdGFtcCI6IjIwMjItMDktMDZUMTg6MDc6NDIuMzYzMTEzWiJ9Cg==",
"messageId": "5187581549398349",
"message_id": "5187581549398349",
"publishTime": "2022-07-25T22:19:15.56Z",
"publish_time": "2022-07-25T22:19:15.56Z"
},
"subscription": "projects/test-project/subscriptions/test"
}`

// exportedRules are the relabel rules passed to the component in TestPush.
// They copy the value of __gcp_message_id into message_id and the value of
// __gcp_resource_type into resource_type.
var exportedRules = flow_relabel.Rules{
	{
		SourceLabels: []string{"__gcp_message_id"},
		Regex:        mustNewRegexp("(.*)"),
		Action:       flow_relabel.Replace,
		Replacement:  "$1",
		TargetLabel:  "message_id",
	},
	{
		SourceLabels: []string{"__gcp_resource_type"},
		Regex:        mustNewRegexp("(.*)"),
		Action:       flow_relabel.Replace,
		Replacement:  "$1",
		TargetLabel:  "resource_type",
	},
}

// mustNewRegexp compiles s wrapped as ^(?:s)$, so the pattern has to match
// the whole input, and returns it as a flow_relabel.Regexp. It panics if the
// pattern does not compile, so it is only suitable for fixed test patterns.
func mustNewRegexp(s string) flow_relabel.Regexp {
	re, err := regexp.Compile("^(?:" + s + ")$")
	if err != nil {
		panic(err)
	}
	return flow_relabel.Regexp{Regexp: re}
}