Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/api/api_test.go
4096 views
1
package api
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"testing"
8
"time"
9
10
"github.com/phayes/freeport"
11
12
"github.com/grafana/agent/component"
13
"github.com/grafana/agent/component/common/loki"
14
"github.com/grafana/agent/component/common/loki/client"
15
"github.com/grafana/agent/component/common/loki/client/fake"
16
"github.com/grafana/agent/component/common/net"
17
"github.com/grafana/agent/component/common/relabel"
18
"github.com/grafana/agent/pkg/util"
19
"github.com/grafana/dskit/flagext"
20
"github.com/grafana/loki/pkg/logproto"
21
"github.com/grafana/regexp"
22
"github.com/prometheus/client_golang/prometheus"
23
"github.com/prometheus/common/model"
24
"github.com/stretchr/testify/assert"
25
"github.com/stretchr/testify/require"
26
)
27
28
func TestLokiSourceAPI_Simple(t *testing.T) {
29
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
30
defer cancel()
31
32
receiver := fake.NewClient(func() {})
33
defer receiver.Stop()
34
35
args := testArgsWith(t, func(a *Arguments) {
36
a.Server.HTTP.ListenPort = 8532
37
a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
38
a.UseIncomingTimestamp = true
39
})
40
opts := defaultOptions(t)
41
_, shutdown := startTestComponent(t, opts, args, ctx)
42
defer shutdown()
43
44
lokiClient := newTestLokiClient(t, args, opts)
45
defer lokiClient.Stop()
46
47
now := time.Now()
48
select {
49
case lokiClient.Chan() <- loki.Entry{
50
Labels: map[model.LabelName]model.LabelValue{"source": "test"},
51
Entry: logproto.Entry{Timestamp: now, Line: "hello world!"},
52
}:
53
case <-ctx.Done():
54
t.Fatalf("timed out while sending test entries via loki client")
55
}
56
57
require.Eventually(
58
t,
59
func() bool { return len(receiver.Received()) == 1 },
60
5*time.Second,
61
10*time.Millisecond,
62
"did not receive the forwarded message within the timeout",
63
)
64
received := receiver.Received()[0]
65
assert.Equal(t, received.Line, "hello world!")
66
assert.Equal(t, received.Timestamp.Unix(), now.Unix())
67
assert.Equal(t, received.Labels, model.LabelSet{
68
"source": "test",
69
"foo": "bar",
70
"fizz": "buzz",
71
})
72
}
73
74
func TestLokiSourceAPI_Update(t *testing.T) {
75
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
76
defer cancel()
77
78
receiver := fake.NewClient(func() {})
79
defer receiver.Stop()
80
81
args := testArgsWith(t, func(a *Arguments) {
82
a.Server.HTTP.ListenPort = 8583
83
a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
84
a.UseIncomingTimestamp = true
85
a.Labels = map[string]string{"test_label": "before"}
86
})
87
opts := defaultOptions(t)
88
c, shutdown := startTestComponent(t, opts, args, ctx)
89
defer shutdown()
90
91
lokiClient := newTestLokiClient(t, args, opts)
92
defer lokiClient.Stop()
93
94
now := time.Now()
95
select {
96
case lokiClient.Chan() <- loki.Entry{
97
Labels: map[model.LabelName]model.LabelValue{"source": "test"},
98
Entry: logproto.Entry{Timestamp: now, Line: "hello world!"},
99
}:
100
case <-ctx.Done():
101
t.Fatalf("timed out while sending test entries via loki client")
102
}
103
104
require.Eventually(
105
t,
106
func() bool { return len(receiver.Received()) == 1 },
107
5*time.Second,
108
10*time.Millisecond,
109
"did not receive the forwarded message within the timeout",
110
)
111
received := receiver.Received()[0]
112
assert.Equal(t, received.Line, "hello world!")
113
assert.Equal(t, received.Timestamp.Unix(), now.Unix())
114
assert.Equal(t, received.Labels, model.LabelSet{
115
"test_label": "before",
116
"source": "test",
117
})
118
119
args.Labels = map[string]string{"test_label": "after"}
120
err := c.Update(args)
121
require.NoError(t, err)
122
123
receiver.Clear()
124
125
select {
126
case lokiClient.Chan() <- loki.Entry{
127
Labels: map[model.LabelName]model.LabelValue{"source": "test"},
128
Entry: logproto.Entry{Timestamp: now, Line: "hello brave new world!"},
129
}:
130
case <-ctx.Done():
131
t.Fatalf("timed out while sending test entries via loki client")
132
}
133
require.Eventually(
134
t,
135
func() bool { return len(receiver.Received()) == 1 },
136
5*time.Second,
137
10*time.Millisecond,
138
"did not receive the forwarded message within the timeout",
139
)
140
received = receiver.Received()[0]
141
assert.Equal(t, received.Line, "hello brave new world!")
142
assert.Equal(t, received.Timestamp.Unix(), now.Unix())
143
assert.Equal(t, received.Labels, model.LabelSet{
144
"test_label": "after",
145
"source": "test",
146
})
147
}
148
149
func TestLokiSourceAPI_FanOut(t *testing.T) {
150
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
151
defer cancel()
152
153
const receiversCount = 10
154
var receivers = make([]*fake.Client, receiversCount)
155
for i := 0; i < receiversCount; i++ {
156
receivers[i] = fake.NewClient(func() {})
157
}
158
159
args := testArgsWith(t, func(a *Arguments) {
160
a.Server.HTTP.ListenPort = 8537
161
a.ForwardTo = mapToChannels(receivers)
162
})
163
opts := defaultOptions(t)
164
165
comp, err := New(opts, args)
166
require.NoError(t, err)
167
go func() {
168
err := comp.Run(ctx)
169
require.NoError(t, err)
170
}()
171
172
lokiClient := newTestLokiClient(t, args, opts)
173
defer lokiClient.Stop()
174
175
const messagesCount = 100
176
for i := 0; i < messagesCount; i++ {
177
entry := loki.Entry{
178
Labels: map[model.LabelName]model.LabelValue{"source": "test"},
179
Entry: logproto.Entry{Line: fmt.Sprintf("test message #%d", i)},
180
}
181
select {
182
case lokiClient.Chan() <- entry:
183
case <-ctx.Done():
184
t.Log("timed out while sending test entries via loki client")
185
}
186
}
187
188
require.Eventually(
189
t,
190
func() bool {
191
for i := 0; i < receiversCount; i++ {
192
if len(receivers[i].Received()) != messagesCount {
193
return false
194
}
195
}
196
return true
197
},
198
5*time.Second,
199
10*time.Millisecond,
200
"did not receive all the expected messages within the timeout",
201
)
202
}
203
204
func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
205
httpPort := getFreePort(t)
206
grpcPort := getFreePort(t)
207
tests := []struct {
208
name string
209
args Arguments
210
newArgs Arguments
211
restartRequired bool
212
}{
213
{
214
name: "identical args don't require server restart",
215
args: testArgsWithPorts(httpPort, grpcPort),
216
newArgs: testArgsWithPorts(httpPort, grpcPort),
217
restartRequired: false,
218
},
219
{
220
name: "change in address requires server restart",
221
args: testArgsWithPorts(httpPort, grpcPort),
222
newArgs: testArgsWith(t, func(args *Arguments) {
223
args.Server.HTTP.ListenAddress = "localhost"
224
args.Server.HTTP.ListenPort = httpPort
225
args.Server.GRPC.ListenPort = grpcPort
226
}),
227
restartRequired: true,
228
},
229
{
230
name: "change in port requires server restart",
231
args: testArgsWithPorts(httpPort, grpcPort),
232
newArgs: testArgsWithPorts(getFreePort(t), grpcPort),
233
restartRequired: true,
234
},
235
{
236
name: "change in forwardTo does not require server restart",
237
args: testArgsWithPorts(httpPort, grpcPort),
238
newArgs: testArgsWith(t, func(args *Arguments) {
239
args.ForwardTo = []loki.LogsReceiver{}
240
args.Server.HTTP.ListenPort = httpPort
241
args.Server.GRPC.ListenPort = grpcPort
242
}),
243
restartRequired: false,
244
},
245
{
246
name: "change in labels does not require server restart",
247
args: testArgsWithPorts(httpPort, grpcPort),
248
newArgs: testArgsWith(t, func(args *Arguments) {
249
args.Labels = map[string]string{"some": "label"}
250
args.Server.HTTP.ListenPort = httpPort
251
args.Server.GRPC.ListenPort = grpcPort
252
}),
253
restartRequired: false,
254
},
255
{
256
name: "change in relabel rules does not require server restart",
257
args: testArgsWithPorts(httpPort, grpcPort),
258
newArgs: testArgsWith(t, func(args *Arguments) {
259
args.RelabelRules = relabel.Rules{}
260
args.Server.HTTP.ListenPort = httpPort
261
args.Server.GRPC.ListenPort = grpcPort
262
}),
263
restartRequired: false,
264
},
265
{
266
name: "change in use incoming timestamp does not require server restart",
267
args: testArgsWithPorts(httpPort, grpcPort),
268
newArgs: testArgsWith(t, func(args *Arguments) {
269
args.UseIncomingTimestamp = !args.UseIncomingTimestamp
270
args.Server.HTTP.ListenPort = httpPort
271
args.Server.GRPC.ListenPort = grpcPort
272
}),
273
restartRequired: false,
274
},
275
}
276
for _, tc := range tests {
277
t.Run(tc.name, func(t *testing.T) {
278
comp, err := New(
279
defaultOptions(t),
280
tc.args,
281
)
282
require.NoError(t, err)
283
284
c, ok := comp.(*Component)
285
require.True(t, ok)
286
287
// in order to cleanly update, we want to make sure the server is running first.
288
waitForServerToBeReady(t, c)
289
290
serverBefore := c.server
291
err = c.Update(tc.newArgs)
292
require.NoError(t, err)
293
294
restarted := serverBefore != c.server
295
assert.Equal(t, restarted, tc.restartRequired)
296
297
// in order to cleanly shutdown, we want to make sure the server is running first.
298
waitForServerToBeReady(t, c)
299
c.stop()
300
})
301
}
302
}
303
304
func TestDefaultServerConfig(t *testing.T) {
305
args := testArgs(t)
306
args.Server = nil // user did not define server options
307
308
comp, err := New(
309
defaultOptions(t),
310
args,
311
)
312
require.NoError(t, err)
313
314
c, ok := comp.(*Component)
315
require.True(t, ok)
316
317
require.Eventuallyf(t, func() bool {
318
resp, err := http.Get(fmt.Sprintf(
319
"http://%v:%d/wrong/url",
320
"localhost",
321
net.DefaultHTTPPort,
322
))
323
return err == nil && resp.StatusCode == 404
324
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
325
326
c.stop()
327
}
328
329
func startTestComponent(
330
t *testing.T,
331
opts component.Options,
332
args Arguments,
333
ctx context.Context,
334
) (component.Component, func()) {
335
336
comp, err := New(opts, args)
337
require.NoError(t, err)
338
go func() {
339
err := comp.Run(ctx)
340
require.NoError(t, err)
341
}()
342
343
c, ok := comp.(*Component)
344
require.True(t, ok)
345
346
return comp, func() {
347
// in order to cleanly shutdown, we want to make sure the server is running first.
348
waitForServerToBeReady(t, c)
349
c.stop()
350
}
351
}
352
353
func waitForServerToBeReady(t *testing.T, comp *Component) {
354
require.Eventuallyf(t, func() bool {
355
resp, err := http.Get(fmt.Sprintf(
356
"http://%v:%d/wrong/url",
357
comp.server.ServerConfig().HTTP.ListenAddress,
358
comp.server.ServerConfig().HTTP.ListenPort,
359
))
360
return err == nil && resp.StatusCode == 404
361
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
362
}
363
364
func mapToChannels(clients []*fake.Client) []loki.LogsReceiver {
365
channels := make([]loki.LogsReceiver, len(clients))
366
for i := 0; i < len(clients); i++ {
367
channels[i] = clients[i].LogsReceiver()
368
}
369
return channels
370
}
371
372
func newTestLokiClient(t *testing.T, args Arguments, opts component.Options) client.Client {
373
url := flagext.URLValue{}
374
err := url.Set(fmt.Sprintf(
375
"http://%s:%d/api/v1/push",
376
args.Server.HTTP.ListenAddress,
377
args.Server.HTTP.ListenPort,
378
))
379
require.NoError(t, err)
380
381
lokiClient, err := client.New(
382
client.NewMetrics(nil, nil),
383
client.Config{
384
URL: url,
385
Timeout: 5 * time.Second,
386
},
387
[]string{},
388
0,
389
opts.Logger,
390
)
391
require.NoError(t, err)
392
return lokiClient
393
}
394
395
func defaultOptions(t *testing.T) component.Options {
396
return component.Options{
397
ID: "loki.source.api.test",
398
Logger: util.TestFlowLogger(t),
399
Registerer: prometheus.NewRegistry(),
400
}
401
}
402
403
func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments {
404
a := testArgs(t)
405
mutator(&a)
406
return a
407
}
408
409
func testArgs(t *testing.T) Arguments {
410
return testArgsWithPorts(getFreePort(t), getFreePort(t))
411
}
412
413
func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
414
return Arguments{
415
Server: &net.ServerConfig{
416
HTTP: &net.HTTPConfig{
417
ListenAddress: "127.0.0.1",
418
ListenPort: httpPort,
419
},
420
GRPC: &net.GRPCConfig{
421
ListenAddress: "127.0.0.1",
422
ListenPort: grpcPort,
423
},
424
},
425
ForwardTo: []loki.LogsReceiver{make(chan loki.Entry), make(chan loki.Entry)},
426
Labels: map[string]string{"foo": "bar", "fizz": "buzz"},
427
RelabelRules: relabel.Rules{
428
{
429
SourceLabels: []string{"tag"},
430
Regex: relabel.Regexp{Regexp: regexp.MustCompile("ignore")},
431
Action: relabel.Drop,
432
},
433
},
434
UseIncomingTimestamp: false,
435
}
436
}
437
438
func getFreePort(t *testing.T) int {
439
port, err := freeport.GetFreePort()
440
require.NoError(t, err)
441
return port
442
}
443
444