Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/consumer.go
4096 views
1
package kafkatarget
2
3
// This code is copied from Promtail. The kafkatarget package is used to
4
// configure and run the targets that can read kafka entries and forward them
5
// to other loki components.
6
7
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/backoff"

	"github.com/grafana/loki/clients/pkg/promtail/targets/target"
)
20
21
var defaultBackOff = backoff.Config{
22
MinBackoff: 1 * time.Second,
23
MaxBackoff: 60 * time.Second,
24
MaxRetries: 20,
25
}
26
27
type RunnableTarget interface {
28
target.Target
29
run()
30
}
31
32
type TargetDiscoverer interface {
33
NewTarget(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)
34
}
35
36
// consumer handle a group consumer instance.
37
// It will create a new target for every consumer claim using the `TargetDiscoverer`.
38
type consumer struct {
39
sarama.ConsumerGroup
40
discoverer TargetDiscoverer
41
logger log.Logger
42
43
ctx context.Context
44
cancel context.CancelFunc
45
wg sync.WaitGroup
46
47
mutex sync.Mutex // used during rebalancing setup and tear down
48
activeTargets []target.Target
49
droppedTargets []target.Target
50
}
51
52
// start starts the consumer for a given list of topics.
53
func (c *consumer) start(ctx context.Context, topics []string) {
54
c.wg.Wait()
55
c.wg.Add(1)
56
57
c.ctx, c.cancel = context.WithCancel(ctx)
58
level.Info(c.logger).Log("msg", "starting consumer", "topics", fmt.Sprintf("%+v", topics))
59
60
go func() {
61
defer c.wg.Done()
62
backoff := backoff.New(c.ctx, defaultBackOff)
63
for {
64
// Calling Consume in an infinite loop in case rebalancing is kicking in.
65
// In which case all claims will be renewed.
66
err := c.ConsumerGroup.Consume(c.ctx, topics, c)
67
if err != nil && err != context.Canceled {
68
level.Error(c.logger).Log("msg", "error from the consumer, retrying...", "err", err)
69
// backoff before re-trying.
70
backoff.Wait()
71
if backoff.Ongoing() {
72
continue
73
}
74
level.Error(c.logger).Log("msg", "maximum error from the consumer reached", "last_err", err)
75
return
76
}
77
if c.ctx.Err() != nil || err == context.Canceled {
78
level.Info(c.logger).Log("msg", "stopping consumer", "topics", fmt.Sprintf("%+v", topics))
79
return
80
}
81
backoff.Reset()
82
}
83
}()
84
}
85
86
// ConsumeClaim creates a target for the given received claim and start reading message from it.
87
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
88
c.wg.Add(1)
89
defer c.wg.Done()
90
91
t, err := c.discoverer.NewTarget(session, claim)
92
if err != nil {
93
return err
94
}
95
if len(t.Labels()) == 0 {
96
c.addDroppedTarget(t)
97
t.run()
98
return nil
99
}
100
c.addTarget(t)
101
level.Info(c.logger).Log("msg", "consuming topic", "details", t.Details())
102
t.run()
103
104
return nil
105
}
106
107
// Setup is run at the beginning of a new session, before ConsumeClaim
108
func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {
109
c.resetTargets()
110
return nil
111
}
112
113
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
114
func (c *consumer) Cleanup(sarama.ConsumerGroupSession) error {
115
c.resetTargets()
116
return nil
117
}
118
119
// stop stops the consumer.
120
func (c *consumer) stop() {
121
c.cancel()
122
c.wg.Wait()
123
c.resetTargets()
124
}
125
126
// resetTargets drops every recorded active and dropped target while holding
// the mutex.
func (c *consumer) resetTargets() {
	c.mutex.Lock()
	c.activeTargets, c.droppedTargets = nil, nil
	c.mutex.Unlock()
}
132
133
// getActiveTargets returns the slice of currently active targets, read while
// holding the mutex. The slice is returned as stored, not copied.
func (c *consumer) getActiveTargets() []target.Target {
	c.mutex.Lock()
	active := c.activeTargets
	c.mutex.Unlock()
	return active
}
138
139
// getDroppedTargets returns the slice of dropped targets, read while holding
// the mutex. The slice is returned as stored, not copied.
func (c *consumer) getDroppedTargets() []target.Target {
	c.mutex.Lock()
	dropped := c.droppedTargets
	c.mutex.Unlock()
	return dropped
}
144
145
// addTarget appends t to the active targets while holding the mutex.
func (c *consumer) addTarget(t target.Target) {
	c.mutex.Lock()
	c.activeTargets = append(c.activeTargets, t)
	c.mutex.Unlock()
}
150
151
// addDroppedTarget appends t to the dropped targets while holding the mutex.
func (c *consumer) addDroppedTarget(t target.Target) {
	c.mutex.Lock()
	c.droppedTargets = append(c.droppedTargets, t)
	c.mutex.Unlock()
}
156
157