Path: blob/main/component/loki/source/internal/kafkatarget/topics.go
4096 views
package kafkatarget12// This code is copied from Promtail. The kafkatarget package is used to3// configure and run the targets that can read kafka entries and forward them4// to other loki components.56import (7"errors"8"fmt"9"regexp"10"sort"11)1213type topicClient interface {14RefreshMetadata(topics ...string) error15Topics() ([]string, error)16}1718type topicManager struct {19client topicClient2021patterns []*regexp.Regexp22matches []string23}2425// newTopicManager fetches topics and returns matchings one based on list of requested topics.26// If a topic starts with a '^' it is treated as a regexp and can match multiple topics.27func newTopicManager(client topicClient, topics []string) (*topicManager, error) {28var (29patterns []*regexp.Regexp30matches []string31)32for _, t := range topics {33if len(t) == 0 {34return nil, errors.New("invalid empty topic")35}36if t[0] != '^' {37matches = append(matches, t)38}39re, err := regexp.Compile(t)40if err != nil {41return nil, fmt.Errorf("invalid topic pattern: %w", err)42}43patterns = append(patterns, re)44}45return &topicManager{46client: client,47patterns: patterns,48matches: matches,49}, nil50}5152func (tm *topicManager) Topics() ([]string, error) {53if err := tm.client.RefreshMetadata(); err != nil {54return nil, err55}56topics, err := tm.client.Topics()57if err != nil {58return nil, err59}6061result := make([]string, 0, len(topics))6263Outer:64for _, topic := range topics {65for _, m := range tm.matches {66if m == topic {67result = append(result, topic)68continue Outer69}70}71for _, p := range tm.patterns {72if p.MatchString(topic) {73result = append(result, topic)74continue Outer75}76}77}7879sort.Strings(result)80return result, nil81}828384