Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/topics.go
4096 views
1
package kafkatarget
2
3
// This code is copied from Promtail. The kafkatarget package is used to
4
// configure and run the targets that can read kafka entries and forward them
5
// to other loki components.
6
7
import (
8
"errors"
9
"fmt"
10
"regexp"
11
"sort"
12
)
13
14
type topicClient interface {
15
RefreshMetadata(topics ...string) error
16
Topics() ([]string, error)
17
}
18
19
type topicManager struct {
20
client topicClient
21
22
patterns []*regexp.Regexp
23
matches []string
24
}
25
26
// newTopicManager fetches topics and returns matchings one based on list of requested topics.
27
// If a topic starts with a '^' it is treated as a regexp and can match multiple topics.
28
func newTopicManager(client topicClient, topics []string) (*topicManager, error) {
29
var (
30
patterns []*regexp.Regexp
31
matches []string
32
)
33
for _, t := range topics {
34
if len(t) == 0 {
35
return nil, errors.New("invalid empty topic")
36
}
37
if t[0] != '^' {
38
matches = append(matches, t)
39
}
40
re, err := regexp.Compile(t)
41
if err != nil {
42
return nil, fmt.Errorf("invalid topic pattern: %w", err)
43
}
44
patterns = append(patterns, re)
45
}
46
return &topicManager{
47
client: client,
48
patterns: patterns,
49
matches: matches,
50
}, nil
51
}
52
53
func (tm *topicManager) Topics() ([]string, error) {
54
if err := tm.client.RefreshMetadata(); err != nil {
55
return nil, err
56
}
57
topics, err := tm.client.Topics()
58
if err != nil {
59
return nil, err
60
}
61
62
result := make([]string, 0, len(topics))
63
64
Outer:
65
for _, topic := range topics {
66
for _, m := range tm.matches {
67
if m == topic {
68
result = append(result, topic)
69
continue Outer
70
}
71
}
72
for _, p := range tm.patterns {
73
if p.MatchString(topic) {
74
result = append(result, topic)
75
continue Outer
76
}
77
}
78
}
79
80
sort.Strings(result)
81
return result, nil
82
}
83
84