Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/kafkatarget_test.go
4096 views
1
package kafkatarget
2
3
// This code is copied from Promtail. The kafkatarget package is used to
// configure and run the targets that can read Kafka entries and forward them
// to other Loki components.
6
7
import (
8
"context"
9
"fmt"
10
"sync"
11
"testing"
12
"time"
13
14
"github.com/grafana/agent/component/common/loki/client/fake"
15
16
"github.com/Shopify/sarama"
17
"github.com/prometheus/common/model"
18
"github.com/prometheus/prometheus/model/relabel"
19
"github.com/stretchr/testify/require"
20
"go.uber.org/atomic"
21
)
22
23
// Consumergroup handler
24
type testConsumerGroupHandler struct {
25
handler sarama.ConsumerGroupHandler
26
ctx context.Context
27
topics []string
28
29
returnErr error
30
31
consuming atomic.Bool
32
mut sync.RWMutex
33
}
34
35
// Consume stores ctx, topics and handler on the receiver and then blocks
// until ctx is done, at which point it returns nil. When returnErr is set it
// is returned right away and nothing is stored.
func (c *testConsumerGroupHandler) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
	if err := c.returnErr; err != nil {
		return err
	}

	// Record the call arguments under the write lock so readers that take
	// the read lock never see a partially updated set of fields.
	func() {
		c.mut.Lock()
		defer c.mut.Unlock()

		c.ctx = ctx
		c.topics = topics
		c.handler = handler
	}()

	// The flag is raised only for the time this call is parked on ctx.
	c.consuming.Store(true)
	defer c.consuming.Store(false)

	<-ctx.Done()
	return nil
}
53
54
// GetTopics returns the topics slice stored on the receiver, read while
// holding the read lock. The slice is returned as-is, not copied.
func (c *testConsumerGroupHandler) GetTopics() []string {
	c.mut.RLock()
	recorded := c.topics
	c.mut.RUnlock()

	return recorded
}
60
61
// Errors always returns a nil channel; this stub never reports errors.
func (c *testConsumerGroupHandler) Errors() <-chan error { return nil }
64
65
// Close does nothing and always returns nil.
func (c *testConsumerGroupHandler) Close() error { return nil }
68
69
func (c *testConsumerGroupHandler) Pause(partitions map[string][]int32) {}
70
func (c *testConsumerGroupHandler) Resume(partitions map[string][]int32) {}
71
func (c *testConsumerGroupHandler) PauseAll() {}
72
func (c *testConsumerGroupHandler) ResumeAll() {}
73
74
type testSession struct {
75
markedMessage []*sarama.ConsumerMessage
76
}
77
78
func (s *testSession) Claims() map[string][]int32 { return nil }
79
func (s *testSession) MemberID() string { return "foo" }
80
func (s *testSession) GenerationID() int32 { return 10 }
81
func (s *testSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {}
82
func (s *testSession) Commit() {}
83
func (s *testSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {}
84
func (s *testSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) {
85
s.markedMessage = append(s.markedMessage, msg)
86
}
87
func (s *testSession) Context() context.Context { return context.Background() }
88
89
type testClaim struct {
90
topic string
91
partition int32
92
offset int64
93
messages chan *sarama.ConsumerMessage
94
}
95
96
func newTestClaim(topic string, partition int32, offset int64) *testClaim {
97
return &testClaim{
98
topic: topic,
99
partition: partition,
100
offset: offset,
101
messages: make(chan *sarama.ConsumerMessage),
102
}
103
}
104
105
func (t *testClaim) Topic() string { return t.topic }
106
func (t *testClaim) Partition() int32 { return t.partition }
107
func (t *testClaim) InitialOffset() int64 { return t.offset }
108
func (t *testClaim) HighWaterMarkOffset() int64 { return 0 }
109
func (t *testClaim) Messages() <-chan *sarama.ConsumerMessage { return t.messages }
110
// Send pushes m onto the claim's message channel. With an unbuffered channel
// (as newTestClaim creates) the call blocks until a receiver takes m.
func (t *testClaim) Send(m *sarama.ConsumerMessage) { t.messages <- m }
113
114
// Stop closes the claim's message channel. Calling Send or Stop afterwards
// panics, as with any closed channel.
func (t *testClaim) Stop() { close(t.messages) }
117
118
func Test_TargetRun(t *testing.T) {
119
tc := []struct {
120
name string
121
inMessageKey string
122
inLS model.LabelSet
123
inDiscoveredLS model.LabelSet
124
relabels []*relabel.Config
125
expectedLS model.LabelSet
126
}{
127
{
128
name: "no relabel config",
129
inMessageKey: "foo",
130
inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
131
inLS: model.LabelSet{"buzz": "bazz"},
132
relabels: nil,
133
expectedLS: model.LabelSet{"buzz": "bazz"},
134
},
135
{
136
name: "message key with relabel config",
137
inMessageKey: "foo",
138
inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
139
inLS: model.LabelSet{"buzz": "bazz"},
140
relabels: []*relabel.Config{
141
{
142
SourceLabels: model.LabelNames{"__meta_kafka_message_key"},
143
Regex: relabel.MustNewRegexp("(.*)"),
144
TargetLabel: "message_key",
145
Replacement: "$1",
146
Action: "replace",
147
},
148
},
149
expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "foo"},
150
},
151
{
152
name: "no message key with relabel config",
153
inMessageKey: "",
154
inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
155
inLS: model.LabelSet{"buzz": "bazz"},
156
relabels: []*relabel.Config{
157
{
158
SourceLabels: model.LabelNames{"__meta_kafka_message_key"},
159
Regex: relabel.MustNewRegexp("(.*)"),
160
TargetLabel: "message_key",
161
Replacement: "$1",
162
Action: "replace",
163
},
164
},
165
expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "none"},
166
},
167
}
168
for _, tt := range tc {
169
t.Run(tt.name, func(t *testing.T) {
170
session, claim := &testSession{}, newTestClaim("footopic", 10, 12)
171
var closed bool
172
fc := fake.NewClient(
173
func() {
174
closed = true
175
},
176
)
177
178
tg := NewKafkaTarget(nil, session, claim, tt.inDiscoveredLS, tt.inLS, tt.relabels, fc, true, &KafkaTargetMessageParser{})
179
180
var wg sync.WaitGroup
181
wg.Add(1)
182
go func() {
183
defer wg.Done()
184
tg.run()
185
}()
186
187
for i := 0; i < 10; i++ {
188
claim.Send(&sarama.ConsumerMessage{
189
Timestamp: time.Unix(0, int64(i)),
190
Value: []byte(fmt.Sprintf("%d", i)),
191
Key: []byte(tt.inMessageKey),
192
})
193
}
194
claim.Stop()
195
wg.Wait()
196
re := fc.Received()
197
198
require.Len(t, session.markedMessage, 10)
199
require.Len(t, re, 10)
200
require.True(t, closed)
201
for _, e := range re {
202
require.Equal(t, tt.expectedLS.String(), e.Labels.String())
203
}
204
})
205
}
206
}
207
208