Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/azure_event_hubs/azure_event_hubs.go
4096 views
1
package azure_event_hubs
2
3
import (
4
"context"
5
"fmt"
6
"net"
7
"sync"
8
9
"github.com/Shopify/sarama"
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/component"
12
"github.com/grafana/agent/component/common/loki"
13
flow_relabel "github.com/grafana/agent/component/common/relabel"
14
"github.com/grafana/agent/component/loki/source/azure_event_hubs/internal/parser"
15
kt "github.com/grafana/agent/component/loki/source/internal/kafkatarget"
16
"github.com/grafana/dskit/flagext"
17
18
"github.com/prometheus/common/model"
19
)
20
21
func init() {
22
component.Register(component.Registration{
23
Name: "loki.source.azure_event_hubs",
24
Args: Arguments{},
25
26
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
27
return New(opts, args.(Arguments))
28
},
29
})
30
}
31
32
// Arguments holds values which are used to configure the loki.source.azure_event_hubs component.
33
type Arguments struct {
34
FullyQualifiedNamespace string `river:"fully_qualified_namespace,attr"`
35
EventHubs []string `river:"event_hubs,attr"`
36
37
Authentication AzureEventHubsAuthentication `river:"authentication,block"`
38
39
GroupID string `river:"group_id,attr,optional"`
40
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
41
DisallowCustomMessages bool `river:"disallow_custom_messages,attr,optional"`
42
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
43
Labels map[string]string `river:"labels,attr,optional"`
44
Assignor string `river:"assignor,attr,optional"`
45
46
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
47
}
48
49
// AzureEventHubsAuthentication describe the configuration for authentication with Azure Event Hub
50
type AzureEventHubsAuthentication struct {
51
Mechanism string `river:"mechanism,attr"`
52
Scopes []string `river:"scopes,attr,optional"`
53
ConnectionString string `river:"connection_string,attr,optional"`
54
}
55
56
func getDefault() Arguments {
57
return Arguments{
58
GroupID: "loki.source.azure_event_hubs",
59
Labels: map[string]string{"job": "loki.source.azure_event_hubs"},
60
Assignor: "range",
61
}
62
}
63
64
// UnmarshalRiver implements river.Unmarshaler.
65
func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {
66
*a = getDefault()
67
type arguments Arguments
68
if err := f((*arguments)(a)); err != nil {
69
return err
70
}
71
return a.validateAssignor()
72
}
73
74
// New creates a new loki.source.azure_event_hubs component.
75
func New(o component.Options, args Arguments) (*Component, error) {
76
c := &Component{
77
mut: sync.RWMutex{},
78
opts: o,
79
handler: make(loki.LogsReceiver),
80
fanout: args.ForwardTo,
81
}
82
83
// Call to Update() to start readers and set receivers once at the start.
84
if err := c.Update(args); err != nil {
85
return nil, err
86
}
87
88
return c, nil
89
}
90
91
// Component implements the loki.source.azure_event_hubs component.
92
type Component struct {
93
opts component.Options
94
mut sync.RWMutex
95
fanout []loki.LogsReceiver
96
handler loki.LogsReceiver
97
target *kt.TargetSyncer
98
}
99
100
// Run implements component.Component.
101
func (c *Component) Run(ctx context.Context) error {
102
defer func() {
103
level.Info(c.opts.Logger).Log("msg", "loki.source.azure_event_hubs component shutting down, stopping the targets")
104
c.mut.RLock()
105
err := c.target.Stop()
106
if err != nil {
107
level.Error(c.opts.Logger).Log("msg", "error while stopping azure_event_hubs target", "err", err)
108
}
109
c.mut.RUnlock()
110
}()
111
112
for {
113
select {
114
case <-ctx.Done():
115
return nil
116
case entry := <-c.handler:
117
c.mut.RLock()
118
for _, receiver := range c.fanout {
119
receiver <- entry
120
}
121
c.mut.RUnlock()
122
}
123
}
124
}
125
126
const (
127
AuthenticationMechanismConnectionString = "connection_string"
128
AuthenticationMechanismOAuth = "oauth"
129
)
130
131
// Update implements component.Component.
132
func (c *Component) Update(args component.Arguments) error {
133
c.mut.Lock()
134
defer c.mut.Unlock()
135
136
newArgs := args.(Arguments)
137
c.fanout = newArgs.ForwardTo
138
139
cfg, err := newArgs.Convert()
140
if err != nil {
141
return err
142
}
143
144
entryHandler := loki.NewEntryHandler(c.handler, func() {})
145
t, err := kt.NewSyncer(c.opts.Registerer, c.opts.Logger, cfg, entryHandler, &parser.AzureEventHubsTargetMessageParser{
146
DisallowCustomMessages: newArgs.DisallowCustomMessages,
147
})
148
if err != nil {
149
return fmt.Errorf("error starting azure_event_hubs target: %w", err)
150
}
151
c.target = t
152
153
return nil
154
}
155
156
// Convert is used to bridge between the River and Promtail types.
157
func (a *Arguments) Convert() (kt.Config, error) {
158
lbls := make(model.LabelSet, len(a.Labels))
159
for k, v := range a.Labels {
160
lbls[model.LabelName(k)] = model.LabelValue(v)
161
}
162
163
cfg := kt.Config{
164
RelabelConfigs: flow_relabel.ComponentToPromRelabelConfigs(a.RelabelRules),
165
KafkaConfig: kt.TargetConfig{
166
Brokers: []string{a.FullyQualifiedNamespace},
167
Topics: a.EventHubs,
168
Labels: lbls,
169
UseIncomingTimestamp: a.UseIncomingTimestamp,
170
GroupID: a.GroupID,
171
Version: sarama.V1_0_0_0.String(),
172
Assignor: a.Assignor,
173
},
174
}
175
switch a.Authentication.Mechanism {
176
case AuthenticationMechanismConnectionString:
177
if a.Authentication.ConnectionString == "" {
178
return kt.Config{}, fmt.Errorf("connection string is required when authentication mechanism is %s", a.Authentication.Mechanism)
179
}
180
cfg.KafkaConfig.Authentication = kt.Authentication{
181
Type: kt.AuthenticationTypeSASL,
182
SASLConfig: kt.SASLConfig{
183
UseTLS: true,
184
User: "$ConnectionString",
185
Password: flagext.SecretWithValue(a.Authentication.ConnectionString),
186
Mechanism: sarama.SASLTypePlaintext,
187
},
188
}
189
case AuthenticationMechanismOAuth:
190
if a.Authentication.Scopes == nil {
191
host, _, err := net.SplitHostPort(a.FullyQualifiedNamespace)
192
if err != nil {
193
return kt.Config{}, fmt.Errorf("unable to extract host from fully qualified namespace: %w", err)
194
}
195
a.Authentication.Scopes = []string{fmt.Sprintf("https://%s", host)}
196
}
197
198
cfg.KafkaConfig.Authentication = kt.Authentication{
199
Type: kt.AuthenticationTypeSASL,
200
SASLConfig: kt.SASLConfig{
201
UseTLS: true,
202
Mechanism: sarama.SASLTypeOAuth,
203
OAuthConfig: kt.OAuthConfig{
204
TokenProvider: kt.TokenProviderTypeAzure,
205
Scopes: a.Authentication.Scopes,
206
},
207
},
208
}
209
default:
210
return kt.Config{}, fmt.Errorf("authentication mechanism %s is unsupported", a.Authentication.Mechanism)
211
}
212
return cfg, nil
213
}
214
215
func (a *Arguments) validateAssignor() error {
216
validAssignors := []string{sarama.StickyBalanceStrategyName, sarama.RoundRobinBalanceStrategyName, sarama.RangeBalanceStrategyName}
217
for _, validAssignor := range validAssignors {
218
if a.Assignor == validAssignor {
219
return nil
220
}
221
}
222
return fmt.Errorf("assignor value %s is invalid, must be one of: %v", a.Assignor, validAssignors)
223
}
224
225