Path: blob/main/component/loki/source/internal/kafkatarget/target_syncer.go
4096 views
package kafkatarget

// This code is copied from Promtail. The kafkatarget package is used to
// configure and run the targets that can read kafka entries and forward them
// to other loki components.

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	promconfig "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/clients/pkg/promtail/targets/target"

	"github.com/grafana/agent/component/common/loki"
)

// TopicPollInterval is how often the topic manager is polled for topic
// changes. It is a variable so tests can shorten it.
var TopicPollInterval = 30 * time.Second

// TopicManager lists the topics that should currently be consumed.
type TopicManager interface {
	Topics() ([]string, error)
}

// TargetSyncer periodically polls Kafka for topic changes and (re)starts the
// embedded consumer group against the current set of topics.
type TargetSyncer struct {
	logger log.Logger
	cfg    Config
	reg    prometheus.Registerer
	client loki.EntryHandler

	topicManager TopicManager
	consumer     // embedded consumer runs the sarama consumer group sessions

	// close releases the consumer group and the underlying kafka client.
	close func() error

	ctx            context.Context
	cancel         context.CancelFunc
	wg             sync.WaitGroup
	previousTopics []string
	messageParser  MessageParser
}

// NewSyncer validates the configuration, builds the sarama client and
// consumer group, and returns a running TargetSyncer that is already polling
// for topic changes. Call Stop to shut it down and release resources.
func NewSyncer(
	reg prometheus.Registerer,
	logger log.Logger,
	cfg Config,
	pushClient loki.EntryHandler,
	messageParser MessageParser,
) (*TargetSyncer, error) {

	if err := validateConfig(&cfg); err != nil {
		return nil, err
	}
	version, err := sarama.ParseKafkaVersion(cfg.KafkaConfig.Version)
	if err != nil {
		return nil, err
	}
	config := sarama.NewConfig()
	config.Version = version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	switch cfg.KafkaConfig.Assignor {
	case sarama.StickyBalanceStrategyName:
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case sarama.RoundRobinBalanceStrategyName:
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case sarama.RangeBalanceStrategyName, "":
		// Range is the default assignor when none is configured.
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		return nil, fmt.Errorf("unrecognized consumer group partition assignor: %s", cfg.KafkaConfig.Assignor)
	}
	config, err = withAuthentication(*config, cfg.KafkaConfig.Authentication)
	if err != nil {
		return nil, fmt.Errorf("error setting up kafka authentication: %w", err)
	}
	client, err := sarama.NewClient(cfg.KafkaConfig.Brokers, config)
	if err != nil {
		return nil, fmt.Errorf("error creating kafka client: %w", err)
	}
	group, err := sarama.NewConsumerGroup(cfg.KafkaConfig.Brokers, cfg.KafkaConfig.GroupID, config)
	if err != nil {
		// Don't leak the client created above.
		_ = client.Close()
		return nil, fmt.Errorf("error creating consumer group client: %w", err)
	}
	topicManager, err := newTopicManager(client, cfg.KafkaConfig.Topics)
	if err != nil {
		// Don't leak the group and client created above.
		if cerr := group.Close(); cerr != nil {
			level.Warn(logger).Log("msg", "error while closing consumer group", "err", cerr)
		}
		_ = client.Close()
		return nil, fmt.Errorf("error creating topic manager: %w", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &TargetSyncer{
		logger:       logger,
		ctx:          ctx,
		cancel:       cancel,
		topicManager: topicManager,
		cfg:          cfg,
		reg:          reg,
		client:       pushClient,
		close: func() error {
			if err := group.Close(); err != nil {
				level.Warn(logger).Log("msg", "error while closing consumer group", "err", err)
			}
			return client.Close()
		},
		consumer: consumer{
			ctx:           context.Background(),
			cancel:        func() {},
			ConsumerGroup: group,
			logger:        logger,
		},
		messageParser: messageParser,
	}
	// The syncer is the target discoverer for its embedded consumer.
	t.discoverer = t
	t.loop()
	return t, nil
}

// withAuthentication applies the configured authentication mode (none, SSL,
// or SASL) to a copy of the sarama config and returns it.
func withAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama.Config, error) {
	if len(authCfg.Type) == 0 || authCfg.Type == AuthenticationTypeNone {
		return &cfg, nil
	}

	switch authCfg.Type {
	case AuthenticationTypeSSL:
		return withSSLAuthentication(cfg, authCfg)
	case AuthenticationTypeSASL:
		return withSASLAuthentication(cfg, authCfg)
	default:
		return nil, fmt.Errorf("unsupported authentication type %s", authCfg.Type)
	}
}

// withSSLAuthentication enables TLS on broker connections using the provided
// TLS configuration.
func withSSLAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama.Config, error) {
	cfg.Net.TLS.Enable = true
	tc, err := promconfig.NewTLSConfig(&authCfg.TLSConfig)
	if err != nil {
		return nil, err
	}
	cfg.Net.TLS.Config = tc
	return &cfg, nil
}

// withSASLAuthentication enables SASL authentication (plaintext, SCRAM-SHA-256,
// SCRAM-SHA-512 or OAuth), optionally over TLS. An unset mechanism defaults
// to plaintext.
func withSASLAuthentication(cfg sarama.Config, authCfg Authentication) (*sarama.Config, error) {
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.User = authCfg.SASLConfig.User
	cfg.Net.SASL.Password = authCfg.SASLConfig.Password.String()
	cfg.Net.SASL.Mechanism = authCfg.SASLConfig.Mechanism
	if cfg.Net.SASL.Mechanism == "" {
		cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	}

	supportedMechanism := []string{
		sarama.SASLTypeSCRAMSHA512,
		sarama.SASLTypeSCRAMSHA256,
		sarama.SASLTypePlaintext,
		sarama.SASLTypeOAuth,
	}
	// Validate the effective mechanism (after the plaintext default has been
	// applied); checking the raw config value would wrongly reject an empty
	// mechanism that was just defaulted to plaintext.
	if !StringsContain(supportedMechanism, string(cfg.Net.SASL.Mechanism)) {
		return nil, fmt.Errorf("error unsupported sasl mechanism: %s", authCfg.SASLConfig.Mechanism)
	}

	if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 {
		cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{
				HashGeneratorFcn: SHA512,
			}
		}
	}
	if cfg.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 {
		cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{
				HashGeneratorFcn: SHA256,
			}
		}
	}

	if cfg.Net.SASL.Mechanism == sarama.SASLTypeOAuth {
		accessTokenProvider, err := NewOAuthProvider(authCfg.SASLConfig.OAuthConfig)
		if err != nil {
			return nil, fmt.Errorf("unable to create new access token provider: %w", err)
		}
		cfg.Net.SASL.TokenProvider = accessTokenProvider
	}

	if authCfg.SASLConfig.UseTLS {
		tc, err := promconfig.NewTLSConfig(&authCfg.SASLConfig.TLSConfig)
		if err != nil {
			return nil, err
		}
		cfg.Net.TLS.Config = tc
		cfg.Net.TLS.Enable = true
	}
	return &cfg, nil
}

// loop starts two goroutines: one that polls the topic manager for topic
// changes, and one that restarts the consumer whenever the topic set changes.
// Both exit when ts.ctx is cancelled; Stop waits for them via ts.wg.
func (ts *TargetSyncer) loop() {
	topicChanged := make(chan []string)
	ts.wg.Add(2)
	go func() {
		defer ts.wg.Done()
		for {
			select {
			case <-ts.ctx.Done():
				return
			case topics := <-topicChanged:
				level.Info(ts.logger).Log("msg", "new topics received", "topics", fmt.Sprintf("%+v", topics))
				ts.stop()
				if len(topics) > 0 { // no topics we don't need to start.
					ts.start(ts.ctx, topics)
				}
			}
		}
	}()
	go func() {
		defer ts.wg.Done()
		ticker := time.NewTicker(TopicPollInterval)
		defer ticker.Stop()

		tick := func() {
			select {
			case <-ts.ctx.Done():
			case <-ticker.C:
			}
		}
		for ; true; tick() { // instant tick.
			if ts.ctx.Err() != nil {
				ts.stop()
				close(topicChanged)
				return
			}
			newTopics, ok, err := ts.fetchTopics()
			if err != nil {
				level.Warn(ts.logger).Log("msg", "failed to fetch topics", "err", err)
				continue
			}
			if ok {
				// Guard the send with ctx.Done: if cancellation lands between
				// the ctx.Err check above and this send, the receiving
				// goroutine has already exited and an unguarded send would
				// block forever, hanging Stop() on wg.Wait().
				select {
				case topicChanged <- newTopics:
				case <-ts.ctx.Done():
				}
			}
		}
	}()
}

// fetchTopics fetches and return new topics, if there's a difference with previous found topics
// it will return true as second return value.
func (ts *TargetSyncer) fetchTopics() ([]string, bool, error) {
	latest, err := ts.topicManager.Topics()
	if err != nil {
		return nil, false, err
	}
	if len(ts.previousTopics) != len(latest) {
		ts.previousTopics = latest
		return latest, true, nil
	}
	for i, v := range ts.previousTopics {
		if v != latest[i] {
			ts.previousTopics = latest
			return latest, true, nil
		}
	}
	return nil, false, nil
}

// Stop cancels the polling loop, waits for the goroutines started by loop to
// finish, then closes the consumer group and kafka client.
func (ts *TargetSyncer) Stop() error {
	ts.cancel()
	ts.wg.Wait()
	return ts.close()
}

// NewTarget creates a new targets based on the current kafka claim and group session.
func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
	discoveredLabels := model.LabelSet{
		"__meta_kafka_topic":     model.LabelValue(claim.Topic()),
		"__meta_kafka_partition": model.LabelValue(fmt.Sprintf("%d", claim.Partition())),
		"__meta_kafka_member_id": model.LabelValue(session.MemberID()),
		"__meta_kafka_group_id":  model.LabelValue(ts.cfg.KafkaConfig.GroupID),
	}
	details := newDetails(session, claim)
	labelMap := make(map[string]string)
	for k, v := range discoveredLabels.Clone().Merge(ts.cfg.KafkaConfig.Labels) {
		labelMap[string(k)] = string(v)
	}
	labelOut := format(labels.FromMap(labelMap), ts.cfg.RelabelConfigs)
	if len(labelOut) == 0 {
		level.Warn(ts.logger).Log("msg", "dropping target", "reason", "no labels", "details", details, "discovered_labels", discoveredLabels.String())
		// Return a dropped target that still drains the claim's messages so
		// the consumer group session can make progress.
		return &runnableDroppedTarget{
			Target: target.NewDroppedTarget("dropping target, no labels", discoveredLabels),
			runFn: func() {
				for range claim.Messages() {
				}
			},
		}, nil
	}
	t := NewKafkaTarget(
		ts.logger,
		session,
		claim,
		discoveredLabels,
		labelOut,
		ts.cfg.RelabelConfigs,
		ts.client,
		ts.cfg.KafkaConfig.UseIncomingTimestamp,
		ts.messageParser,
	)

	return t, nil
}

// validateConfig checks the Kafka configuration, applying defaults for the
// version ("2.1.1") and group ID ("promtail") and rejecting configs with no
// brokers or no topics. It mutates cfg in place.
func validateConfig(cfg *Config) error {
	if cfg.KafkaConfig.Version == "" {
		cfg.KafkaConfig.Version = "2.1.1"
	}
	if len(cfg.KafkaConfig.Brokers) == 0 {
		return errors.New("no Kafka bootstrap brokers defined")
	}

	if len(cfg.KafkaConfig.Topics) == 0 {
		return errors.New("no topics given to be consumed")
	}

	if cfg.KafkaConfig.GroupID == "" {
		cfg.KafkaConfig.GroupID = "promtail"
	}
	return nil
}

// StringsContain returns true if the search value is within the list of input values.
func StringsContain(values []string, search string) bool {
	for _, v := range values {
		if search == v {
			return true
		}
	}

	return false
}