Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/target_syncer_test.go
4096 views
1
package kafkatarget
2
3
import (
4
"context"
5
"fmt"
6
"reflect"
7
"testing"
8
"time"
9
10
"github.com/grafana/agent/component/common/loki/client/fake"
11
12
"github.com/grafana/dskit/flagext"
13
"github.com/prometheus/common/config"
14
15
"github.com/Shopify/sarama"
16
"github.com/go-kit/log"
17
"github.com/prometheus/client_golang/prometheus"
18
"github.com/prometheus/common/model"
19
"github.com/prometheus/prometheus/model/relabel"
20
"github.com/stretchr/testify/assert"
21
"github.com/stretchr/testify/require"
22
)
23
24
func Test_TopicDiscovery(t *testing.T) {
25
ctx, cancel := context.WithCancel(context.Background())
26
group := &testConsumerGroupHandler{}
27
TopicPollInterval = time.Microsecond
28
var closed bool
29
client := &mockKafkaClient{
30
topics: []string{"topic1"},
31
}
32
ts := &TargetSyncer{
33
ctx: ctx,
34
cancel: cancel,
35
logger: log.NewNopLogger(),
36
reg: prometheus.DefaultRegisterer,
37
topicManager: mustNewTopicsManager(client, []string{"topic1", "topic2"}),
38
close: func() error {
39
closed = true
40
return nil
41
},
42
consumer: consumer{
43
ctx: context.Background(),
44
cancel: func() {},
45
ConsumerGroup: group,
46
logger: log.NewNopLogger(),
47
discoverer: DiscovererFn(func(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) (RunnableTarget, error) {
48
return nil, nil
49
}),
50
},
51
cfg: Config{
52
RelabelConfigs: []*relabel.Config{},
53
KafkaConfig: TargetConfig{
54
UseIncomingTimestamp: true,
55
Topics: []string{"topic1", "topic2"},
56
},
57
},
58
}
59
60
ts.loop()
61
62
require.Eventually(t, func() bool {
63
group.mut.Lock()
64
if !group.consuming.Load() {
65
return false
66
}
67
group.mut.Unlock()
68
return reflect.DeepEqual([]string{"topic1"}, group.GetTopics())
69
}, time.Second, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1"}, group.GetTopics())
70
71
client.UpdateTopics([]string{"topic1", "topic2"})
72
73
require.Eventually(t, func() bool {
74
group.mut.Lock()
75
if !group.consuming.Load() {
76
return false
77
}
78
group.mut.Unlock()
79
return reflect.DeepEqual([]string{"topic1", "topic2"}, group.GetTopics())
80
}, time.Second, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1", "topic2"}, group.GetTopics())
81
82
require.NoError(t, ts.Stop())
83
require.True(t, closed)
84
}
85
86
func Test_NewTarget(t *testing.T) {
87
ts := &TargetSyncer{
88
logger: log.NewNopLogger(),
89
reg: prometheus.DefaultRegisterer,
90
client: fake.NewClient(func() {}),
91
cfg: Config{
92
RelabelConfigs: []*relabel.Config{
93
{
94
SourceLabels: model.LabelNames{"__meta_kafka_topic"},
95
TargetLabel: "topic",
96
Replacement: "$1",
97
Action: relabel.Replace,
98
Regex: relabel.MustNewRegexp("(.*)"),
99
},
100
},
101
KafkaConfig: TargetConfig{
102
UseIncomingTimestamp: true,
103
GroupID: "group_1",
104
Topics: []string{"topic1", "topic2"},
105
Labels: model.LabelSet{"static": "static1"},
106
},
107
},
108
}
109
tg, err := ts.NewTarget(&testSession{}, newTestClaim("foo", 10, 1))
110
111
require.NoError(t, err)
112
require.Equal(t, ConsumerDetails{
113
MemberID: "foo",
114
GenerationID: 10,
115
Topic: "foo",
116
Partition: 10,
117
InitialOffset: 1,
118
}, tg.Details())
119
require.Equal(t, model.LabelSet{"static": "static1", "topic": "foo"}, tg.Labels())
120
require.Equal(t, model.LabelSet{"__meta_kafka_member_id": "foo", "__meta_kafka_partition": "10", "__meta_kafka_topic": "foo", "__meta_kafka_group_id": "group_1"}, tg.DiscoveredLabels())
121
}
122
123
func Test_NewDroppedTarget(t *testing.T) {
124
ts := &TargetSyncer{
125
logger: log.NewNopLogger(),
126
reg: prometheus.DefaultRegisterer,
127
cfg: Config{
128
KafkaConfig: TargetConfig{
129
UseIncomingTimestamp: true,
130
GroupID: "group1",
131
Topics: []string{"topic1", "topic2"},
132
},
133
},
134
}
135
tg, err := ts.NewTarget(&testSession{}, newTestClaim("foo", 10, 1))
136
137
require.NoError(t, err)
138
require.Equal(t, "dropping target, no labels", tg.Details())
139
require.Equal(t, model.LabelSet(nil), tg.Labels())
140
require.Equal(t, model.LabelSet{"__meta_kafka_member_id": "foo", "__meta_kafka_partition": "10", "__meta_kafka_topic": "foo", "__meta_kafka_group_id": "group1"}, tg.DiscoveredLabels())
141
}
142
143
func Test_validateConfig(t *testing.T) {
144
tests := []struct {
145
cfg *Config
146
wantErr bool
147
expected *Config
148
}{
149
{
150
&Config{
151
KafkaConfig: TargetConfig{
152
GroupID: "foo",
153
Topics: []string{"bar"},
154
},
155
},
156
true,
157
nil,
158
},
159
{
160
&Config{
161
KafkaConfig: TargetConfig{
162
Brokers: []string{"foo"},
163
GroupID: "bar",
164
},
165
},
166
true,
167
nil,
168
},
169
{
170
&Config{
171
KafkaConfig: TargetConfig{
172
Brokers: []string{"foo"},
173
},
174
},
175
true,
176
nil,
177
},
178
{
179
&Config{
180
KafkaConfig: TargetConfig{
181
Brokers: []string{"foo"},
182
Topics: []string{"bar"},
183
},
184
},
185
false,
186
&Config{
187
KafkaConfig: TargetConfig{
188
Brokers: []string{"foo"},
189
Topics: []string{"bar"},
190
GroupID: "promtail",
191
Version: "2.1.1",
192
},
193
},
194
},
195
}
196
197
for i, tt := range tests {
198
tt := tt
199
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
200
err := validateConfig(tt.cfg)
201
if (err != nil) != tt.wantErr {
202
t.Errorf("validateConfig() error = %v, wantErr %v", err, tt.wantErr)
203
}
204
if err == nil {
205
require.Equal(t, tt.expected, tt.cfg)
206
}
207
})
208
}
209
}
210
211
func Test_withAuthentication(t *testing.T) {
212
var (
213
tlsConf = config.TLSConfig{
214
CAFile: "testdata/example.com.ca.pem",
215
CertFile: "testdata/example.com.pem",
216
KeyFile: "testdata/example.com-key.pem",
217
ServerName: "example.com",
218
InsecureSkipVerify: true,
219
}
220
expectedTLSConf, _ = config.NewTLSConfig(&config.TLSConfig{
221
CAFile: "testdata/example.com.ca.pem",
222
CertFile: "testdata/example.com.pem",
223
KeyFile: "testdata/example.com-key.pem",
224
ServerName: "example.com",
225
InsecureSkipVerify: true,
226
})
227
cfg = sarama.NewConfig()
228
)
229
230
// no authentication
231
noAuthCfg, err := withAuthentication(*cfg, Authentication{
232
Type: AuthenticationTypeNone,
233
})
234
assert.Nil(t, err)
235
assert.Equal(t, false, noAuthCfg.Net.TLS.Enable)
236
assert.Equal(t, false, noAuthCfg.Net.SASL.Enable)
237
assert.NoError(t, noAuthCfg.Validate())
238
239
// specify unsupported auth type
240
illegalAuthTypeCfg, err := withAuthentication(*cfg, Authentication{
241
Type: "illegal",
242
})
243
assert.NotNil(t, err)
244
assert.Nil(t, illegalAuthTypeCfg)
245
246
// mTLS authentication
247
mTLSCfg, err := withAuthentication(*cfg, Authentication{
248
Type: AuthenticationTypeSSL,
249
TLSConfig: tlsConf,
250
})
251
assert.Nil(t, err)
252
assert.Equal(t, true, mTLSCfg.Net.TLS.Enable)
253
assert.NotNil(t, mTLSCfg.Net.TLS.Config)
254
assert.Equal(t, "example.com", mTLSCfg.Net.TLS.Config.ServerName)
255
assert.Equal(t, true, mTLSCfg.Net.TLS.Config.InsecureSkipVerify)
256
assert.Equal(t, expectedTLSConf.Certificates, mTLSCfg.Net.TLS.Config.Certificates)
257
assert.NotNil(t, mTLSCfg.Net.TLS.Config.RootCAs)
258
assert.NoError(t, mTLSCfg.Validate())
259
260
// mTLS authentication expect ignore sasl
261
mTLSCfg, err = withAuthentication(*cfg, Authentication{
262
Type: AuthenticationTypeSSL,
263
TLSConfig: tlsConf,
264
SASLConfig: SASLConfig{
265
Mechanism: sarama.SASLTypeSCRAMSHA256,
266
User: "user",
267
Password: flagext.SecretWithValue("pass"),
268
UseTLS: false,
269
},
270
})
271
assert.Nil(t, err)
272
assert.Equal(t, false, mTLSCfg.Net.SASL.Enable)
273
274
// SASL/PLAIN
275
saslCfg, err := withAuthentication(*cfg, Authentication{
276
Type: AuthenticationTypeSASL,
277
SASLConfig: SASLConfig{
278
Mechanism: sarama.SASLTypePlaintext,
279
User: "user",
280
Password: flagext.SecretWithValue("pass"),
281
},
282
})
283
assert.Nil(t, err)
284
assert.Equal(t, false, saslCfg.Net.TLS.Enable)
285
assert.Equal(t, true, saslCfg.Net.SASL.Enable)
286
assert.Equal(t, "user", saslCfg.Net.SASL.User)
287
assert.Equal(t, "pass", saslCfg.Net.SASL.Password)
288
assert.Equal(t, sarama.SASLTypePlaintext, string(saslCfg.Net.SASL.Mechanism))
289
assert.NoError(t, saslCfg.Validate())
290
291
// SASL/SCRAM
292
saslCfg, err = withAuthentication(*cfg, Authentication{
293
Type: AuthenticationTypeSASL,
294
SASLConfig: SASLConfig{
295
Mechanism: sarama.SASLTypeSCRAMSHA512,
296
User: "user",
297
Password: flagext.SecretWithValue("pass"),
298
},
299
})
300
assert.Nil(t, err)
301
assert.Equal(t, false, saslCfg.Net.TLS.Enable)
302
assert.Equal(t, true, saslCfg.Net.SASL.Enable)
303
assert.Equal(t, "user", saslCfg.Net.SASL.User)
304
assert.Equal(t, "pass", saslCfg.Net.SASL.Password)
305
assert.Equal(t, sarama.SASLTypeSCRAMSHA512, string(saslCfg.Net.SASL.Mechanism))
306
assert.NoError(t, saslCfg.Validate())
307
308
// SASL unsupported mechanism
309
_, err = withAuthentication(*cfg, Authentication{
310
Type: AuthenticationTypeSASL,
311
SASLConfig: SASLConfig{
312
Mechanism: sarama.SASLTypeGSSAPI,
313
User: "user",
314
Password: flagext.SecretWithValue("pass"),
315
},
316
})
317
assert.Error(t, err)
318
assert.Equal(t, err.Error(), "error unsupported sasl mechanism: GSSAPI")
319
320
// SASL over TLS
321
saslCfg, err = withAuthentication(*cfg, Authentication{
322
Type: AuthenticationTypeSASL,
323
SASLConfig: SASLConfig{
324
Mechanism: sarama.SASLTypeSCRAMSHA512,
325
User: "user",
326
Password: flagext.SecretWithValue("pass"),
327
UseTLS: true,
328
TLSConfig: tlsConf,
329
},
330
})
331
assert.Nil(t, err)
332
assert.Equal(t, true, saslCfg.Net.TLS.Enable)
333
assert.Equal(t, true, saslCfg.Net.SASL.Enable)
334
assert.NotNil(t, saslCfg.Net.TLS.Config)
335
assert.Equal(t, "example.com", saslCfg.Net.TLS.Config.ServerName)
336
assert.Equal(t, true, saslCfg.Net.TLS.Config.InsecureSkipVerify)
337
assert.Equal(t, expectedTLSConf.Certificates, saslCfg.Net.TLS.Config.Certificates)
338
assert.NotNil(t, saslCfg.Net.TLS.Config.RootCAs)
339
assert.NoError(t, saslCfg.Validate())
340
}
341
342