Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/target_syncer.go
4096 views
1
package kafkatarget
2
3
// This code is copied from Promtail. The kafkatarget package is used to
4
// configure and run the targets that can read kafka entries and forward them
5
// to other loki components.
6
7
import (
8
"context"
9
"errors"
10
"fmt"
11
"sync"
12
"time"
13
14
"github.com/Shopify/sarama"
15
"github.com/go-kit/log"
16
"github.com/go-kit/log/level"
17
"github.com/prometheus/client_golang/prometheus"
18
promconfig "github.com/prometheus/common/config"
19
"github.com/prometheus/common/model"
20
"github.com/prometheus/prometheus/model/labels"
21
22
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
23
24
"github.com/grafana/agent/component/common/loki"
25
)
26
27
// TopicPollInterval is how often the syncer polls Kafka for the current
// list of topics matching the configured topic patterns.
var TopicPollInterval = 30 * time.Second

// TopicManager returns the list of topics that should currently be consumed.
type TopicManager interface {
	Topics() ([]string, error)
}
32
33
// TargetSyncer polls Kafka for topic changes and keeps the set of running
// consumer targets in sync with the topics that currently exist.
type TargetSyncer struct {
	logger log.Logger
	cfg    Config
	reg    prometheus.Registerer
	client loki.EntryHandler // destination for entries read from Kafka

	topicManager TopicManager
	consumer                  // embedded; owns the consumer group machinery
	close        func() error // tears down the consumer group and the Kafka client

	ctx            context.Context    // governs the lifetime of the polling goroutines
	cancel         context.CancelFunc // cancels ctx; invoked from Stop
	wg             sync.WaitGroup     // tracks the two goroutines started by loop
	previousTopics []string           // last topic list seen, used to detect changes
	messageParser  MessageParser
}
49
50
func NewSyncer(
51
reg prometheus.Registerer,
52
logger log.Logger,
53
cfg Config,
54
pushClient loki.EntryHandler,
55
messageParser MessageParser,
56
) (*TargetSyncer, error) {
57
58
if err := validateConfig(&cfg); err != nil {
59
return nil, err
60
}
61
version, err := sarama.ParseKafkaVersion(cfg.KafkaConfig.Version)
62
if err != nil {
63
return nil, err
64
}
65
config := sarama.NewConfig()
66
config.Version = version
67
config.Consumer.Offsets.Initial = sarama.OffsetOldest
68
69
switch cfg.KafkaConfig.Assignor {
70
case sarama.StickyBalanceStrategyName:
71
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
72
case sarama.RoundRobinBalanceStrategyName:
73
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
74
case sarama.RangeBalanceStrategyName, "":
75
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
76
default:
77
return nil, fmt.Errorf("unrecognized consumer group partition assignor: %s", cfg.KafkaConfig.Assignor)
78
}
79
config, err = withAuthentication(*config, cfg.KafkaConfig.Authentication)
80
if err != nil {
81
return nil, fmt.Errorf("error setting up kafka authentication: %w", err)
82
}
83
client, err := sarama.NewClient(cfg.KafkaConfig.Brokers, config)
84
if err != nil {
85
return nil, fmt.Errorf("error creating kafka client: %w", err)
86
}
87
group, err := sarama.NewConsumerGroup(cfg.KafkaConfig.Brokers, cfg.KafkaConfig.GroupID, config)
88
if err != nil {
89
return nil, fmt.Errorf("error creating consumer group client: %w", err)
90
}
91
topicManager, err := newTopicManager(client, cfg.KafkaConfig.Topics)
92
if err != nil {
93
return nil, fmt.Errorf("error creating topic manager: %w", err)
94
}
95
ctx, cancel := context.WithCancel(context.Background())
96
t := &TargetSyncer{
97
logger: logger,
98
ctx: ctx,
99
cancel: cancel,
100
topicManager: topicManager,
101
cfg: cfg,
102
reg: reg,
103
client: pushClient,
104
close: func() error {
105
if err := group.Close(); err != nil {
106
level.Warn(logger).Log("msg", "error while closing consumer group", "err", err)
107
}
108
return client.Close()
109
},
110
consumer: consumer{
111
ctx: context.Background(),
112
cancel: func() {},
113
ConsumerGroup: group,
114
logger: logger,
115
},
116
messageParser: messageParser,
117
}
118
t.discoverer = t
119
t.loop()
120
return t, nil
121
}
122
123
func withAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama.Config, error) {
124
if len(authCfg.Type) == 0 || authCfg.Type == AuthenticationTypeNone {
125
return &cfg, nil
126
}
127
128
switch authCfg.Type {
129
case AuthenticationTypeSSL:
130
return withSSLAuthentication(cfg, authCfg)
131
case AuthenticationTypeSASL:
132
return withSASLAuthentication(cfg, authCfg)
133
default:
134
return nil, fmt.Errorf("unsupported authentication type %s", authCfg.Type)
135
}
136
}
137
138
// withSSLAuthentication enables TLS on the connection using the provided TLS
// settings and returns the updated config copy.
func withSSLAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama.Config, error) {
	tlsConfig, err := promconfig.NewTLSConfig(&authCfg.TLSConfig)
	if err != nil {
		return nil, err
	}
	cfg.Net.TLS.Enable = true
	cfg.Net.TLS.Config = tlsConfig
	return &cfg, nil
}
147
148
func withSASLAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama.Config, error) {
149
cfg.Net.SASL.Enable = true
150
cfg.Net.SASL.User = authCfg.SASLConfig.User
151
cfg.Net.SASL.Password = authCfg.SASLConfig.Password.String()
152
cfg.Net.SASL.Mechanism = authCfg.SASLConfig.Mechanism
153
if cfg.Net.SASL.Mechanism == "" {
154
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
155
}
156
157
supportedMechanism := []string{
158
sarama.SASLTypeSCRAMSHA512,
159
sarama.SASLTypeSCRAMSHA256,
160
sarama.SASLTypePlaintext,
161
sarama.SASLTypeOAuth,
162
}
163
if !StringsContain(supportedMechanism, string(authCfg.SASLConfig.Mechanism)) {
164
return nil, fmt.Errorf("error unsupported sasl mechanism: %s", authCfg.SASLConfig.Mechanism)
165
}
166
167
if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 {
168
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
169
return &XDGSCRAMClient{
170
HashGeneratorFcn: SHA512,
171
}
172
}
173
}
174
if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 {
175
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
176
return &XDGSCRAMClient{
177
HashGeneratorFcn: SHA256,
178
}
179
}
180
}
181
182
if cfg.Net.SASL.Mechanism == sarama.SASLTypeOAuth {
183
accessTokenProvider, err := NewOAuthProvider(authCfg.SASLConfig.OAuthConfig)
184
if err != nil {
185
return nil, fmt.Errorf("unable to create new access token provider: %w", err)
186
}
187
cfg.Net.SASL.TokenProvider = accessTokenProvider
188
}
189
190
if authCfg.SASLConfig.UseTLS {
191
tc, err := promconfig.NewTLSConfig(&authCfg.SASLConfig.TLSConfig)
192
if err != nil {
193
return nil, err
194
}
195
cfg.Net.TLS.Config = tc
196
cfg.Net.TLS.Enable = true
197
}
198
return &cfg, nil
199
}
200
201
// loop starts the two background goroutines that drive the syncer: one that
// reacts to topic-list changes by restarting the consumer with the new topic
// set, and one that polls the topic manager on TopicPollInterval. Both exit
// when ts.ctx is cancelled and are tracked by ts.wg.
func (ts *TargetSyncer) loop() {
	topicChanged := make(chan []string)
	ts.wg.Add(2)
	// Consumer-restart goroutine: restarts consumption whenever a new topic
	// list arrives on topicChanged.
	go func() {
		defer ts.wg.Done()
		for {
			select {
			case <-ts.ctx.Done():
				return
			case topics := <-topicChanged:
				level.Info(ts.logger).Log("msg", "new topics received", "topics", fmt.Sprintf("%+v", topics))
				ts.stop()
				if len(topics) > 0 { // no topics we don't need to start.
					ts.start(ts.ctx, topics)
				}
			}
		}
	}()
	// Topic-polling goroutine: fetches the topic list every TopicPollInterval
	// and publishes it on topicChanged when it differs from the previous one.
	go func() {
		defer ts.wg.Done()
		ticker := time.NewTicker(TopicPollInterval)
		defer ticker.Stop()

		// tick blocks until the next poll interval or until shutdown.
		tick := func() {
			select {
			case <-ts.ctx.Done():
			case <-ticker.C:
			}
		}
		for ; true; tick() { // instant tick.
			if ts.ctx.Err() != nil {
				// Shutdown: stop the consumer and signal that no further topic
				// updates will be sent.
				ts.stop()
				close(topicChanged)
				return
			}
			newTopics, ok, err := ts.fetchTopics()
			if err != nil {
				level.Warn(ts.logger).Log("msg", "failed to fetch topics", "err", err)
				continue
			}
			if ok {
				// NOTE(review): if ts.ctx is cancelled between the Err check
				// above and this send, and the receiving goroutine has already
				// returned on ctx.Done, this unbuffered send could block and
				// hang Stop's wg.Wait — confirm this window is acceptable.
				topicChanged <- newTopics
			}
		}
	}()
}
247
248
// fetchTopics fetches and return new topics, if there's a difference with previous found topics
249
// it will return true as second return value.
250
func (ts *TargetSyncer) fetchTopics() ([]string, bool, error) {
251
new, err := ts.topicManager.Topics()
252
if err != nil {
253
return nil, false, err
254
}
255
if len(ts.previousTopics) != len(new) {
256
ts.previousTopics = new
257
return new, true, nil
258
}
259
for i, v := range ts.previousTopics {
260
if v != new[i] {
261
ts.previousTopics = new
262
return new, true, nil
263
}
264
}
265
return nil, false, nil
266
}
267
268
// Stop cancels the background goroutines started by loop, waits for them to
// exit, and then runs the close function set up in NewSyncer (closing the
// consumer group and the Kafka client). It returns the close error, if any.
func (ts *TargetSyncer) Stop() error {
	ts.cancel()
	ts.wg.Wait()
	return ts.close()
}
273
274
// NewTarget creates a new targets based on the current kafka claim and group session.
275
func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
276
discoveredLabels := model.LabelSet{
277
"__meta_kafka_topic": model.LabelValue(claim.Topic()),
278
"__meta_kafka_partition": model.LabelValue(fmt.Sprintf("%d", claim.Partition())),
279
"__meta_kafka_member_id": model.LabelValue(session.MemberID()),
280
"__meta_kafka_group_id": model.LabelValue(ts.cfg.KafkaConfig.GroupID),
281
}
282
details := newDetails(session, claim)
283
labelMap := make(map[string]string)
284
for k, v := range discoveredLabels.Clone().Merge(ts.cfg.KafkaConfig.Labels) {
285
labelMap[string(k)] = string(v)
286
}
287
labelOut := format(labels.FromMap(labelMap), ts.cfg.RelabelConfigs)
288
if len(labelOut) == 0 {
289
level.Warn(ts.logger).Log("msg", "dropping target", "reason", "no labels", "details", details, "discovered_labels", discoveredLabels.String())
290
return &runnableDroppedTarget{
291
Target: target.NewDroppedTarget("dropping target, no labels", discoveredLabels),
292
runFn: func() {
293
for range claim.Messages() {
294
}
295
},
296
}, nil
297
}
298
t := NewKafkaTarget(
299
ts.logger,
300
session,
301
claim,
302
discoveredLabels,
303
labelOut,
304
ts.cfg.RelabelConfigs,
305
ts.client,
306
ts.cfg.KafkaConfig.UseIncomingTimestamp,
307
ts.messageParser,
308
)
309
310
return t, nil
311
}
312
313
// validateConfig applies defaults to cfg in place (Kafka version "2.1.1",
// consumer group ID "promtail") and rejects configurations that define no
// brokers or no topics.
func validateConfig(cfg *Config) error {
	kc := &cfg.KafkaConfig
	if kc.Version == "" {
		kc.Version = "2.1.1"
	}
	if len(kc.Brokers) == 0 {
		return errors.New("no Kafka bootstrap brokers defined")
	}
	if len(kc.Topics) == 0 {
		return errors.New("no topics given to be consumed")
	}
	if kc.GroupID == "" {
		kc.GroupID = "promtail"
	}
	return nil
}
330
331
// StringsContain returns true if the search value is within the list of input values.
func StringsContain(values []string, search string) bool {
	for i := range values {
		if values[i] == search {
			return true
		}
	}
	return false
}
341
342