Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/common/loki/client/client_test.go
4096 views
1
package client
2
3
// This code is copied from Promtail. The client package is used to configure
4
// and run the clients that can send log entries to a Loki instance.
5
6
import (
7
"io"
8
"math"
9
"net/http"
10
"net/http/httptest"
11
"net/url"
12
"strings"
13
"testing"
14
"time"
15
16
"github.com/go-kit/log"
17
"github.com/grafana/agent/component/common/loki"
18
"github.com/grafana/dskit/backoff"
19
"github.com/grafana/dskit/flagext"
20
"github.com/grafana/loki/pkg/logproto"
21
"github.com/grafana/loki/pkg/util"
22
lokiflag "github.com/grafana/loki/pkg/util/flagext"
23
"github.com/prometheus/client_golang/prometheus"
24
"github.com/prometheus/client_golang/prometheus/testutil"
25
"github.com/prometheus/common/config"
26
"github.com/prometheus/common/model"
27
"github.com/stretchr/testify/assert"
28
"github.com/stretchr/testify/require"
29
)
30
31
var logEntries = []loki.Entry{
32
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
33
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
34
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
35
{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
36
{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
37
{Labels: model.LabelSet{"__tenant_id__": "tenant-2"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
38
}
39
40
type receivedReq struct {
41
tenantID string
42
pushReq logproto.PushRequest
43
}
44
45
func TestClient_Handle(t *testing.T) {
46
tests := map[string]struct {
47
clientBatchSize int
48
clientBatchWait time.Duration
49
clientMaxRetries int
50
clientTenantID string
51
serverResponseStatus int
52
inputEntries []loki.Entry
53
inputDelay time.Duration
54
expectedReqs []receivedReq
55
expectedMetrics string
56
}{
57
"batch log entries together until the batch size is reached": {
58
clientBatchSize: 10,
59
clientBatchWait: 100 * time.Millisecond,
60
clientMaxRetries: 3,
61
serverResponseStatus: 200,
62
inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[2]},
63
expectedReqs: []receivedReq{
64
{
65
tenantID: "",
66
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
67
},
68
{
69
tenantID: "",
70
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},
71
},
72
},
73
expectedMetrics: `
74
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
75
# TYPE loki_write_sent_entries_total counter
76
loki_write_sent_entries_total{host="__HOST__"} 3.0
77
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
78
# TYPE loki_write_dropped_entries_total counter
79
loki_write_dropped_entries_total{host="__HOST__"} 0
80
`,
81
},
82
"batch log entries together until the batch wait time is reached": {
83
clientBatchSize: 10,
84
clientBatchWait: 100 * time.Millisecond,
85
clientMaxRetries: 3,
86
serverResponseStatus: 200,
87
inputEntries: []loki.Entry{logEntries[0], logEntries[1]},
88
inputDelay: 110 * time.Millisecond,
89
expectedReqs: []receivedReq{
90
{
91
tenantID: "",
92
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
93
},
94
{
95
tenantID: "",
96
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}}}},
97
},
98
},
99
expectedMetrics: `
100
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
101
# TYPE loki_write_sent_entries_total counter
102
loki_write_sent_entries_total{host="__HOST__"} 2.0
103
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
104
# TYPE loki_write_dropped_entries_total counter
105
loki_write_dropped_entries_total{host="__HOST__"} 0
106
`,
107
},
108
"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
109
clientBatchSize: 10,
110
clientBatchWait: 10 * time.Millisecond,
111
clientMaxRetries: 3,
112
serverResponseStatus: 500,
113
inputEntries: []loki.Entry{logEntries[0]},
114
expectedReqs: []receivedReq{
115
{
116
tenantID: "",
117
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
118
},
119
{
120
tenantID: "",
121
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
122
},
123
{
124
tenantID: "",
125
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
126
},
127
},
128
expectedMetrics: `
129
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
130
# TYPE loki_write_dropped_entries_total counter
131
loki_write_dropped_entries_total{host="__HOST__"} 1.0
132
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
133
# TYPE loki_write_sent_entries_total counter
134
loki_write_sent_entries_total{host="__HOST__"} 0
135
`,
136
},
137
"do not retry send a batch in case the server responds with a 4xx": {
138
clientBatchSize: 10,
139
clientBatchWait: 10 * time.Millisecond,
140
clientMaxRetries: 3,
141
serverResponseStatus: 400,
142
inputEntries: []loki.Entry{logEntries[0]},
143
expectedReqs: []receivedReq{
144
{
145
tenantID: "",
146
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
147
},
148
},
149
expectedMetrics: `
150
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
151
# TYPE loki_write_dropped_entries_total counter
152
loki_write_dropped_entries_total{host="__HOST__"} 1.0
153
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
154
# TYPE loki_write_sent_entries_total counter
155
loki_write_sent_entries_total{host="__HOST__"} 0
156
`,
157
},
158
"do retry sending a batch in case the server responds with a 429": {
159
clientBatchSize: 10,
160
clientBatchWait: 10 * time.Millisecond,
161
clientMaxRetries: 3,
162
serverResponseStatus: 429,
163
inputEntries: []loki.Entry{logEntries[0]},
164
expectedReqs: []receivedReq{
165
{
166
tenantID: "",
167
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
168
},
169
{
170
tenantID: "",
171
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
172
},
173
{
174
tenantID: "",
175
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
176
},
177
},
178
expectedMetrics: `
179
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
180
# TYPE loki_write_dropped_entries_total counter
181
loki_write_dropped_entries_total{host="__HOST__"} 1.0
182
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
183
# TYPE loki_write_sent_entries_total counter
184
loki_write_sent_entries_total{host="__HOST__"} 0
185
`,
186
},
187
"batch log entries together honoring the client tenant ID": {
188
clientBatchSize: 100,
189
clientBatchWait: 100 * time.Millisecond,
190
clientMaxRetries: 3,
191
clientTenantID: "tenant-default",
192
serverResponseStatus: 200,
193
inputEntries: []loki.Entry{logEntries[0], logEntries[1]},
194
expectedReqs: []receivedReq{
195
{
196
tenantID: "tenant-default",
197
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
198
},
199
},
200
expectedMetrics: `
201
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
202
# TYPE loki_write_sent_entries_total counter
203
loki_write_sent_entries_total{host="__HOST__"} 2.0
204
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
205
# TYPE loki_write_dropped_entries_total counter
206
loki_write_dropped_entries_total{host="__HOST__"} 0
207
`,
208
},
209
"batch log entries together honoring the tenant ID overridden while processing the pipeline stages": {
210
clientBatchSize: 100,
211
clientBatchWait: 100 * time.Millisecond,
212
clientMaxRetries: 3,
213
clientTenantID: "tenant-default",
214
serverResponseStatus: 200,
215
inputEntries: []loki.Entry{logEntries[0], logEntries[3], logEntries[4], logEntries[5]},
216
expectedReqs: []receivedReq{
217
{
218
tenantID: "tenant-default",
219
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
220
},
221
{
222
tenantID: "tenant-1",
223
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[3].Entry, logEntries[4].Entry}}}},
224
},
225
{
226
tenantID: "tenant-2",
227
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[5].Entry}}}},
228
},
229
},
230
expectedMetrics: `
231
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
232
# TYPE loki_write_sent_entries_total counter
233
loki_write_sent_entries_total{host="__HOST__"} 4.0
234
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
235
# TYPE loki_write_dropped_entries_total counter
236
loki_write_dropped_entries_total{host="__HOST__"} 0
237
`,
238
},
239
}
240
241
for testName, testData := range tests {
242
t.Run(testName, func(t *testing.T) {
243
reg := prometheus.NewRegistry()
244
245
// Create a buffer channel where we do enqueue received requests
246
receivedReqsChan := make(chan receivedReq, 10)
247
248
// Start a local HTTP server
249
server := httptest.NewServer(createServerHandler(receivedReqsChan, testData.serverResponseStatus))
250
require.NotNil(t, server)
251
defer server.Close()
252
253
// Get the URL at which the local test server is listening to
254
serverURL := flagext.URLValue{}
255
err := serverURL.Set(server.URL)
256
require.NoError(t, err)
257
258
// Instance the client
259
cfg := Config{
260
URL: serverURL,
261
BatchWait: testData.clientBatchWait,
262
BatchSize: testData.clientBatchSize,
263
Client: config.HTTPClientConfig{},
264
BackoffConfig: backoff.Config{MinBackoff: 1 * time.Millisecond, MaxBackoff: 2 * time.Millisecond, MaxRetries: testData.clientMaxRetries},
265
ExternalLabels: lokiflag.LabelSet{},
266
Timeout: 1 * time.Second,
267
TenantID: testData.clientTenantID,
268
}
269
270
m := NewMetrics(reg, nil)
271
c, err := New(m, cfg, nil, 0, log.NewNopLogger())
272
require.NoError(t, err)
273
274
// Send all the input log entries
275
for i, logEntry := range testData.inputEntries {
276
c.Chan() <- logEntry
277
278
if testData.inputDelay > 0 && i < len(testData.inputEntries)-1 {
279
time.Sleep(testData.inputDelay)
280
}
281
}
282
283
// Wait until the expected push requests are received (with a timeout)
284
deadline := time.Now().Add(1 * time.Second)
285
for len(receivedReqsChan) < len(testData.expectedReqs) && time.Now().Before(deadline) {
286
time.Sleep(5 * time.Millisecond)
287
}
288
289
// Stop the client: it waits until the current batch is sent
290
c.Stop()
291
close(receivedReqsChan)
292
293
// Get all push requests received on the server side
294
receivedReqs := make([]receivedReq, 0)
295
for req := range receivedReqsChan {
296
receivedReqs = append(receivedReqs, req)
297
}
298
299
// Due to implementation details (maps iteration ordering is random) we just check
300
// that the expected requests are equal to the received requests, without checking
301
// the exact order which is not guaranteed in case of multi-tenant
302
require.ElementsMatch(t, testData.expectedReqs, receivedReqs)
303
304
expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
305
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "loki_write_sent_entries_total", "loki_write_dropped_entries_total")
306
assert.NoError(t, err)
307
})
308
}
309
}
310
311
func TestClient_StopNow(t *testing.T) {
312
cases := []struct {
313
name string
314
clientBatchSize int
315
clientBatchWait time.Duration
316
clientMaxRetries int
317
clientTenantID string
318
serverResponseStatus int
319
inputEntries []loki.Entry
320
inputDelay time.Duration
321
expectedReqs []receivedReq
322
expectedMetrics string
323
}{
324
{
325
name: "send requests shouldn't be cancelled after StopNow()",
326
clientBatchSize: 10,
327
clientBatchWait: 100 * time.Millisecond,
328
clientMaxRetries: 3,
329
serverResponseStatus: 200,
330
inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[2]},
331
expectedReqs: []receivedReq{
332
{
333
tenantID: "",
334
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
335
},
336
{
337
tenantID: "",
338
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},
339
},
340
},
341
expectedMetrics: `
342
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
343
# TYPE loki_write_sent_entries_total counter
344
loki_write_sent_entries_total{host="__HOST__"} 3.0
345
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
346
# TYPE loki_write_dropped_entries_total counter
347
loki_write_dropped_entries_total{host="__HOST__"} 0
348
`,
349
},
350
{
351
name: "shouldn't retry after StopNow()",
352
clientBatchSize: 10,
353
clientBatchWait: 10 * time.Millisecond,
354
clientMaxRetries: 3,
355
serverResponseStatus: 429,
356
inputEntries: []loki.Entry{logEntries[0]},
357
expectedReqs: []receivedReq{
358
{
359
tenantID: "",
360
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
361
},
362
},
363
expectedMetrics: `
364
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
365
# TYPE loki_write_dropped_entries_total counter
366
loki_write_dropped_entries_total{host="__HOST__"} 1.0
367
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
368
# TYPE loki_write_sent_entries_total counter
369
loki_write_sent_entries_total{host="__HOST__"} 0
370
`,
371
},
372
}
373
374
for _, c := range cases {
375
t.Run(c.name, func(t *testing.T) {
376
reg := prometheus.NewRegistry()
377
378
// Create a buffer channel where we do enqueue received requests
379
receivedReqsChan := make(chan receivedReq, 10)
380
381
// Start a local HTTP server
382
server := httptest.NewServer(createServerHandler(receivedReqsChan, c.serverResponseStatus))
383
require.NotNil(t, server)
384
defer server.Close()
385
386
// Get the URL at which the local test server is listening to
387
serverURL := flagext.URLValue{}
388
err := serverURL.Set(server.URL)
389
require.NoError(t, err)
390
391
// Instance the client
392
cfg := Config{
393
URL: serverURL,
394
BatchWait: c.clientBatchWait,
395
BatchSize: c.clientBatchSize,
396
Client: config.HTTPClientConfig{},
397
BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: c.clientMaxRetries},
398
ExternalLabels: lokiflag.LabelSet{},
399
Timeout: 1 * time.Second,
400
TenantID: c.clientTenantID,
401
}
402
m := NewMetrics(reg, nil)
403
cl, err := New(m, cfg, nil, 0, log.NewNopLogger())
404
require.NoError(t, err)
405
406
// Send all the input log entries
407
for i, logEntry := range c.inputEntries {
408
cl.Chan() <- logEntry
409
410
if c.inputDelay > 0 && i < len(c.inputEntries)-1 {
411
time.Sleep(c.inputDelay)
412
}
413
}
414
415
// Wait until the expected push requests are received (with a timeout)
416
deadline := time.Now().Add(1 * time.Second)
417
for len(receivedReqsChan) < len(c.expectedReqs) && time.Now().Before(deadline) {
418
time.Sleep(5 * time.Millisecond)
419
}
420
421
// StopNow should have cancelled client's ctx
422
cc := cl.(*client)
423
require.NoError(t, cc.ctx.Err())
424
425
// Stop the client: it waits until the current batch is sent
426
cl.StopNow()
427
close(receivedReqsChan)
428
429
require.Error(t, cc.ctx.Err()) // non-nil error if its cancelled.
430
431
// Get all push requests received on the server side
432
receivedReqs := make([]receivedReq, 0)
433
for req := range receivedReqsChan {
434
receivedReqs = append(receivedReqs, req)
435
}
436
437
// Due to implementation details (maps iteration ordering is random) we just check
438
// that the expected requests are equal to the received requests, without checking
439
// the exact order which is not guaranteed in case of multi-tenant
440
require.ElementsMatch(t, c.expectedReqs, receivedReqs)
441
442
expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1)
443
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "loki_write_sent_entries_total", "loki_write_dropped_entries_total")
444
assert.NoError(t, err)
445
})
446
}
447
}
448
449
func createServerHandler(receivedReqsChan chan receivedReq, status int) http.HandlerFunc {
450
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
451
// Parse the request
452
var pushReq logproto.PushRequest
453
if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil {
454
rw.WriteHeader(500)
455
return
456
}
457
458
receivedReqsChan <- receivedReq{
459
tenantID: req.Header.Get("X-Scope-OrgID"),
460
pushReq: pushReq,
461
}
462
463
rw.WriteHeader(status)
464
})
465
}
466
467
// RoundTripperFunc adapts an ordinary function to the http.RoundTripper
// interface, which lets a test stub an HTTP transport with a closure.
type RoundTripperFunc func(*http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper by invoking fn with req and handing
// back its response and error unchanged.
func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := fn(req)
	return resp, err
}
472
473
func Test_Tripperware(t *testing.T) {
474
url, err := url.Parse("http://foo.com")
475
require.NoError(t, err)
476
var called bool
477
c, err := NewWithTripperware(metrics, Config{
478
URL: flagext.URLValue{URL: url},
479
}, nil, 0, log.NewNopLogger(), func(rt http.RoundTripper) http.RoundTripper {
480
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
481
require.Equal(t, r.URL.String(), "http://foo.com")
482
called = true
483
return &http.Response{
484
StatusCode: http.StatusOK,
485
Body: io.NopCloser(strings.NewReader("ok")),
486
}, nil
487
})
488
})
489
require.NoError(t, err)
490
491
c.Chan() <- loki.Entry{
492
Labels: model.LabelSet{"foo": "bar"},
493
Entry: logproto.Entry{Timestamp: time.Now(), Line: "foo"},
494
}
495
c.Stop()
496
require.True(t, called)
497
}
498
499