GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/kafka/kafka.go

// Package kafka provides an otelcol.receiver.kafka component.
package kafka

import (
	"time"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/receiver"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/river/rivertypes"
	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
	otelcomponent "go.opentelemetry.io/collector/component"
	otelconfig "go.opentelemetry.io/collector/config"
)

func init() {
	component.Register(component.Registration{
		Name: "otelcol.receiver.kafka",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			fact := kafkareceiver.NewFactory()
			return receiver.New(opts, fact, args.(Arguments))
		},
	})
}

// Arguments configures the otelcol.receiver.kafka component.
type Arguments struct {
	Brokers         []string `river:"brokers,attr"`
	ProtocolVersion string   `river:"protocol_version,attr"`
	Topic           string   `river:"topic,attr,optional"`
	Encoding        string   `river:"encoding,attr,optional"`
	GroupID         string   `river:"group_id,attr,optional"`
	ClientID        string   `river:"client_id,attr,optional"`

	Authentication AuthenticationArguments `river:"authentication,block,optional"`
	Metadata       MetadataArguments       `river:"metadata,block,optional"`
	AutoCommit     AutoCommitArguments     `river:"autocommit,block,optional"`
	MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`

	// Output configures where to send received data. Required.
	Output *otelcol.ConsumerArguments `river:"output,block"`
}
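
// For reference, a minimal River configuration built from these arguments
// might look like the sketch below. brokers and protocol_version are the only
// required attributes; the exporter referenced in the output block is
// hypothetical and stands in for any otelcol consumer:
//
//	otelcol.receiver.kafka "default" {
//		brokers          = ["localhost:9092"]
//		protocol_version = "2.0.0"
//
//		output {
//			traces = [otelcol.exporter.otlp.default.input]
//		}
//	}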

var (
	_ river.Unmarshaler  = (*Arguments)(nil)
	_ receiver.Arguments = Arguments{}
)

// DefaultArguments holds default values for Arguments.
var DefaultArguments = Arguments{
	// We use the defaults from the upstream OpenTelemetry Collector component
	// for compatibility, even though that means using a client and group ID of
	// "otel-collector".

	Topic:    "otlp_spans",
	Encoding: "otlp_proto",
	Brokers:  []string{"localhost:9092"},
	ClientID: "otel-collector",
	GroupID:  "otel-collector",
	Metadata: MetadataArguments{
		IncludeAllTopics: true,
		Retry: MetadataRetryArguments{
			MaxRetries: 3,
			Backoff:    250 * time.Millisecond,
		},
	},
	AutoCommit: AutoCommitArguments{
		Enable:   true,
		Interval: time.Second,
	},
	MessageMarking: MessageMarkingArguments{
		AfterExecution:      false,
		IncludeUnsuccessful: false,
	},
}

// UnmarshalRiver implements river.Unmarshaler and applies default settings.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
	*args = DefaultArguments

	// Decode through a local alias type so the call to f doesn't recurse back
	// into this UnmarshalRiver implementation.
	type arguments Arguments
	return f((*arguments)(args))
}

// Convert implements receiver.Arguments.
func (args Arguments) Convert() (otelconfig.Receiver, error) {
	return &kafkareceiver.Config{
		ReceiverSettings: otelconfig.NewReceiverSettings(otelconfig.NewComponentID("kafka")),

		Brokers:         args.Brokers,
		ProtocolVersion: args.ProtocolVersion,
		Topic:           args.Topic,
		Encoding:        args.Encoding,
		GroupID:         args.GroupID,
		ClientID:        args.ClientID,

		Authentication: args.Authentication.Convert(),
		Metadata:       args.Metadata.Convert(),
		AutoCommit:     args.AutoCommit.Convert(),
		MessageMarking: args.MessageMarking.Convert(),
	}, nil
}

// Extensions implements receiver.Arguments.
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
	return nil
}

// Exporters implements receiver.Arguments.
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
	return nil
}

// NextConsumers implements receiver.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
	return args.Output
}

// AuthenticationArguments configures how to authenticate to the Kafka broker.
type AuthenticationArguments struct {
	Plaintext *PlaintextArguments         `river:"plaintext,block,optional"`
	SASL      *SASLArguments              `river:"sasl,block,optional"`
	TLS       *otelcol.TLSClientArguments `river:"tls,block,optional"`
	Kerberos  *KerberosArguments          `river:"kerberos,block,optional"`
}
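
// As a sketch, the simplest authentication block nests a plaintext block
// inside otelcol.receiver.kafka; the credentials here are placeholders:
//
//	authentication {
//		plaintext {
//			username = "kafka-user"
//			password = "kafka-password"
//		}
//	}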

// Convert converts args into the upstream type.
func (args AuthenticationArguments) Convert() kafkaexporter.Authentication {
	var res kafkaexporter.Authentication

	if args.Plaintext != nil {
		conv := args.Plaintext.Convert()
		res.PlainText = &conv
	}
	if args.SASL != nil {
		conv := args.SASL.Convert()
		res.SASL = &conv
	}
	if args.TLS != nil {
		res.TLS = args.TLS.Convert()
	}
	if args.Kerberos != nil {
		conv := args.Kerberos.Convert()
		res.Kerberos = &conv
	}

	return res
}

// PlaintextArguments configures plaintext authentication against the Kafka
// broker.
type PlaintextArguments struct {
	Username string            `river:"username,attr"`
	Password rivertypes.Secret `river:"password,attr"`
}

// Convert converts args into the upstream type.
func (args PlaintextArguments) Convert() kafkaexporter.PlainTextConfig {
	return kafkaexporter.PlainTextConfig{
		Username: args.Username,
		Password: string(args.Password),
	}
}

// SASLArguments configures SASL authentication against the Kafka broker.
type SASLArguments struct {
	Username  string            `river:"username,attr"`
	Password  rivertypes.Secret `river:"password,attr"`
	Mechanism string            `river:"mechanism,attr"`
	AWSMSK    AWSMSKArguments   `river:"aws_msk,block,optional"`
}

// Convert converts args into the upstream type.
func (args SASLArguments) Convert() kafkaexporter.SASLConfig {
	return kafkaexporter.SASLConfig{
		Username:  args.Username,
		Password:  string(args.Password),
		Mechanism: args.Mechanism,
		AWSMSK:    args.AWSMSK.Convert(),
	}
}

// AWSMSKArguments exposes additional SASL authentication measures required to
// use the AWS_MSK_IAM mechanism.
type AWSMSKArguments struct {
	Region     string `river:"region,attr"`
	BrokerAddr string `river:"broker_addr,attr"`
}

// Convert converts args into the upstream type.
func (args AWSMSKArguments) Convert() kafkaexporter.AWSMSKConfig {
	return kafkaexporter.AWSMSKConfig{
		Region:     args.Region,
		BrokerAddr: args.BrokerAddr,
	}
}
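
// A sketch of SASL authentication with the AWS_MSK_IAM mechanism named above;
// the credential, region, and broker address values are placeholders:
//
//	authentication {
//		sasl {
//			username  = "msk-user"
//			password  = "msk-password"
//			mechanism = "AWS_MSK_IAM"
//
//			aws_msk {
//				region      = "us-east-1"
//				broker_addr = "broker.example.com:9098"
//			}
//		}
//	}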

// KerberosArguments configures Kerberos authentication against the Kafka
// broker.
type KerberosArguments struct {
	ServiceName string            `river:"service_name,attr,optional"`
	Realm       string            `river:"realm,attr,optional"`
	UseKeyTab   bool              `river:"use_keytab,attr,optional"`
	Username    string            `river:"username,attr"`
	Password    rivertypes.Secret `river:"password,attr,optional"`
	ConfigPath  string            `river:"config_file,attr,optional"`
	KeyTabPath  string            `river:"keytab_file,attr,optional"`
}
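
// A sketch of keytab-based Kerberos authentication; the realm, principal, and
// keytab path are placeholders:
//
//	authentication {
//		kerberos {
//			service_name = "kafka"
//			realm        = "EXAMPLE.COM"
//			use_keytab   = true
//			username     = "otel-agent"
//			keytab_file  = "/etc/security/kafka.keytab"
//		}
//	}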

// Convert converts args into the upstream type.
func (args KerberosArguments) Convert() kafkaexporter.KerberosConfig {
	return kafkaexporter.KerberosConfig{
		ServiceName: args.ServiceName,
		Realm:       args.Realm,
		UseKeyTab:   args.UseKeyTab,
		Username:    args.Username,
		Password:    string(args.Password),
		ConfigPath:  args.ConfigPath,
		KeyTabPath:  args.KeyTabPath,
	}
}

// MetadataArguments configures how the otelcol.receiver.kafka component will
// retrieve metadata from the Kafka broker.
type MetadataArguments struct {
	IncludeAllTopics bool                   `river:"include_all_topics,attr,optional"`
	Retry            MetadataRetryArguments `river:"retry,block,optional"`
}

// Convert converts args into the upstream type.
func (args MetadataArguments) Convert() kafkaexporter.Metadata {
	return kafkaexporter.Metadata{
		Full:  args.IncludeAllTopics,
		Retry: args.Retry.Convert(),
	}
}

// MetadataRetryArguments configures how to retry retrieving metadata from the
// Kafka broker. Retrying is useful to avoid race conditions when the Kafka
// broker is starting at the same time as the otelcol.receiver.kafka component.
type MetadataRetryArguments struct {
	MaxRetries int           `river:"max_retries,attr,optional"`
	Backoff    time.Duration `river:"backoff,attr,optional"`
}

// Convert converts args into the upstream type.
func (args MetadataRetryArguments) Convert() kafkaexporter.MetadataRetry {
	return kafkaexporter.MetadataRetry{
		Max:     args.MaxRetries,
		Backoff: args.Backoff,
	}
}
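
// A sketch that overrides the default retry behavior (3 retries, 250ms
// backoff); the values are illustrative, and the duration string assumes
// River's usual Go-style duration syntax:
//
//	metadata {
//		include_all_topics = true
//
//		retry {
//			max_retries = 5
//			backoff     = "500ms"
//		}
//	}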

// AutoCommitArguments configures how to automatically commit updated topic
// offsets back to the Kafka broker.
type AutoCommitArguments struct {
	Enable   bool          `river:"enable,attr,optional"`
	Interval time.Duration `river:"interval,attr,optional"`
}

// Convert converts args into the upstream type.
func (args AutoCommitArguments) Convert() kafkareceiver.AutoCommit {
	return kafkareceiver.AutoCommit{
		Enable:   args.Enable,
		Interval: args.Interval,
	}
}

// MessageMarkingArguments configures when Kafka messages are marked as read.
type MessageMarkingArguments struct {
	AfterExecution      bool `river:"after_execution,attr,optional"`
	IncludeUnsuccessful bool `river:"include_unsuccessful,attr,optional"`
}

// Convert converts args into the upstream type.
func (args MessageMarkingArguments) Convert() kafkareceiver.MessageMarking {
	return kafkareceiver.MessageMarking{
		After:   args.AfterExecution,
		OnError: args.IncludeUnsuccessful,
	}
}
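
// A sketch combining the two offset-handling blocks defined above; the values
// are illustrative, not recommendations, and the interval string assumes
// River's usual Go-style duration syntax:
//
//	autocommit {
//		enable   = true
//		interval = "5s"
//	}
//
//	message_marking {
//		after_execution      = true
//		include_unsuccessful = false
//	}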