Path: blob/main/component/loki/source/azure_event_hubs/azure_event_hubs.go
4096 views
package azure_event_hubs12import (3"context"4"fmt"5"net"6"sync"78"github.com/Shopify/sarama"9"github.com/go-kit/log/level"10"github.com/grafana/agent/component"11"github.com/grafana/agent/component/common/loki"12flow_relabel "github.com/grafana/agent/component/common/relabel"13"github.com/grafana/agent/component/loki/source/azure_event_hubs/internal/parser"14kt "github.com/grafana/agent/component/loki/source/internal/kafkatarget"15"github.com/grafana/dskit/flagext"1617"github.com/prometheus/common/model"18)1920func init() {21component.Register(component.Registration{22Name: "loki.source.azure_event_hubs",23Args: Arguments{},2425Build: func(opts component.Options, args component.Arguments) (component.Component, error) {26return New(opts, args.(Arguments))27},28})29}3031// Arguments holds values which are used to configure the loki.source.azure_event_hubs component.32type Arguments struct {33FullyQualifiedNamespace string `river:"fully_qualified_namespace,attr"`34EventHubs []string `river:"event_hubs,attr"`3536Authentication AzureEventHubsAuthentication `river:"authentication,block"`3738GroupID string `river:"group_id,attr,optional"`39UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`40DisallowCustomMessages bool `river:"disallow_custom_messages,attr,optional"`41RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`42Labels map[string]string `river:"labels,attr,optional"`43Assignor string `river:"assignor,attr,optional"`4445ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`46}4748// AzureEventHubsAuthentication describe the configuration for authentication with Azure Event Hub49type AzureEventHubsAuthentication struct {50Mechanism string `river:"mechanism,attr"`51Scopes []string `river:"scopes,attr,optional"`52ConnectionString string `river:"connection_string,attr,optional"`53}5455func getDefault() Arguments {56return Arguments{57GroupID: "loki.source.azure_event_hubs",58Labels: map[string]string{"job": "loki.source.azure_event_hubs"},59Assignor: "range",60}61}6263// UnmarshalRiver implements river.Unmarshaler.64func (a *Arguments) UnmarshalRiver(f func(interface{}) error) error {65*a = getDefault()66type arguments Arguments67if err := f((*arguments)(a)); err != nil {68return err69}70return a.validateAssignor()71}7273// New creates a new loki.source.azure_event_hubs component.74func New(o component.Options, args Arguments) (*Component, error) {75c := &Component{76mut: sync.RWMutex{},77opts: o,78handler: make(loki.LogsReceiver),79fanout: args.ForwardTo,80}8182// Call to Update() to start readers and set receivers once at the start.83if err := c.Update(args); err != nil {84return nil, err85}8687return c, nil88}8990// Component implements the loki.source.azure_event_hubs component.91type Component struct {92opts component.Options93mut sync.RWMutex94fanout []loki.LogsReceiver95handler loki.LogsReceiver96target *kt.TargetSyncer97}9899// Run implements component.Component.100func (c *Component) Run(ctx context.Context) error {101defer func() {102level.Info(c.opts.Logger).Log("msg", "loki.source.azure_event_hubs component shutting down, stopping the targets")103c.mut.RLock()104err := c.target.Stop()105if err != nil {106level.Error(c.opts.Logger).Log("msg", "error while stopping azure_event_hubs target", "err", err)107}108c.mut.RUnlock()109}()110111for {112select {113case <-ctx.Done():114return nil115case entry := <-c.handler:116c.mut.RLock()117for _, receiver := range c.fanout {118receiver <- entry119}120c.mut.RUnlock()121}122}123}124125const (126AuthenticationMechanismConnectionString = "connection_string"127AuthenticationMechanismOAuth = "oauth"128)129130// Update implements component.Component.131func (c *Component) Update(args component.Arguments) error {132c.mut.Lock()133defer c.mut.Unlock()134135newArgs := args.(Arguments)136c.fanout = newArgs.ForwardTo137138cfg, err := newArgs.Convert()139if err != nil {140return err141}142143entryHandler := loki.NewEntryHandler(c.handler, func() {})144t, err := kt.NewSyncer(c.opts.Registerer, c.opts.Logger, cfg, entryHandler, &parser.AzureEventHubsTargetMessageParser{145DisallowCustomMessages: newArgs.DisallowCustomMessages,146})147if err != nil {148return fmt.Errorf("error starting azure_event_hubs target: %w", err)149}150c.target = t151152return nil153}154155// Convert is used to bridge between the River and Promtail types.156func (a *Arguments) Convert() (kt.Config, error) {157lbls := make(model.LabelSet, len(a.Labels))158for k, v := range a.Labels {159lbls[model.LabelName(k)] = model.LabelValue(v)160}161162cfg := kt.Config{163RelabelConfigs: flow_relabel.ComponentToPromRelabelConfigs(a.RelabelRules),164KafkaConfig: kt.TargetConfig{165Brokers: []string{a.FullyQualifiedNamespace},166Topics: a.EventHubs,167Labels: lbls,168UseIncomingTimestamp: a.UseIncomingTimestamp,169GroupID: a.GroupID,170Version: sarama.V1_0_0_0.String(),171Assignor: a.Assignor,172},173}174switch a.Authentication.Mechanism {175case AuthenticationMechanismConnectionString:176if a.Authentication.ConnectionString == "" {177return kt.Config{}, fmt.Errorf("connection string is required when authentication mechanism is %s", a.Authentication.Mechanism)178}179cfg.KafkaConfig.Authentication = kt.Authentication{180Type: kt.AuthenticationTypeSASL,181SASLConfig: kt.SASLConfig{182UseTLS: true,183User: "$ConnectionString",184Password: flagext.SecretWithValue(a.Authentication.ConnectionString),185Mechanism: sarama.SASLTypePlaintext,186},187}188case AuthenticationMechanismOAuth:189if a.Authentication.Scopes == nil {190host, _, err := net.SplitHostPort(a.FullyQualifiedNamespace)191if err != nil {192return kt.Config{}, fmt.Errorf("unable to extract host from fully qualified namespace: %w", err)193}194a.Authentication.Scopes = []string{fmt.Sprintf("https://%s", host)}195}196197cfg.KafkaConfig.Authentication = kt.Authentication{198Type: kt.AuthenticationTypeSASL,199SASLConfig: kt.SASLConfig{200UseTLS: true,201Mechanism: sarama.SASLTypeOAuth,202OAuthConfig: kt.OAuthConfig{203TokenProvider: kt.TokenProviderTypeAzure,204Scopes: a.Authentication.Scopes,205},206},207}208default:209return kt.Config{}, fmt.Errorf("authentication mechanism %s is unsupported", a.Authentication.Mechanism)210}211return cfg, nil212}213214func (a *Arguments) validateAssignor() error {215validAssignors := []string{sarama.StickyBalanceStrategyName, sarama.RoundRobinBalanceStrategyName, sarama.RangeBalanceStrategyName}216for _, validAssignor := range validAssignors {217if a.Assignor == validAssignor {218return nil219}220}221return fmt.Errorf("assignor value %s is invalid, must be one of: %v", a.Assignor, validAssignors)222}223224225