Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/config.go
4096 views
1
package kafkatarget
2
3
import (
4
"github.com/Shopify/sarama"
5
"github.com/grafana/agent/component/common/loki"
6
"github.com/grafana/dskit/flagext"
7
promconfig "github.com/prometheus/common/config"
8
"github.com/prometheus/common/model"
9
"github.com/prometheus/prometheus/model/relabel"
10
)
11
12
// Config describes a job to scrape.
13
type Config struct {
14
KafkaConfig TargetConfig `mapstructure:"kafka,omitempty" yaml:"kafka,omitempty"`
15
RelabelConfigs []*relabel.Config `mapstructure:"relabel_configs,omitempty" yaml:"relabel_configs,omitempty"`
16
// List of Docker service discovery configurations.
17
}
18
19
type TargetConfig struct {
20
// Labels optionally holds labels to associate with each log line.
21
Labels model.LabelSet `yaml:"labels"`
22
23
// UseIncomingTimestamp sets the timestamp to the incoming kafka messages
24
// timestamp if it's set.
25
UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`
26
27
// The list of brokers to connect to kafka (Required).
28
Brokers []string `yaml:"brokers"`
29
30
// The consumer group id (Required).
31
GroupID string `yaml:"group_id"`
32
33
// Kafka Topics to consume (Required).
34
Topics []string `yaml:"topics"`
35
36
// Kafka version. Default to 2.2.1
37
Version string `yaml:"version"`
38
39
// Rebalancing strategy to use. (e.g. sticky, roundrobin or range)
40
Assignor string `yaml:"assignor"`
41
42
// Authentication strategy with Kafka brokers
43
Authentication Authentication `yaml:"authentication"`
44
45
MessageParser MessageParser
46
}
47
48
// AuthenticationType specifies method to authenticate with Kafka brokers
49
type AuthenticationType string
50
51
const (
52
// AuthenticationTypeNone represents using no authentication
53
AuthenticationTypeNone = "none"
54
// AuthenticationTypeSSL represents using SSL/TLS to authenticate
55
AuthenticationTypeSSL = "ssl"
56
// AuthenticationTypeSASL represents using SASL to authenticate
57
AuthenticationTypeSASL = "sasl"
58
)
59
60
// Authentication describe the configuration for authentication with Kafka brokers
61
type Authentication struct {
62
// Type is authentication type
63
// Possible values: none, sasl and ssl (defaults to none).
64
Type AuthenticationType `yaml:"type"`
65
66
// TLSConfig is used for TLS encryption and authentication with Kafka brokers
67
TLSConfig promconfig.TLSConfig `yaml:"tls_config,omitempty"`
68
69
// SASLConfig is used for SASL authentication with Kafka brokers
70
SASLConfig SASLConfig `yaml:"sasl_config,omitempty"`
71
}
72
73
// TokenProviderType specifies the provider used for resolving the access token
74
type TokenProviderType string
75
76
const (
77
// TokenProviderTypeAzure represents using the Azure as the token provider
78
TokenProviderTypeAzure TokenProviderType = "azure"
79
)
80
81
// KafkaSASLConfig describe the SASL configuration for authentication with Kafka brokers
82
type SASLConfig struct {
83
// SASL mechanism. Supports PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
84
Mechanism sarama.SASLMechanism `yaml:"mechanism"`
85
86
// SASL Username
87
User string `yaml:"user"`
88
89
// SASL Password for the User
90
Password flagext.Secret `yaml:"password"`
91
92
// UseTLS sets whether TLS is used with SASL
93
UseTLS bool `yaml:"use_tls"`
94
95
// TLSConfig is used for SASL over TLS. It is used only when UseTLS is true
96
TLSConfig promconfig.TLSConfig `yaml:",inline"`
97
98
// OAuthConfig is used for configuring the token provider
99
OAuthConfig OAuthConfig `yaml:"oauth_provider_config,omitempty"`
100
}
101
102
type OAuthConfig struct {
103
// TokenProvider is used for resolving the OAuth access token
104
TokenProvider TokenProviderType `yaml:"token_provider,omitempty"`
105
106
Scopes []string
107
}
108
109
// MessageParser defines parsing for each incoming message
110
type MessageParser interface {
111
Parse(message *sarama.ConsumerMessage, labels model.LabelSet, relabels []*relabel.Config, useIncomingTimestamp bool) ([]loki.Entry, error)
112
}
113
114