Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/pull_target_test.go
4096 views
package gcplogtarget12import (3"context"4"errors"5"io"6"testing"7"time"89"github.com/grafana/agent/component/common/loki/client/fake"1011"cloud.google.com/go/pubsub"12"github.com/go-kit/log"13"github.com/grafana/dskit/backoff"14"github.com/prometheus/client_golang/prometheus"15"github.com/stretchr/testify/require"16"gotest.tools/assert"17)1819func TestPullTarget_RunStop(t *testing.T) {20t.Run("it sends messages to the promclient and stopps when Stop() is called", func(t *testing.T) {21tc := testPullTarget(t)2223runErr := make(chan error)24go func() {25runErr <- tc.target.run()26}()2728tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}29require.Eventually(t, func() bool {30return len(tc.promClient.Received()) > 031}, time.Second, 50*time.Millisecond)3233require.NoError(t, tc.target.Stop())34require.EqualError(t, <-runErr, "context canceled")35})3637t.Run("it retries when there is an error", func(t *testing.T) {38tc := testPullTarget(t)3940runErr := make(chan error)41go func() {42runErr <- tc.target.run()43}()4445tc.sub.errors <- errors.New("something bad")46tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}47require.Eventually(t, func() bool {48return len(tc.promClient.Received()) > 049}, time.Second, 50*time.Millisecond)5051require.NoError(t, tc.target.Stop())5253require.Eventually(t, func() bool {54select {55case e := <-runErr:56return e.Error() == "context canceled"57default:58return false59}60}, time.Second, 50*time.Millisecond)61})6263t.Run("a successful message resets retries", func(t *testing.T) {64tc := testPullTarget(t)6566runErr := make(chan error)67go func() {68runErr <- tc.target.run()69}()7071tc.sub.errors <- errors.New("something bad")72tc.sub.errors <- errors.New("something bad")73tc.sub.errors <- errors.New("something bad")74tc.sub.errors <- errors.New("something bad")75tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}76tc.sub.errors <- errors.New("something bad")77tc.sub.errors <- errors.New("something bad")78tc.sub.messages <- &pubsub.Message{Data: []byte(gcpLogEntry)}7980require.Eventually(t, func() bool {81return len(tc.promClient.Received()) > 182}, time.Second, 50*time.Millisecond)8384require.NoError(t, tc.target.Stop())85})86}8788// func TestPullTarget_Ready(t *testing.T) {89// tc := testPullTarget(t)90// assert.Equal(t, true, tc.target.Ready())91// }9293func TestPullTarget_Labels(t *testing.T) {94tc := testPullTarget(t)9596assert.Equal(t, `{job="test-gcplogtarget"}`, tc.target.Labels().String())97}9899type testContext struct {100target *PullTarget101promClient *fake.Client102sub *fakeSubscription103}104105func testPullTarget(t *testing.T) *testContext {106t.Helper()107108ctx, cancel := context.WithCancel(context.Background())109sub := newFakeSubscription()110promClient := fake.NewClient(func() {})111target := &PullTarget{112metrics: NewMetrics(prometheus.NewRegistry()),113logger: log.NewNopLogger(),114handler: promClient,115relabelConfig: nil,116ctx: ctx,117cancel: cancel,118config: testConfig,119jobName: t.Name() + "job-test-gcplogtarget",120ps: io.NopCloser(nil),121sub: sub,122msgs: make(chan *pubsub.Message),123backoff: backoff.New(ctx, testBackoff),124}125126return &testContext{127target: target,128promClient: promClient,129sub: sub,130}131}132133const (134project = "test-project"135subscription = "test-subscription"136gcpLogEntry = `137{138"insertId": "ajv4d1f1ch8dr",139"logName": "projects/grafanalabs-dev/logs/cloudaudit.googleapis.com%2Fdata_access",140"protoPayload": {141"@type": "type.googleapis.com/google.cloud.audit.AuditLog",142"authenticationInfo": {143"principalEmail": "[email protected]",144"serviceAccountDelegationInfo": [145{146"firstPartyPrincipal": {147"principalEmail": "[email protected]"148}149}150]151},152"authorizationInfo": [153{154"granted": true,155"permission": "storage.objects.list",156"resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev",157"resourceAttributes": {158}159},160{161"permission": "storage.objects.get",162"resource": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json",163"resourceAttributes": {164}165}166],167"methodName": "storage.objects.get",168"requestMetadata": {169"callerIp": "34.66.19.193",170"callerNetwork": "//compute.googleapis.com/projects/grafanalabs-dev/global/networks/__unknown__",171"callerSuppliedUserAgent": "thanos-store-gateway/1.5.0 (go1.14.9),gzip(gfe)",172"destinationAttributes": {173},174"requestAttributes": {175"auth": {176},177"time": "2021-01-01T02:17:10.661405637Z"178}179},180"resourceLocation": {181"currentLocations": [182"us-central1"183]184},185"resourceName": "projects/_/buckets/dev-us-central1-cortex-tsdb-dev/objects/load-generator-20/01EM34PFBC2SCV3ETBGRAQZ090/deletion-mark.json",186"serviceName": "storage.googleapis.com",187"status": {188}189},190"receiveTimestamp": "2021-01-01T02:17:10.82013623Z",191"resource": {192"labels": {193"bucket_name": "dev-us-central1-cortex-tsdb-dev",194"location": "us-central1",195"project_id": "grafanalabs-dev"196},197"type": "gcs_bucket"198},199"severity": "INFO",200"timestamp": "2021-01-01T02:17:10.655982344Z"201}202`203)204205var testConfig = &PullConfig{206ProjectID: project,207Subscription: subscription,208Labels: map[string]string{209"job": "test-gcplogtarget",210},211}212213func newFakeSubscription() *fakeSubscription {214return &fakeSubscription{215messages: make(chan *pubsub.Message),216errors: make(chan error),217}218}219220type fakeSubscription struct {221messages chan *pubsub.Message222errors chan error223}224225func (s *fakeSubscription) Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error {226for {227select {228case m := <-s.messages:229f(ctx, m)230case e := <-s.errors:231return e232}233}234}235236var testBackoff = backoff.Config{237MinBackoff: 1 * time.Millisecond,238MaxBackoff: 10 * time.Millisecond,239}240241242