Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/kafkatarget.go
4096 views
1
package kafkatarget
2
3
// This code is copied from Promtail. The kafkatarget package is used to
// configure and run the targets that can read Kafka entries and forward them
// to other Loki components.
6
7
import (
8
"fmt"
9
"time"
10
11
"github.com/Shopify/sarama"
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/component/common/loki"
15
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
16
"github.com/prometheus/common/model"
17
"github.com/prometheus/prometheus/model/labels"
18
"github.com/prometheus/prometheus/model/relabel"
19
)
20
21
type runnableDroppedTarget struct {
22
target.Target
23
runFn func()
24
}
25
26
// run calls the stored runFn callback and returns once the callback returns.
func (d *runnableDroppedTarget) run() {
	d.runFn()
}
29
30
type KafkaTarget struct {
31
logger log.Logger
32
discoveredLabels model.LabelSet
33
lbs model.LabelSet
34
details ConsumerDetails
35
claim sarama.ConsumerGroupClaim
36
session sarama.ConsumerGroupSession
37
client loki.EntryHandler
38
relabelConfig []*relabel.Config
39
useIncomingTimestamp bool
40
messageParser MessageParser
41
}
42
43
func NewKafkaTarget(
44
logger log.Logger,
45
session sarama.ConsumerGroupSession,
46
claim sarama.ConsumerGroupClaim,
47
discoveredLabels, lbs model.LabelSet,
48
relabelConfig []*relabel.Config,
49
client loki.EntryHandler,
50
useIncomingTimestamp bool,
51
messageParser MessageParser,
52
) *KafkaTarget {
53
54
return &KafkaTarget{
55
logger: logger,
56
discoveredLabels: discoveredLabels,
57
lbs: lbs,
58
details: newDetails(session, claim),
59
claim: claim,
60
session: session,
61
client: client,
62
relabelConfig: relabelConfig,
63
useIncomingTimestamp: useIncomingTimestamp,
64
messageParser: messageParser,
65
}
66
}
67
68
const (
	// labelKeyKafkaMessageKey is the name of the label that carries the key
	// of the Kafka message being processed.
	labelKeyKafkaMessageKey = "__meta_kafka_message_key"

	// defaultKafkaMessageKey is the label value used in place of the message
	// key when a message has an empty key.
	defaultKafkaMessageKey = "none"
)
72
73
func (t *KafkaTarget) run() {
74
defer t.client.Stop()
75
for message := range t.claim.Messages() {
76
mk := string(message.Key)
77
if len(mk) == 0 {
78
mk = defaultKafkaMessageKey
79
}
80
81
// TODO: Possibly need to format after merging with discovered labels because we can specify multiple labels in source labels
82
// https://github.com/grafana/loki/pull/4745#discussion_r750022234
83
lbs := format([]labels.Label{{
84
Name: labelKeyKafkaMessageKey,
85
Value: mk,
86
}}, t.relabelConfig)
87
88
out := t.lbs.Clone()
89
if len(lbs) > 0 {
90
out = out.Merge(lbs)
91
}
92
entries, err := t.messageParser.Parse(message, out, t.relabelConfig, t.useIncomingTimestamp)
93
if err != nil {
94
level.Error(t.logger).Log("msg", "message parsing error", "err", err)
95
} else {
96
for _, entry := range entries {
97
t.client.Chan() <- entry
98
}
99
}
100
101
t.session.MarkMessage(message, "")
102
}
103
}
104
105
// timestamp selects the time to attach to an entry: the supplied incoming
// time when useIncoming is true, otherwise the current wall-clock time.
func timestamp(useIncoming bool, incoming time.Time) time.Time {
	if !useIncoming {
		return time.Now()
	}
	return incoming
}
111
112
// Type reports the kind of this target, which is always
// target.KafkaTargetType.
func (t *KafkaTarget) Type() target.TargetType {
	return target.KafkaTargetType
}
115
116
// Ready reports whether the target is ready; it unconditionally returns true.
func (t *KafkaTarget) Ready() bool {
	return true
}
119
120
// DiscoveredLabels returns the discovered label set the target was created
// with. The stored set is returned directly, not a copy.
func (t *KafkaTarget) DiscoveredLabels() model.LabelSet {
	return t.discoveredLabels
}
123
124
// Labels returns the label set the target was created with. The stored set is
// returned directly, not a copy.
func (t *KafkaTarget) Labels() model.LabelSet {
	return t.lbs
}
127
128
// Details returns target-specific details.
129
func (t *KafkaTarget) Details() interface{} {
130
return t.details
131
}
132
133
// ConsumerDetails describes the consumer-group session and claim a target was
// created from.
type ConsumerDetails struct {
	// MemberID is the cluster member ID.
	MemberID string

	// GenerationID is the current generation ID.
	GenerationID int32

	// Topic is the topic of the claim.
	Topic string
	// Partition is the partition of the claim.
	Partition int32
	// InitialOffset is the initial offset of the claim.
	InitialOffset int64
}

// String renders the details as space-separated key=value pairs in the order
// member_id, generation_id, topic, partition, initial_offset.
func (c ConsumerDetails) String() string {
	const layout = "member_id=%s generation_id=%d topic=%s partition=%d initial_offset=%d"

	return fmt.Sprintf(
		layout,
		c.MemberID,
		c.GenerationID,
		c.Topic,
		c.Partition,
		c.InitialOffset,
	)
}
149
150
func newDetails(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) ConsumerDetails {
151
return ConsumerDetails{
152
MemberID: session.MemberID(),
153
GenerationID: session.GenerationID(),
154
Topic: claim.Topic(),
155
Partition: claim.Partition(),
156
InitialOffset: claim.InitialOffset(),
157
}
158
}
159
160