Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/kafka/kafka.go
4096 views
1
package kafka
2
3
import (
4
"context"
5
"sync"
6
7
"github.com/Shopify/sarama"
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/common/config"
11
"github.com/grafana/agent/component/common/loki"
12
flow_relabel "github.com/grafana/agent/component/common/relabel"
13
kt "github.com/grafana/agent/component/loki/source/internal/kafkatarget"
14
"github.com/grafana/dskit/flagext"
15
"github.com/prometheus/common/model"
16
)
17
18
func init() {
19
component.Register(component.Registration{
20
Name: "loki.source.kafka",
21
Args: Arguments{},
22
23
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
24
return New(opts, args.(Arguments))
25
},
26
})
27
}
28
29
// Arguments holds values which are used to configure the loki.source.kafka
30
// component.
31
type Arguments struct {
32
Brokers []string `river:"brokers,attr"`
33
Topics []string `river:"topics,attr"`
34
GroupID string `river:"group_id,attr,optional"`
35
Assignor string `river:"assignor,attr,optional"`
36
Version string `river:"version,attr,optional"`
37
Authentication KafkaAuthentication `river:"authentication,block,optional"`
38
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
39
Labels map[string]string `river:"labels,attr,optional"`
40
41
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
42
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
43
}
44
45
// KafkaAuthentication describe the configuration for authentication with Kafka brokers
46
type KafkaAuthentication struct {
47
Type string `river:"type,attr,optional"`
48
TLSConfig config.TLSConfig `river:"tls_config,block,optional"`
49
SASLConfig KafkaSASLConfig `river:"sasl_config,block,optional"`
50
}
51
52
// KafkaSASLConfig describe the SASL configuration for authentication with Kafka brokers
53
type KafkaSASLConfig struct {
54
Mechanism string `river:"mechanism,attr,optional"`
55
User string `river:"user,attr,optional"`
56
Password string `river:"password,attr,optional"`
57
UseTLS bool `river:"use_tls,attr,optional"`
58
TLSConfig config.TLSConfig `river:"tls_config,block,optional"`
59
OAuthConfig OAuthConfigConfig `river:"oauth_config,block,optional"`
60
}
61
62
// OAuthConfigConfig holds the OAuth settings nested under a Kafka SASL
// configuration block. Neither attribute is tagged "optional".
type OAuthConfigConfig struct {
	// TokenProvider is converted to a kt.TokenProviderType by
	// KafkaAuthentication.Convert.
	TokenProvider string `river:"token_provider,attr"`
	// Scopes is passed through unchanged to the target's OAuth configuration.
	Scopes []string `river:"scopes,attr"`
}
66
67
// DefaultArguments provides the default arguments for a kafka component.
68
var DefaultArguments = Arguments{
69
GroupID: "loki.source.kafka",
70
Assignor: "range",
71
Version: "2.2.1",
72
Authentication: KafkaAuthentication{
73
Type: "none",
74
SASLConfig: KafkaSASLConfig{
75
Mechanism: sarama.SASLTypePlaintext,
76
UseTLS: false,
77
},
78
},
79
UseIncomingTimestamp: false,
80
}
81
82
// UnmarshalRiver implements river.Unmarshaler.
83
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
84
*a = DefaultArguments
85
86
type kafkacfg Arguments
87
err := f((*kafkacfg)(a))
88
if err != nil {
89
return err
90
}
91
92
return nil
93
}
94
95
// Component implements the loki.source.kafka component.
96
type Component struct {
97
opts component.Options
98
99
mut sync.RWMutex
100
fanout []loki.LogsReceiver
101
target *kt.TargetSyncer
102
103
handler loki.LogsReceiver
104
}
105
106
// New creates a new loki.source.kafka component.
107
func New(o component.Options, args Arguments) (*Component, error) {
108
c := &Component{
109
opts: o,
110
mut: sync.RWMutex{},
111
fanout: args.ForwardTo,
112
target: nil,
113
handler: make(loki.LogsReceiver),
114
}
115
116
// Call to Update() to start readers and set receivers once at the start.
117
if err := c.Update(args); err != nil {
118
return nil, err
119
}
120
121
return c, nil
122
}
123
124
// Run implements component.Component.
125
func (c *Component) Run(ctx context.Context) error {
126
defer func() {
127
c.mut.Lock()
128
defer c.mut.Unlock()
129
130
level.Info(c.opts.Logger).Log("msg", "loki.source.kafka component shutting down, stopping target")
131
if c.target != nil {
132
err := c.target.Stop()
133
if err != nil {
134
level.Error(c.opts.Logger).Log("msg", "error while stopping kafka target", "err", err)
135
}
136
}
137
}()
138
139
for {
140
select {
141
case <-ctx.Done():
142
return nil
143
case entry := <-c.handler:
144
c.mut.RLock()
145
for _, receiver := range c.fanout {
146
receiver <- entry
147
}
148
c.mut.RUnlock()
149
}
150
}
151
}
152
153
// Update implements component.Component.
154
func (c *Component) Update(args component.Arguments) error {
155
c.mut.Lock()
156
defer c.mut.Unlock()
157
158
newArgs := args.(Arguments)
159
c.fanout = newArgs.ForwardTo
160
161
if c.target != nil {
162
err := c.target.Stop()
163
if err != nil {
164
level.Error(c.opts.Logger).Log("msg", "error while stopping kafka target", "err", err)
165
}
166
}
167
168
entryHandler := loki.NewEntryHandler(c.handler, func() {})
169
t, err := kt.NewSyncer(c.opts.Registerer, c.opts.Logger, newArgs.Convert(), entryHandler, &kt.KafkaTargetMessageParser{})
170
if err != nil {
171
level.Error(c.opts.Logger).Log("msg", "failed to create kafka client with provided config", "err", err)
172
return err
173
}
174
175
c.target = t
176
177
return nil
178
}
179
180
// Convert is used to bridge between the River and Promtail types.
181
func (args *Arguments) Convert() kt.Config {
182
lbls := make(model.LabelSet, len(args.Labels))
183
for k, v := range args.Labels {
184
lbls[model.LabelName(k)] = model.LabelValue(v)
185
}
186
187
return kt.Config{
188
KafkaConfig: kt.TargetConfig{
189
Labels: lbls,
190
UseIncomingTimestamp: args.UseIncomingTimestamp,
191
Brokers: args.Brokers,
192
GroupID: args.GroupID,
193
Topics: args.Topics,
194
Version: args.Version,
195
Assignor: args.Assignor,
196
Authentication: args.Authentication.Convert(),
197
},
198
RelabelConfigs: flow_relabel.ComponentToPromRelabelConfigs(args.RelabelRules),
199
}
200
}
201
202
func (auth KafkaAuthentication) Convert() kt.Authentication {
203
var secret flagext.Secret
204
if auth.SASLConfig.Password != "" {
205
err := secret.Set(auth.SASLConfig.Password)
206
if err != nil {
207
panic("Unable to set kafka SASLConfig password")
208
}
209
}
210
211
return kt.Authentication{
212
Type: kt.AuthenticationType(auth.Type),
213
TLSConfig: *auth.TLSConfig.Convert(),
214
SASLConfig: kt.SASLConfig{
215
Mechanism: sarama.SASLMechanism(auth.SASLConfig.Mechanism),
216
User: auth.SASLConfig.User,
217
Password: secret,
218
UseTLS: auth.SASLConfig.UseTLS,
219
TLSConfig: *auth.SASLConfig.TLSConfig.Convert(),
220
OAuthConfig: kt.OAuthConfig{
221
TokenProvider: kt.TokenProviderType(auth.SASLConfig.OAuthConfig.TokenProvider),
222
Scopes: auth.SASLConfig.OAuthConfig.Scopes,
223
},
224
},
225
}
226
}
227
228