GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/cloudflare/internal/cloudflaretarget/target_test.go

package cloudflaretarget

// This code is copied from Promtail. The cloudflaretarget package is used to
// configure and run a target that can read from the Cloudflare Logpull API and
// forward entries to other loki components.

import (
	"context"
	"errors"
	"os"
	"sort"
	"testing"
	"time"

	"github.com/grafana/agent/component/common/loki/client/fake"

	"github.com/go-kit/log"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

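// Test_CloudflareTarget pulls a one-minute range split across three workers
// and verifies that every entry is forwarded with the configured labels and
// the expected timestamps, and that the saved position is advanced.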
func Test_CloudflareTarget(t *testing.T) {
	var (
		w      = log.NewSyncWriter(os.Stderr)
		logger = log.NewLogfmtLogger(w)
		cfg    = &Config{
			APIToken:   "foo",
			ZoneID:     "bar",
			Labels:     model.LabelSet{"job": "cloudflare"},
			PullRange:  model.Duration(time.Minute),
			FieldsType: string(FieldsTypeDefault),
			Workers:    3,
		}
		end      = time.Unix(0, time.Hour.Nanoseconds())
		start    = time.Unix(0, time.Hour.Nanoseconds()-int64(cfg.PullRange))
		client   = fake.NewClient(func() {})
		cfClient = newFakeCloudflareClient()
	)
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: t.TempDir() + "/positions.yml",
	})
	// set our end time to be the last time we have a position
	ps.Put(positions.CursorKey(cfg.ZoneID), cfg.Labels.String(), end.UnixNano())
	require.NoError(t, err)

	// set up the responses for the first 1-minute pull range, split into
	// three batches (one per worker).
	cfClient.On("LogpullReceived", mock.Anything, start, start.Add(time.Duration(cfg.PullRange/3))).Return(&fakeLogIterator{
		logs: []string{
			`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,
		},
	}, nil)
	cfClient.On("LogpullReceived", mock.Anything, start.Add(time.Duration(cfg.PullRange/3)), start.Add(time.Duration(2*cfg.PullRange/3))).Return(&fakeLogIterator{
		logs: []string{
			`{"EdgeStartTimestamp":2, "EdgeRequestHost":"bar.com"}`,
		},
	}, nil)
	cfClient.On("LogpullReceived", mock.Anything, start.Add(time.Duration(2*cfg.PullRange/3)), end).Return(&fakeLogIterator{
		logs: []string{
			`{"EdgeStartTimestamp":3, "EdgeRequestHost":"buzz.com"}`,
			`{"EdgeRequestHost":"fuzz.com"}`,
		},
	}, nil)
	// set up an empty response for all later pulls.
	cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(&fakeLogIterator{
		logs: []string{},
	}, nil)
	// replace the client.
	getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
		return cfClient, nil
	}

	ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)
	require.NoError(t, err)
	require.True(t, ta.Ready())

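	// expect 4 entries in total: one from each of the first two batches and
	// two from the third.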
	require.Eventually(t, func() bool {
		return len(client.Received()) == 4
	}, 5*time.Second, 100*time.Millisecond)

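	// sort the entries newest first so they can be asserted deterministically.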
	received := client.Received()
	sort.Slice(received, func(i, j int) bool {
		return received[i].Timestamp.After(received[j].Timestamp)
	})
	for _, e := range received {
		require.Equal(t, model.LabelValue("cloudflare"), e.Labels["job"])
	}
	require.WithinDuration(t, time.Now(), received[0].Timestamp, time.Minute) // an entry without a timestamp defaults to now.
	require.Equal(t, `{"EdgeRequestHost":"fuzz.com"}`, received[0].Line)

	require.Equal(t, `{"EdgeStartTimestamp":3, "EdgeRequestHost":"buzz.com"}`, received[1].Line)
	require.Equal(t, time.Unix(0, 3), received[1].Timestamp)
	require.Equal(t, `{"EdgeStartTimestamp":2, "EdgeRequestHost":"bar.com"}`, received[2].Line)
	require.Equal(t, time.Unix(0, 2), received[2].Timestamp)
	require.Equal(t, `{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`, received[3].Line)
	require.Equal(t, time.Unix(0, 1), received[3].Timestamp)
	cfClient.AssertExpectations(t)
	ta.Stop()
	ps.Stop()
	// Make sure the saved position advanced past the original end.
	newPos, _ := ps.Get(positions.CursorKey(cfg.ZoneID), cfg.Labels.String())
	require.Greater(t, newPos, end.UnixNano())
}

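// Test_RetryErrorLogpullReceived simulates the API returning
// ErrorLogpullReceived and verifies that pull retries once and still returns
// without an error.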
func Test_RetryErrorLogpullReceived(t *testing.T) {
	var (
		w        = log.NewSyncWriter(os.Stderr)
		logger   = log.NewLogfmtLogger(w)
		end      = time.Unix(0, time.Hour.Nanoseconds())
		start    = time.Unix(0, end.Add(-30*time.Minute).UnixNano())
		client   = fake.NewClient(func() {})
		cfClient = newFakeCloudflareClient()
	)
	cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
		err: ErrorLogpullReceived,
	}, nil).Times(2) // just retry once
	// replace the client.
	getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
		return cfClient, nil
	}
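	// retry with effectively no backoff so the test runs fast.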
	defaultBackoff.MinBackoff = 0
	defaultBackoff.MaxBackoff = 5
	ta := &Target{
		logger:  logger,
		handler: client,
		client:  cfClient,
		config: &Config{
			Labels: make(model.LabelSet),
		},
		metrics: NewMetrics(nil),
	}

	require.NoError(t, ta.pull(context.Background(), start, end))
}

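// Test_RetryErrorIterating simulates an error in the middle of iterating a
// batch and verifies that the pull retries the range and that all entries are
// eventually forwarded.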
func Test_RetryErrorIterating(t *testing.T) {
	var (
		w        = log.NewSyncWriter(os.Stderr)
		logger   = log.NewLogfmtLogger(w)
		end      = time.Unix(0, time.Hour.Nanoseconds())
		start    = time.Unix(0, end.Add(-30*time.Minute).UnixNano())
		client   = fake.NewClient(func() {})
		cfClient = newFakeCloudflareClient()
	)
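	// the first call fails while iterating, after yielding a single entry.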
	cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
		logs: []string{
			`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,
			`error`,
		},
	}, nil).Once()
	// set up the successful response for the retry.
	cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
		logs: []string{
			`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,
			`{"EdgeStartTimestamp":2, "EdgeRequestHost":"foo.com"}`,
			`{"EdgeStartTimestamp":3, "EdgeRequestHost":"foo.com"}`,
		},
	}, nil).Once()
	cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
		err: ErrorLogpullReceived,
	}, nil).Once()
	// replace the client.
	getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
		return cfClient, nil
	}
	// retries as fast as possible.
	defaultBackoff.MinBackoff = 0
	defaultBackoff.MaxBackoff = 0
	metrics := NewMetrics(prometheus.NewRegistry())
	ta := &Target{
		logger:  logger,
		handler: client,
		client:  cfClient,
		config: &Config{
			Labels: make(model.LabelSet),
		},
		metrics: metrics,
	}

	require.NoError(t, ta.pull(context.Background(), start, end))
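	// 4 entries in total: one from the failed first attempt plus three from
	// the retry.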
	require.Eventually(t, func() bool {
		return len(client.Received()) == 4
	}, 5*time.Second, 100*time.Millisecond)
}

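// Test_CloudflareTargetError fails every pull and verifies that the target
// eventually stops, reports the error, and does not advance its position.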
func Test_CloudflareTargetError(t *testing.T) {
	var (
		w      = log.NewSyncWriter(os.Stderr)
		logger = log.NewLogfmtLogger(w)
		cfg    = &Config{
			APIToken:   "foo",
			ZoneID:     "bar",
			Labels:     model.LabelSet{"job": "cloudflare"},
			PullRange:  model.Duration(time.Minute),
			FieldsType: string(FieldsTypeDefault),
			Workers:    3,
		}
		end      = time.Unix(0, time.Hour.Nanoseconds())
		client   = fake.NewClient(func() {})
		cfClient = newFakeCloudflareClient()
	)
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: t.TempDir() + "/positions.yml",
	})
	// retries as fast as possible.
	defaultBackoff.MinBackoff = 0
	defaultBackoff.MaxBackoff = 0

	// set our end time to be the last time we have a position
	ps.Put(positions.CursorKey(cfg.ZoneID), cfg.Labels.String(), end.UnixNano())
	require.NoError(t, err)

	// set up errors for all retries.
	cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("no logs"))
	// replace the client.
	getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
		return cfClient, nil
	}

	ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)
	require.NoError(t, err)
	require.True(t, ta.Ready())

	// wait for the target to be stopped.
	require.Eventually(t, func() bool {
		return !ta.Ready()
	}, 5*time.Second, 100*time.Millisecond)

	require.Len(t, client.Received(), 0)
	require.GreaterOrEqual(t, cfClient.CallCount(), 5)
	require.NotEmpty(t, ta.Details()["error"])
	ta.Stop()
	ps.Stop()

	// Make sure the saved position was not advanced.
	newEnd, _ := ps.Get(positions.CursorKey(cfg.ZoneID), cfg.Labels.String())
	require.Equal(t, newEnd, end.UnixNano())
}

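// Test_CloudflareTargetError168h fails every pull with Cloudflare's "logs
// older than 168h0m0s are not available" error and verifies that the target
// advances its position past the unavailable range instead of stalling.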
func Test_CloudflareTargetError168h(t *testing.T) {
	var (
		w      = log.NewSyncWriter(os.Stderr)
		logger = log.NewLogfmtLogger(w)
		cfg    = &Config{
			APIToken:   "foo",
			ZoneID:     "bar",
			Labels:     model.LabelSet{"job": "cloudflare"},
			PullRange:  model.Duration(time.Minute),
			FieldsType: string(FieldsTypeDefault),
			Workers:    3,
		}
		end      = time.Unix(0, time.Hour.Nanoseconds())
		client   = fake.NewClient(func() {})
		cfClient = newFakeCloudflareClient()
	)
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: t.TempDir() + "/positions.yml",
	})
	// retries as fast as possible.
	defaultBackoff.MinBackoff = 0
	defaultBackoff.MaxBackoff = 0

	// set our end time to be the last time we have a position
	ps.Put(positions.CursorKey(cfg.ZoneID), cfg.Labels.String(), end.UnixNano())
	require.NoError(t, err)

	// set up errors for all retries.
	cfClient.On("LogpullReceived", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("HTTP status 400: bad query: error parsing time: invalid time range: too early: logs older than 168h0m0s are not available"))
	// replace the client.
	getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
		return cfClient, nil
	}

	ta, err := NewTarget(NewMetrics(prometheus.NewRegistry()), logger, client, ps, cfg)
	require.NoError(t, err)
	require.True(t, ta.Ready())

	// wait for several retries to happen.
	require.Eventually(t, func() bool {
		return cfClient.CallCount() >= 5
	}, 5*time.Second, 100*time.Millisecond)

	require.Len(t, client.Received(), 0)
	require.GreaterOrEqual(t, cfClient.CallCount(), 5)
	ta.Stop()
	ps.Stop()

	// Make sure we moved on from the last saved position.
	newEnd, _ := ps.Get(positions.CursorKey(cfg.ZoneID), cfg.Labels.String())
	require.Greater(t, newEnd, end.UnixNano())
}

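// Test_splitRequests checks that a time range is split into equal per-worker
// sub-ranges, with any remainder absorbed by the last request.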
func Test_splitRequests(t *testing.T) {
	tests := []struct {
		start time.Time
		end   time.Time
		want  []pullRequest
	}{
		// perfectly divisible
		{
			time.Unix(0, 0),
			time.Unix(0, int64(time.Minute)),
			[]pullRequest{
				{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
				{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
				{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},
			},
		},
		// not divisible
		{
			time.Unix(0, 0),
			time.Unix(0, int64(time.Minute+1)),
			[]pullRequest{
				{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
				{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
				{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},
			},
		},
	}
	for _, tt := range tests {
		t.Run("", func(t *testing.T) {
			got := splitRequests(tt.start, tt.end, 3)
			if !assert.Equal(t, tt.want, got) {
				for i := range got {
					if !assert.Equal(t, tt.want[i].start, got[i].start) {
						t.Logf("expected i:%d start: %d, got: %d", i, tt.want[i].start.UnixNano(), got[i].start.UnixNano())
					}
					if !assert.Equal(t, tt.want[i].end, got[i].end) {
						t.Logf("expected i:%d end: %d, got: %d", i, tt.want[i].end.UnixNano(), got[i].end.UnixNano())
					}
				}
			}
		})
	}
}