Path: blob/main/component/loki/source/cloudflare/internal/cloudflaretarget/target_test.go
4097 views
package cloudflaretarget12// This code is copied from Promtail. The cloudflaretarget package is used to3// configure and run a target that can read from the Cloudflare Logpull API and4// forward entries to other loki components.56import (7"context"8"errors"9"os"10"sort"11"testing"12"time"1314"github.com/grafana/agent/component/common/loki/client/fake"1516"github.com/go-kit/log"17"github.com/grafana/agent/component/common/loki/positions"18"github.com/prometheus/client_golang/prometheus"19"github.com/prometheus/common/model"20"github.com/stretchr/testify/assert"21"github.com/stretchr/testify/mock"22"github.com/stretchr/testify/require"23)2425func Test_CloudflareTarget(t *testing.T) {26var (27w = log.NewSyncWriter(os.Stderr)28logger = log.NewLogfmtLogger(w)29cfg = &Config{30APIToken: "foo",31ZoneID: "bar",32Labels: model.LabelSet{"job": "cloudflare"},33PullRange: model.Duration(time.Minute),34FieldsType: string(FieldsTypeDefault),35Workers: 3,36}37end = time.Unix(0, time.Hour.Nanoseconds())38start = time.Unix(0, time.Hour.Nanoseconds()-int64(cfg.PullRange))39client = fake.NewClient(func() {})40cfClient = newFakeCloudflareClient()41)42ps, err := positions.New(logger, positions.Config{43SyncPeriod: 10 * time.Second,44PositionsFile: t.TempDir() + "/positions.yml",45})46// set our end time to be the last time we have a position47ps.Put(positions.CursorKey(cfg.ZoneID), cfg.Labels.String(), end.UnixNano())48require.NoError(t, err)4950// setup response for the first pull batch of 1 minutes.51cfClient.On("LogpullReceived", mock.Anything, start, start.Add(time.Duration(cfg.PullRange/3))).Return(&fakeLogIterator{52logs: []string{53`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,54},55}, nil)56cfClient.On("LogpullReceived", mock.Anything, start.Add(time.Duration(cfg.PullRange/3)), start.Add(time.Duration(2*cfg.PullRange/3))).Return(&fakeLogIterator{57logs: []string{58`{"EdgeStartTimestamp":2, "EdgeRequestHost":"bar.com"}`,59},60}, nil)61cfClient.On("LogpullReceived", mock.Anything, start.Add(time.Duration(2*cfg.PullRange/3)), end).Return(&fakeLogIterator{62logs: []string{63`{"EdgeStartTimestamp":3, "EdgeRequestHost":"buzz.com"}`,64`{"EdgeRequestHost":"fuzz.com"}`,65},66}, nil)67// setup empty response for the rest.68cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(&fakeLogIterator{69logs: []string{},70}, nil)71// replace the client.72getClient = func(apiKey, zoneID string, fields []string) (Client, error) {73return cfClient, nil74}7576ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)77require.NoError(t, err)78require.True(t, ta.Ready())7980require.Eventually(t, func() bool {81return len(client.Received()) == 482}, 5*time.Second, 100*time.Millisecond)8384received := client.Received()85sort.Slice(received, func(i, j int) bool {86return received[i].Timestamp.After(received[j].Timestamp)87})88for _, e := range received {89require.Equal(t, model.LabelValue("cloudflare"), e.Labels["job"])90}91require.WithinDuration(t, time.Now(), received[0].Timestamp, time.Minute) // no timestamp default to now.92require.Equal(t, `{"EdgeRequestHost":"fuzz.com"}`, received[0].Line)9394require.Equal(t, `{"EdgeStartTimestamp":3, "EdgeRequestHost":"buzz.com"}`, received[1].Line)95require.Equal(t, time.Unix(0, 3), received[1].Timestamp)96require.Equal(t, `{"EdgeStartTimestamp":2, "EdgeRequestHost":"bar.com"}`, received[2].Line)97require.Equal(t, time.Unix(0, 2), received[2].Timestamp)98require.Equal(t, `{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`, received[3].Line)99require.Equal(t, time.Unix(0, 1), received[3].Timestamp)100cfClient.AssertExpectations(t)101ta.Stop()102ps.Stop()103// Make sure we save the last position.104newPos, _ := ps.Get(positions.CursorKey(cfg.ZoneID), cfg.Labels.String())105require.Greater(t, newPos, end.UnixNano())106}107108func Test_RetryErrorLogpullReceived(t *testing.T) {109var (110w = log.NewSyncWriter(os.Stderr)111logger = log.NewLogfmtLogger(w)112end = time.Unix(0, time.Hour.Nanoseconds())113start = time.Unix(0, end.Add(-30*time.Minute).UnixNano())114client = fake.NewClient(func() {})115cfClient = newFakeCloudflareClient()116)117cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{118err: ErrorLogpullReceived,119}, nil).Times(2) // just retry once120// replace the client121getClient = func(apiKey, zoneID string, fields []string) (Client, error) {122return cfClient, nil123}124defaultBackoff.MinBackoff = 0125defaultBackoff.MaxBackoff = 5126ta := &Target{127logger: logger,128handler: client,129client: cfClient,130config: &Config{131Labels: make(model.LabelSet),132},133metrics: NewMetrics(nil),134}135136require.NoError(t, ta.pull(context.Background(), start, end))137}138139func Test_RetryErrorIterating(t *testing.T) {140var (141w = log.NewSyncWriter(os.Stderr)142logger = log.NewLogfmtLogger(w)143end = time.Unix(0, time.Hour.Nanoseconds())144start = time.Unix(0, end.Add(-30*time.Minute).UnixNano())145client = fake.NewClient(func() {})146cfClient = newFakeCloudflareClient()147)148cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{149logs: []string{150`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,151`error`,152},153}, nil).Once()154// setup response for the first pull batch of 1 minutes.155cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{156logs: []string{157`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,158`{"EdgeStartTimestamp":2, "EdgeRequestHost":"foo.com"}`,159`{"EdgeStartTimestamp":3, "EdgeRequestHost":"foo.com"}`,160},161}, nil).Once()162cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{163err: ErrorLogpullReceived,164}, nil).Once()165// replace the client.166getClient = func(apiKey, zoneID string, fields []string) (Client, error) {167return cfClient, nil168}169// retries as fast as possible.170defaultBackoff.MinBackoff = 0171defaultBackoff.MaxBackoff = 0172metrics := NewMetrics(prometheus.NewRegistry())173ta := &Target{174logger: logger,175handler: client,176client: cfClient,177config: &Config{178Labels: make(model.LabelSet),179},180metrics: metrics,181}182183require.NoError(t, ta.pull(context.Background(), start, end))184require.Eventually(t, func() bool {185return len(client.Received()) == 4186}, 5*time.Second, 100*time.Millisecond)187}188189func Test_CloudflareTargetError(t *testing.T) {190var (191w = log.NewSyncWriter(os.Stderr)192logger = log.NewLogfmtLogger(w)193cfg = &Config{194APIToken: "foo",195ZoneID: "bar",196Labels: model.LabelSet{"job": "cloudflare"},197PullRange: model.Duration(time.Minute),198FieldsType: string(FieldsTypeDefault),199Workers: 3,200}201end = time.Unix(0, time.Hour.Nanoseconds())202client = fake.NewClient(func() {})203cfClient = newFakeCloudflareClient()204)205ps, err := positions.New(logger, positions.Config{206SyncPeriod: 10 * time.Second,207PositionsFile: t.TempDir() + "/positions.yml",208})209// retries as fast as possible.210defaultBackoff.MinBackoff = 0211defaultBackoff.MaxBackoff = 0212213// set our end time to be the last time we have a position214ps.Put(positions.CursorKey(cfg.ZoneID), cfg.Labels.String(), end.UnixNano())215require.NoError(t, err)216217// setup errors for all retries218cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("no logs"))219// replace the client.220getClient = func(apiKey, zoneID string, fields []string) (Client, error) {221return cfClient, nil222}223224ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)225require.NoError(t, err)226require.True(t, ta.Ready())227228// wait for the target to be stopped.229require.Eventually(t, func() bool {230return !ta.Ready()231}, 5*time.Second, 100*time.Millisecond)232233require.Len(t, client.Received(), 0)234require.GreaterOrEqual(t, cfClient.CallCount(), 5)235require.NotEmpty(t, ta.Details()["error"])236ta.Stop()237ps.Stop()238239// Make sure we save the last position.240newEnd, _ := ps.Get(positions.CursorKey(cfg.ZoneID), cfg.Labels.String())241require.Equal(t, newEnd, end.UnixNano())242}243244func Test_CloudflareTargetError168h(t *testing.T) {245var (246w = log.NewSyncWriter(os.Stderr)247logger = log.NewLogfmtLogger(w)248cfg = &Config{249APIToken: "foo",250ZoneID: "bar",251Labels: model.LabelSet{"job": "cloudflare"},252PullRange: model.Duration(time.Minute),253FieldsType: string(FieldsTypeDefault),254Workers: 3,255}256end = time.Unix(0, time.Hour.Nanoseconds())257client = fake.NewClient(func() {})258cfClient = newFakeCloudflareClient()259)260ps, err := positions.New(logger, positions.Config{261SyncPeriod: 10 * time.Second,262PositionsFile: t.TempDir() + "/positions.yml",263})264// retries as fast as possible.265defaultBackoff.MinBackoff = 0266defaultBackoff.MaxBackoff = 0267268// set our end time to be the last time we have a position269ps.Put(positions.CursorKey(cfg.ZoneID), cfg.Labels.String(), end.UnixNano())270require.NoError(t, err)271272// setup errors for all retries273cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("HTTP status 400: bad query: error parsing time: invalid time range: too early: logs older than 168h0m0s are not available"))274// replace the client.275getClient = func(apiKey, zoneID string, fields []string) (Client, error) {276return cfClient, nil277}278279ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)280require.NoError(t, err)281require.True(t, ta.Ready())282283// wait for the target to be stopped.284require.Eventually(t, func() bool {285return cfClient.CallCount() >= 5286}, 5*time.Second, 100*time.Millisecond)287288require.Len(t, client.Received(), 0)289require.GreaterOrEqual(t, cfClient.CallCount(), 5)290ta.Stop()291ps.Stop()292293// Make sure we move on from the save the last position.294newEnd, _ := ps.Get(positions.CursorKey(cfg.ZoneID), cfg.Labels.String())295require.Greater(t, newEnd, end.UnixNano())296}297298func Test_splitRequests(t *testing.T) {299tests := []struct {300start time.Time301end time.Time302want []pullRequest303}{304// perfectly divisible305{306time.Unix(0, 0),307time.Unix(0, int64(time.Minute)),308[]pullRequest{309{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},310{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},311{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},312},313},314// not divisible315{316time.Unix(0, 0),317time.Unix(0, int64(time.Minute+1)),318[]pullRequest{319{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},320{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},321{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},322},323},324}325for _, tt := range tests {326t.Run("", func(t *testing.T) {327got := splitRequests(tt.start, tt.end, 3)328if !assert.Equal(t, tt.want, got) {329for i := range got {330if !assert.Equal(t, tt.want[i].start, got[i].start) {331t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].start.UnixNano(), got[i].start.UnixNano())332}333if !assert.Equal(t, tt.want[i].end, got[i].end) {334t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].end.UnixNano(), got[i].end.UnixNano())335}336}337}338})339}340}341342343