Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/topics_test.go
4096 views
1
package kafkatarget
2
3
// This code is copied from Promtail. The kafkatarget package is used to
4
// configure and run the targets that can read kafka entries and forward them
5
// to other loki components.
6
7
import (
8
"errors"
9
"strings"
10
"sync"
11
"testing"
12
13
"github.com/stretchr/testify/require"
14
)
15
16
// mockKafkaClient is the client test double handed to newTopicManager in this
// file. It serves a topic list and an error that tests set at construction
// time, and lets tests swap the topic list afterwards via UpdateTopics.
type mockKafkaClient struct {
	topics []string // topic names handed back by Topics
	err    error    // error handed back by Topics alongside the topic list

	// mut serializes UpdateTopics writes against Topics reads.
	mut sync.RWMutex
}

// RefreshMetadata is a no-op: its arguments are ignored and it always returns
// nil.
func (m *mockKafkaClient) RefreshMetadata(_ ...string) error { return nil }

// Topics returns the currently stored topic list together with the stored
// error. Both values are read while holding the read lock.
func (m *mockKafkaClient) Topics() ([]string, error) {
	m.mut.RLock()
	names, err := m.topics, m.err
	m.mut.RUnlock()

	return names, err
}

// UpdateTopics replaces the topic list returned by subsequent Topics calls.
// The slice is stored as given, without copying.
func (m *mockKafkaClient) UpdateTopics(topics []string) {
	m.mut.Lock()
	m.topics = topics
	m.mut.Unlock()
}
38
39
func Test_NewTopicManager(t *testing.T) {
40
t.Parallel()
41
42
for _, tt := range []struct {
43
in []string
44
expectedErr bool
45
}{
46
{
47
[]string{""},
48
true,
49
},
50
{
51
[]string{"^("},
52
true,
53
},
54
{
55
[]string{"foo"},
56
false,
57
},
58
{
59
[]string{"foo", "^foo.*"},
60
false,
61
},
62
} {
63
tt := tt
64
t.Run(strings.Join(tt.in, ","), func(t *testing.T) {
65
t.Parallel()
66
_, err := newTopicManager(&mockKafkaClient{}, tt.in)
67
if tt.expectedErr {
68
require.Error(t, err)
69
return
70
}
71
require.NoError(t, err)
72
})
73
}
74
}
75
76
func Test_Topics(t *testing.T) {
77
t.Parallel()
78
79
for _, tt := range []struct {
80
manager *topicManager
81
expected []string
82
expectedErr bool
83
}{
84
{
85
mustNewTopicsManager(&mockKafkaClient{err: errors.New("")}, []string{"foo"}),
86
[]string{},
87
true,
88
},
89
{
90
mustNewTopicsManager(&mockKafkaClient{topics: []string{"foo", "foobar", "buzz"}}, []string{"^foo"}),
91
[]string{"foo", "foobar"},
92
false,
93
},
94
{
95
mustNewTopicsManager(&mockKafkaClient{topics: []string{"foo", "foobar", "buzz"}}, []string{"^foo.*", "buzz"}),
96
[]string{"buzz", "foo", "foobar"},
97
false,
98
},
99
} {
100
tt := tt
101
t.Run("", func(t *testing.T) {
102
t.Parallel()
103
104
actual, err := tt.manager.Topics()
105
if tt.expectedErr {
106
require.Error(t, err)
107
return
108
}
109
require.NoError(t, err)
110
require.Equal(t, tt.expected, actual)
111
})
112
}
113
}
114
115
// mustNewTopicsManager wraps newTopicManager for use in test fixtures: it
// returns the constructed manager and panics with the construction error
// instead of returning it.
func mustNewTopicsManager(client topicClient, topics []string) *topicManager {
	manager, err := newTopicManager(client, topics)
	if err == nil {
		return manager
	}
	panic(err)
}
122
123