Path: blob/main/component/otelcol/receiver/kafka/kafka.go
// Package kafka provides an otelcol.receiver.kafka component.
package kafka

import (
	"time"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/otelcol"
	"github.com/grafana/agent/component/otelcol/receiver"
	"github.com/grafana/agent/pkg/river"
	"github.com/grafana/agent/pkg/river/rivertypes"
	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
	otelcomponent "go.opentelemetry.io/collector/component"
	otelconfig "go.opentelemetry.io/collector/config"
)

func init() {
	component.Register(component.Registration{
		Name: "otelcol.receiver.kafka",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			fact := kafkareceiver.NewFactory()
			return receiver.New(opts, fact, args.(Arguments))
		},
	})
}

// Arguments configures the otelcol.receiver.kafka component.
type Arguments struct {
	Brokers         []string `river:"brokers,attr"`
	ProtocolVersion string   `river:"protocol_version,attr"`
	Topic           string   `river:"topic,attr,optional"`
	Encoding        string   `river:"encoding,attr,optional"`
	GroupID         string   `river:"group_id,attr,optional"`
	ClientID        string   `river:"client_id,attr,optional"`

	Authentication AuthenticationArguments `river:"authentication,block,optional"`
	Metadata       MetadataArguments       `river:"metadata,block,optional"`
	AutoCommit     AutoCommitArguments     `river:"autocommit,block,optional"`
	MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`

	// Output configures where to send received data. Required.
	Output *otelcol.ConsumerArguments `river:"output,block"`
}

var (
	_ river.Unmarshaler  = (*Arguments)(nil)
	_ receiver.Arguments = Arguments{}
)

// DefaultArguments holds default values for Arguments.
var DefaultArguments = Arguments{
	// We use the defaults from the upstream OpenTelemetry Collector component
	// for compatibility, even though that means using a client and group ID of
	// "otel-collector".

	Topic:    "otlp_spans",
	Encoding: "otlp_proto",
	Brokers:  []string{"localhost:9092"},
	ClientID: "otel-collector",
	GroupID:  "otel-collector",
	Metadata: MetadataArguments{
		IncludeAllTopics: true,
		Retry: MetadataRetryArguments{
			MaxRetries: 3,
			Backoff:    250 * time.Millisecond,
		},
	},
	AutoCommit: AutoCommitArguments{
		Enable:   true,
		Interval: time.Second,
	},
	MessageMarking: MessageMarkingArguments{
		AfterExecution:      false,
		IncludeUnsuccessful: false,
	},
}

// UnmarshalRiver implements river.Unmarshaler and applies default settings.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
	*args = DefaultArguments

	type arguments Arguments
	return f((*arguments)(args))
}

// Convert implements receiver.Arguments.
func (args Arguments) Convert() (otelconfig.Receiver, error) {
	return &kafkareceiver.Config{
		ReceiverSettings: otelconfig.NewReceiverSettings(otelconfig.NewComponentID("kafka")),

		Brokers:         args.Brokers,
		ProtocolVersion: args.ProtocolVersion,
		Topic:           args.Topic,
		Encoding:        args.Encoding,
		GroupID:         args.GroupID,
		ClientID:        args.ClientID,

		Authentication: args.Authentication.Convert(),
		Metadata:       args.Metadata.Convert(),
		AutoCommit:     args.AutoCommit.Convert(),
		MessageMarking: args.MessageMarking.Convert(),
	}, nil
}
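// For reference, a minimal River block wiring up this component looks like
// the following (a sketch: the broker address, protocol version, and the
// downstream otelcol.exporter.otlp component are placeholders, not defaults
// enforced by this file):
//
//	otelcol.receiver.kafka "default" {
//		brokers          = ["localhost:9092"]
//		protocol_version = "2.0.0"
//
//		output {
//			traces = [otelcol.exporter.otlp.default.input]
//		}
//	}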
// Extensions implements receiver.Arguments.
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
	return nil
}

// Exporters implements receiver.Arguments.
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
	return nil
}

// NextConsumers implements receiver.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
	return args.Output
}

// AuthenticationArguments configures how to authenticate to the Kafka broker.
type AuthenticationArguments struct {
	Plaintext *PlaintextArguments         `river:"plaintext,block,optional"`
	SASL      *SASLArguments              `river:"sasl,block,optional"`
	TLS       *otelcol.TLSClientArguments `river:"tls,block,optional"`
	Kerberos  *KerberosArguments          `river:"kerberos,block,optional"`
}

// Convert converts args into the upstream type.
func (args AuthenticationArguments) Convert() kafkaexporter.Authentication {
	var res kafkaexporter.Authentication

	if args.Plaintext != nil {
		conv := args.Plaintext.Convert()
		res.PlainText = &conv
	}
	if args.SASL != nil {
		conv := args.SASL.Convert()
		res.SASL = &conv
	}
	if args.TLS != nil {
		res.TLS = args.TLS.Convert()
	}
	if args.Kerberos != nil {
		conv := args.Kerberos.Convert()
		res.Kerberos = &conv
	}

	return res
}

// PlaintextArguments configures plaintext authentication against the Kafka
// broker.
type PlaintextArguments struct {
	Username string            `river:"username,attr"`
	Password rivertypes.Secret `river:"password,attr"`
}

// Convert converts args into the upstream type.
func (args PlaintextArguments) Convert() kafkaexporter.PlainTextConfig {
	return kafkaexporter.PlainTextConfig{
		Username: args.Username,
		Password: string(args.Password),
	}
}

// SASLArguments configures SASL authentication against the Kafka broker.
type SASLArguments struct {
	Username  string            `river:"username,attr"`
	Password  rivertypes.Secret `river:"password,attr"`
	Mechanism string            `river:"mechanism,attr"`
	AWSMSK    AWSMSKArguments   `river:"aws_msk,block,optional"`
}

// Convert converts args into the upstream type.
func (args SASLArguments) Convert() kafkaexporter.SASLConfig {
	return kafkaexporter.SASLConfig{
		Username:  args.Username,
		Password:  string(args.Password),
		Mechanism: args.Mechanism,
		AWSMSK:    args.AWSMSK.Convert(),
	}
}

// AWSMSKArguments exposes additional SASL authentication measures required to
// use the AWS_MSK_IAM mechanism.
type AWSMSKArguments struct {
	Region     string `river:"region,attr"`
	BrokerAddr string `river:"broker_addr,attr"`
}

// Convert converts args into the upstream type.
func (args AWSMSKArguments) Convert() kafkaexporter.AWSMSKConfig {
	return kafkaexporter.AWSMSKConfig{
		Region:     args.Region,
		BrokerAddr: args.BrokerAddr,
	}
}
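// As an illustration, SASL authentication is configured in River like so
// (a sketch: the credentials and the mechanism value are placeholders):
//
//	authentication {
//		sasl {
//			username  = "example-user"
//			password  = "example-password"
//			mechanism = "SCRAM-SHA-512"
//		}
//	}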
// KerberosArguments configures Kerberos authentication against the Kafka
// broker.
type KerberosArguments struct {
	ServiceName string            `river:"service_name,attr,optional"`
	Realm       string            `river:"realm,attr,optional"`
	UseKeyTab   bool              `river:"use_keytab,attr,optional"`
	Username    string            `river:"username,attr"`
	Password    rivertypes.Secret `river:"password,attr,optional"`
	ConfigPath  string            `river:"config_file,attr,optional"`
	KeyTabPath  string            `river:"keytab_file,attr,optional"`
}

// Convert converts args into the upstream type.
func (args KerberosArguments) Convert() kafkaexporter.KerberosConfig {
	return kafkaexporter.KerberosConfig{
		ServiceName: args.ServiceName,
		Realm:       args.Realm,
		UseKeyTab:   args.UseKeyTab,
		Username:    args.Username,
		Password:    string(args.Password),
		ConfigPath:  args.ConfigPath,
		KeyTabPath:  args.KeyTabPath,
	}
}

// MetadataArguments configures how the otelcol.receiver.kafka component will
// retrieve metadata from the Kafka broker.
type MetadataArguments struct {
	IncludeAllTopics bool                   `river:"include_all_topics,attr,optional"`
	Retry            MetadataRetryArguments `river:"retry,block,optional"`
}

// Convert converts args into the upstream type.
func (args MetadataArguments) Convert() kafkaexporter.Metadata {
	return kafkaexporter.Metadata{
		Full:  args.IncludeAllTopics,
		Retry: args.Retry.Convert(),
	}
}

// MetadataRetryArguments configures how to retry retrieving metadata from the
// Kafka broker. Retrying is useful to avoid race conditions when the Kafka
// broker is starting at the same time as the otelcol.receiver.kafka component.
type MetadataRetryArguments struct {
	MaxRetries int           `river:"max_retries,attr,optional"`
	Backoff    time.Duration `river:"backoff,attr,optional"`
}

// Convert converts args into the upstream type.
func (args MetadataRetryArguments) Convert() kafkaexporter.MetadataRetry {
	return kafkaexporter.MetadataRetry{
		Max:     args.MaxRetries,
		Backoff: args.Backoff,
	}
}

// AutoCommitArguments configures how to automatically commit updated topic
// offsets back to the Kafka broker.
type AutoCommitArguments struct {
	Enable   bool          `river:"enable,attr,optional"`
	Interval time.Duration `river:"interval,attr,optional"`
}

// Convert converts args into the upstream type.
func (args AutoCommitArguments) Convert() kafkareceiver.AutoCommit {
	return kafkareceiver.AutoCommit{
		Enable:   args.Enable,
		Interval: args.Interval,
	}
}

// MessageMarkingArguments configures when Kafka messages are marked as read.
type MessageMarkingArguments struct {
	AfterExecution      bool `river:"after_execution,attr,optional"`
	IncludeUnsuccessful bool `river:"include_unsuccessful,attr,optional"`
}

// Convert converts args into the upstream type.
func (args MessageMarkingArguments) Convert() kafkareceiver.MessageMarking {
	return kafkareceiver.MessageMarking{
		After:   args.AfterExecution,
		OnError: args.IncludeUnsuccessful,
	}
}
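// As an illustration, the offset-handling blocks above are configured in
// River like so (a sketch with placeholder values; the duration is written in
// River's string form):
//
//	autocommit {
//		enable   = true
//		interval = "1s"
//	}
//
//	message_marking {
//		after_execution      = true
//		include_unsuccessful = false
//	}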