Path: blob/main/component/loki/source/internal/kafkatarget/topics_test.go
4096 views
package kafkatarget12// This code is copied from Promtail. The kafkatarget package is used to3// configure and run the targets that can read kafka entries and forward them4// to other loki components.56import (7"errors"8"strings"9"sync"10"testing"1112"github.com/stretchr/testify/require"13)1415type mockKafkaClient struct {16topics []string17err error1819mut sync.RWMutex20}2122func (m *mockKafkaClient) RefreshMetadata(topics ...string) error {23return nil24}2526func (m *mockKafkaClient) Topics() ([]string, error) {27m.mut.RLock()28defer m.mut.RUnlock()29return m.topics, m.err30}3132func (m *mockKafkaClient) UpdateTopics(topics []string) {33m.mut.Lock()34defer m.mut.Unlock()35m.topics = topics36}3738func Test_NewTopicManager(t *testing.T) {39t.Parallel()4041for _, tt := range []struct {42in []string43expectedErr bool44}{45{46[]string{""},47true,48},49{50[]string{"^("},51true,52},53{54[]string{"foo"},55false,56},57{58[]string{"foo", "^foo.*"},59false,60},61} {62tt := tt63t.Run(strings.Join(tt.in, ","), func(t *testing.T) {64t.Parallel()65_, err := newTopicManager(&mockKafkaClient{}, tt.in)66if tt.expectedErr {67require.Error(t, err)68return69}70require.NoError(t, err)71})72}73}7475func Test_Topics(t *testing.T) {76t.Parallel()7778for _, tt := range []struct {79manager *topicManager80expected []string81expectedErr bool82}{83{84mustNewTopicsManager(&mockKafkaClient{err: errors.New("")}, []string{"foo"}),85[]string{},86true,87},88{89mustNewTopicsManager(&mockKafkaClient{topics: []string{"foo", "foobar", "buzz"}}, []string{"^foo"}),90[]string{"foo", "foobar"},91false,92},93{94mustNewTopicsManager(&mockKafkaClient{topics: []string{"foo", "foobar", "buzz"}}, []string{"^foo.*", "buzz"}),95[]string{"buzz", "foo", "foobar"},96false,97},98} {99tt := tt100t.Run("", func(t *testing.T) {101t.Parallel()102103actual, err := tt.manager.Topics()104if tt.expectedErr {105require.Error(t, err)106return107}108require.NoError(t, err)109require.Equal(t, tt.expected, actual)110})111}112}113114func mustNewTopicsManager(client topicClient, topics []string) *topicManager {115t, err := newTopicManager(client, topics)116if err != nil {117panic(err)118}119return t120}121122123