Path: blob/main/component/loki/source/internal/kafkatarget/consumer.go
4096 views
package kafkatarget12// This code is copied from Promtail. The kafkatarget package is used to3// configure and run the targets that can read kafka entries and forward them4// to other loki components.56import (7"context"8"fmt"9"sync"10"time"1112"github.com/Shopify/sarama"13"github.com/go-kit/log"14"github.com/go-kit/log/level"15"github.com/grafana/dskit/backoff"1617"github.com/grafana/loki/clients/pkg/promtail/targets/target"18)1920var defaultBackOff = backoff.Config{21MinBackoff: 1 * time.Second,22MaxBackoff: 60 * time.Second,23MaxRetries: 20,24}2526type RunnableTarget interface {27target.Target28run()29}3031type TargetDiscoverer interface {32NewTarget(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)33}3435// consumer handle a group consumer instance.36// It will create a new target for every consumer claim using the `TargetDiscoverer`.37type consumer struct {38sarama.ConsumerGroup39discoverer TargetDiscoverer40logger log.Logger4142ctx context.Context43cancel context.CancelFunc44wg sync.WaitGroup4546mutex sync.Mutex // used during rebalancing setup and tear down47activeTargets []target.Target48droppedTargets []target.Target49}5051// start starts the consumer for a given list of topics.52func (c *consumer) start(ctx context.Context, topics []string) {53c.wg.Wait()54c.wg.Add(1)5556c.ctx, c.cancel = context.WithCancel(ctx)57level.Info(c.logger).Log("msg", "starting consumer", "topics", fmt.Sprintf("%+v", topics))5859go func() {60defer c.wg.Done()61backoff := backoff.New(c.ctx, defaultBackOff)62for {63// Calling Consume in an infinite loop in case rebalancing is kicking in.64// In which case all claims will be renewed.65err := c.ConsumerGroup.Consume(c.ctx, topics, c)66if err != nil && err != context.Canceled {67level.Error(c.logger).Log("msg", "error from the consumer, retrying...", "err", err)68// backoff before re-trying.69backoff.Wait()70if backoff.Ongoing() {71continue72}73level.Error(c.logger).Log("msg", "maximum error from the consumer reached", "last_err", err)74return75}76if c.ctx.Err() != nil || err == context.Canceled {77level.Info(c.logger).Log("msg", "stopping consumer", "topics", fmt.Sprintf("%+v", topics))78return79}80backoff.Reset()81}82}()83}8485// ConsumeClaim creates a target for the given received claim and start reading message from it.86func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {87c.wg.Add(1)88defer c.wg.Done()8990t, err := c.discoverer.NewTarget(session, claim)91if err != nil {92return err93}94if len(t.Labels()) == 0 {95c.addDroppedTarget(t)96t.run()97return nil98}99c.addTarget(t)100level.Info(c.logger).Log("msg", "consuming topic", "details", t.Details())101t.run()102103return nil104}105106// Setup is run at the beginning of a new session, before ConsumeClaim107func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {108c.resetTargets()109return nil110}111112// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited113func (c *consumer) Cleanup(sarama.ConsumerGroupSession) error {114c.resetTargets()115return nil116}117118// stop stops the consumer.119func (c *consumer) stop() {120c.cancel()121c.wg.Wait()122c.resetTargets()123}124125func (c *consumer) resetTargets() {126c.mutex.Lock()127defer c.mutex.Unlock()128c.activeTargets = nil129c.droppedTargets = nil130}131132func (c *consumer) getActiveTargets() []target.Target {133c.mutex.Lock()134defer c.mutex.Unlock()135return c.activeTargets136}137138func (c *consumer) getDroppedTargets() []target.Target {139c.mutex.Lock()140defer c.mutex.Unlock()141return c.droppedTargets142}143144func (c *consumer) addTarget(t target.Target) {145c.mutex.Lock()146defer c.mutex.Unlock()147c.activeTargets = append(c.activeTargets, t)148}149150func (c *consumer) addDroppedTarget(t target.Target) {151c.mutex.Lock()152defer c.mutex.Unlock()153c.droppedTargets = append(c.droppedTargets, t)154}155156157