Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/consumer_test.go
4096 views
1
package kafkatarget
2
3
// This code is copied from Promtail. The kafkatarget package is used to
4
// configure and run the targets that can read Kafka entries and forward them
5
// to other Loki components.
6
7
import (
8
"context"
9
"errors"
10
"testing"
11
"time"
12
13
"github.com/Shopify/sarama"
14
"github.com/go-kit/log"
15
"github.com/prometheus/common/model"
16
"github.com/stretchr/testify/require"
17
18
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
19
)
20
21
// DiscovererFn is a function type that builds a RunnableTarget from a consumer
// group session and claim. Its NewTarget method calls the function itself, so
// the tests can supply a plain closure as the consumer's discoverer.
type DiscovererFn func(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)
22
23
// NewTarget invokes the wrapped function with the given session and claim and
// hands back its target and error unchanged.
func (d DiscovererFn) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
	tgt, err := d(session, claim)
	return tgt, err
}
26
27
type fakeTarget struct {
28
ctx context.Context
29
lbs model.LabelSet
30
}
31
32
// run blocks until the target's context is done.
func (f *fakeTarget) run() { <-f.ctx.Done() }
33
// Type returns an empty target type; the fake does not identify as any real target kind.
func (f *fakeTarget) Type() target.TargetType { return "" }
34
// DiscoveredLabels always returns nil for the fake target.
func (f *fakeTarget) DiscoveredLabels() model.LabelSet { return nil }
35
// Labels returns the label set the fake target was constructed with (nil when none was set).
func (f *fakeTarget) Labels() model.LabelSet { return f.lbs }
36
// Ready always reports true for the fake target.
func (f *fakeTarget) Ready() bool { return true }
37
// Details always returns nil for the fake target.
func (f *fakeTarget) Details() interface{} { return nil }
38
39
func Test_ConsumerConsume(t *testing.T) {
40
var (
41
group = &testConsumerGroupHandler{}
42
session = &testSession{}
43
ctx, cancel = context.WithCancel(context.Background())
44
c = &consumer{
45
logger: log.NewNopLogger(),
46
ctx: context.Background(),
47
cancel: func() {},
48
ConsumerGroup: group,
49
discoverer: DiscovererFn(func(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
50
if claim.Topic() != "dropped" {
51
return &fakeTarget{
52
ctx: ctx,
53
lbs: model.LabelSet{"topic": model.LabelValue(claim.Topic())},
54
}, nil
55
}
56
return &fakeTarget{
57
ctx: ctx,
58
}, nil
59
}),
60
}
61
)
62
63
c.start(ctx, []string{"foo"})
64
require.Eventually(t, group.consuming.Load, 5*time.Second, 100*time.Microsecond)
65
require.NoError(t, group.handler.Setup(session))
66
go func() {
67
err := group.handler.ConsumeClaim(session, newTestClaim("foo", 1, 2))
68
require.NoError(t, err)
69
}()
70
go func() {
71
err := group.handler.ConsumeClaim(session, newTestClaim("bar", 1, 2))
72
require.NoError(t, err)
73
}()
74
go func() {
75
err := group.handler.ConsumeClaim(session, newTestClaim("dropped", 1, 2))
76
require.NoError(t, err)
77
}()
78
require.Eventually(t, func() bool {
79
return len(c.getActiveTargets()) == 2
80
}, 2*time.Second, 100*time.Millisecond)
81
require.Eventually(t, func() bool {
82
return len(c.getDroppedTargets()) == 1
83
}, 2*time.Second, 100*time.Millisecond)
84
err := group.handler.Cleanup(session)
85
require.NoError(t, err)
86
cancel()
87
c.stop()
88
}
89
90
func Test_ConsumerRetry(t *testing.T) {
91
var (
92
group = &testConsumerGroupHandler{
93
returnErr: errors.New("foo"),
94
}
95
ctx, cancel = context.WithCancel(context.Background())
96
c = &consumer{
97
logger: log.NewNopLogger(),
98
ctx: context.Background(),
99
cancel: func() {},
100
ConsumerGroup: group,
101
}
102
)
103
defer cancel()
104
c.start(ctx, []string{"foo"})
105
<-time.After(2 * time.Second)
106
c.stop()
107
}
108
109