Path: blob/main/component/loki/source/internal/kafkatarget/target_syncer_test.go
4096 views
package kafkatarget12import (3"context"4"fmt"5"reflect"6"testing"7"time"89"github.com/grafana/agent/component/common/loki/client/fake"1011"github.com/grafana/dskit/flagext"12"github.com/prometheus/common/config"1314"github.com/Shopify/sarama"15"github.com/go-kit/log"16"github.com/prometheus/client_golang/prometheus"17"github.com/prometheus/common/model"18"github.com/prometheus/prometheus/model/relabel"19"github.com/stretchr/testify/assert"20"github.com/stretchr/testify/require"21)2223func Test_TopicDiscovery(t *testing.T) {24ctx, cancel := context.WithCancel(context.Background())25group := &testConsumerGroupHandler{}26TopicPollInterval = time.Microsecond27var closed bool28client := &mockKafkaClient{29topics: []string{"topic1"},30}31ts := &TargetSyncer{32ctx: ctx,33cancel: cancel,34logger: log.NewNopLogger(),35reg: prometheus.DefaultRegisterer,36topicManager: mustNewTopicsManager(client, []string{"topic1", "topic2"}),37close: func() error {38closed = true39return nil40},41consumer: consumer{42ctx: context.Background(),43cancel: func() {},44ConsumerGroup: group,45logger: log.NewNopLogger(),46discoverer: DiscovererFn(func(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) (RunnableTarget, error) {47return nil, nil48}),49},50cfg: Config{51RelabelConfigs: []*relabel.Config{},52KafkaConfig: TargetConfig{53UseIncomingTimestamp: true,54Topics: []string{"topic1", "topic2"},55},56},57}5859ts.loop()6061require.Eventually(t, func() bool {62group.mut.Lock()63if !group.consuming.Load() {64return false65}66group.mut.Unlock()67return reflect.DeepEqual([]string{"topic1"}, group.GetTopics())68}, time.Second, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1"}, group.GetTopics())6970client.UpdateTopics([]string{"topic1", "topic2"})7172require.Eventually(t, func() bool {73group.mut.Lock()74if !group.consuming.Load() {75return false76}77group.mut.Unlock()78return reflect.DeepEqual([]string{"topic1", "topic2"}, group.GetTopics())79}, time.Second, time.Millisecond, "expected topics: %v, got: %v", []string{"topic1", "topic2"}, group.GetTopics())8081require.NoError(t, ts.Stop())82require.True(t, closed)83}8485func Test_NewTarget(t *testing.T) {86ts := &TargetSyncer{87logger: log.NewNopLogger(),88reg: prometheus.DefaultRegisterer,89client: fake.NewClient(func() {}),90cfg: Config{91RelabelConfigs: []*relabel.Config{92{93SourceLabels: model.LabelNames{"__meta_kafka_topic"},94TargetLabel: "topic",95Replacement: "$1",96Action: relabel.Replace,97Regex: relabel.MustNewRegexp("(.*)"),98},99},100KafkaConfig: TargetConfig{101UseIncomingTimestamp: true,102GroupID: "group_1",103Topics: []string{"topic1", "topic2"},104Labels: model.LabelSet{"static": "static1"},105},106},107}108tg, err := ts.NewTarget(&testSession{}, newTestClaim("foo", 10, 1))109110require.NoError(t, err)111require.Equal(t, ConsumerDetails{112MemberID: "foo",113GenerationID: 10,114Topic: "foo",115Partition: 10,116InitialOffset: 1,117}, tg.Details())118require.Equal(t, model.LabelSet{"static": "static1", "topic": "foo"}, tg.Labels())119require.Equal(t, model.LabelSet{"__meta_kafka_member_id": "foo", "__meta_kafka_partition": "10", "__meta_kafka_topic": "foo", "__meta_kafka_group_id": "group_1"}, tg.DiscoveredLabels())120}121122func Test_NewDroppedTarget(t *testing.T) {123ts := &TargetSyncer{124logger: log.NewNopLogger(),125reg: prometheus.DefaultRegisterer,126cfg: Config{127KafkaConfig: TargetConfig{128UseIncomingTimestamp: true,129GroupID: "group1",130Topics: []string{"topic1", "topic2"},131},132},133}134tg, err := ts.NewTarget(&testSession{}, newTestClaim("foo", 10, 1))135136require.NoError(t, err)137require.Equal(t, "dropping target, no labels", tg.Details())138require.Equal(t, model.LabelSet(nil), tg.Labels())139require.Equal(t, model.LabelSet{"__meta_kafka_member_id": "foo", "__meta_kafka_partition": "10", "__meta_kafka_topic": "foo", "__meta_kafka_group_id": "group1"}, tg.DiscoveredLabels())140}141142func Test_validateConfig(t *testing.T) {143tests := []struct {144cfg *Config145wantErr bool146expected *Config147}{148{149&Config{150KafkaConfig: TargetConfig{151GroupID: "foo",152Topics: []string{"bar"},153},154},155true,156nil,157},158{159&Config{160KafkaConfig: TargetConfig{161Brokers: []string{"foo"},162GroupID: "bar",163},164},165true,166nil,167},168{169&Config{170KafkaConfig: TargetConfig{171Brokers: []string{"foo"},172},173},174true,175nil,176},177{178&Config{179KafkaConfig: TargetConfig{180Brokers: []string{"foo"},181Topics: []string{"bar"},182},183},184false,185&Config{186KafkaConfig: TargetConfig{187Brokers: []string{"foo"},188Topics: []string{"bar"},189GroupID: "promtail",190Version: "2.1.1",191},192},193},194}195196for i, tt := range tests {197tt := tt198t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {199err := validateConfig(tt.cfg)200if (err != nil) != tt.wantErr {201t.Errorf("validateConfig() error = %v, wantErr %v", err, tt.wantErr)202}203if err == nil {204require.Equal(t, tt.expected, tt.cfg)205}206})207}208}209210func Test_withAuthentication(t *testing.T) {211var (212tlsConf = config.TLSConfig{213CAFile: "testdata/example.com.ca.pem",214CertFile: "testdata/example.com.pem",215KeyFile: "testdata/example.com-key.pem",216ServerName: "example.com",217InsecureSkipVerify: true,218}219expectedTLSConf, _ = config.NewTLSConfig(&config.TLSConfig{220CAFile: "testdata/example.com.ca.pem",221CertFile: "testdata/example.com.pem",222KeyFile: "testdata/example.com-key.pem",223ServerName: "example.com",224InsecureSkipVerify: true,225})226cfg = sarama.NewConfig()227)228229// no authentication230noAuthCfg, err := withAuthentication(*cfg, Authentication{231Type: AuthenticationTypeNone,232})233assert.Nil(t, err)234assert.Equal(t, false, noAuthCfg.Net.TLS.Enable)235assert.Equal(t, false, noAuthCfg.Net.SASL.Enable)236assert.NoError(t, noAuthCfg.Validate())237238// specify unsupported auth type239illegalAuthTypeCfg, err := withAuthentication(*cfg, Authentication{240Type: "illegal",241})242assert.NotNil(t, err)243assert.Nil(t, illegalAuthTypeCfg)244245// mTLS authentication246mTLSCfg, err := withAuthentication(*cfg, Authentication{247Type: AuthenticationTypeSSL,248TLSConfig: tlsConf,249})250assert.Nil(t, err)251assert.Equal(t, true, mTLSCfg.Net.TLS.Enable)252assert.NotNil(t, mTLSCfg.Net.TLS.Config)253assert.Equal(t, "example.com", mTLSCfg.Net.TLS.Config.ServerName)254assert.Equal(t, true, mTLSCfg.Net.TLS.Config.InsecureSkipVerify)255assert.Equal(t, expectedTLSConf.Certificates, mTLSCfg.Net.TLS.Config.Certificates)256assert.NotNil(t, mTLSCfg.Net.TLS.Config.RootCAs)257assert.NoError(t, mTLSCfg.Validate())258259// mTLS authentication expect ignore sasl260mTLSCfg, err = withAuthentication(*cfg, Authentication{261Type: AuthenticationTypeSSL,262TLSConfig: tlsConf,263SASLConfig: SASLConfig{264Mechanism: sarama.SASLTypeSCRAMSHA256,265User: "user",266Password: flagext.SecretWithValue("pass"),267UseTLS: false,268},269})270assert.Nil(t, err)271assert.Equal(t, false, mTLSCfg.Net.SASL.Enable)272273// SASL/PLAIN274saslCfg, err := withAuthentication(*cfg, Authentication{275Type: AuthenticationTypeSASL,276SASLConfig: SASLConfig{277Mechanism: sarama.SASLTypePlaintext,278User: "user",279Password: flagext.SecretWithValue("pass"),280},281})282assert.Nil(t, err)283assert.Equal(t, false, saslCfg.Net.TLS.Enable)284assert.Equal(t, true, saslCfg.Net.SASL.Enable)285assert.Equal(t, "user", saslCfg.Net.SASL.User)286assert.Equal(t, "pass", saslCfg.Net.SASL.Password)287assert.Equal(t, sarama.SASLTypePlaintext, string(saslCfg.Net.SASL.Mechanism))288assert.NoError(t, saslCfg.Validate())289290// SASL/SCRAM291saslCfg, err = withAuthentication(*cfg, Authentication{292Type: AuthenticationTypeSASL,293SASLConfig: SASLConfig{294Mechanism: sarama.SASLTypeSCRAMSHA512,295User: "user",296Password: flagext.SecretWithValue("pass"),297},298})299assert.Nil(t, err)300assert.Equal(t, false, saslCfg.Net.TLS.Enable)301assert.Equal(t, true, saslCfg.Net.SASL.Enable)302assert.Equal(t, "user", saslCfg.Net.SASL.User)303assert.Equal(t, "pass", saslCfg.Net.SASL.Password)304assert.Equal(t, sarama.SASLTypeSCRAMSHA512, string(saslCfg.Net.SASL.Mechanism))305assert.NoError(t, saslCfg.Validate())306307// SASL unsupported mechanism308_, err = withAuthentication(*cfg, Authentication{309Type: AuthenticationTypeSASL,310SASLConfig: SASLConfig{311Mechanism: sarama.SASLTypeGSSAPI,312User: "user",313Password: flagext.SecretWithValue("pass"),314},315})316assert.Error(t, err)317assert.Equal(t, err.Error(), "error unsupported sasl mechanism: GSSAPI")318319// SASL over TLS320saslCfg, err = withAuthentication(*cfg, Authentication{321Type: AuthenticationTypeSASL,322SASLConfig: SASLConfig{323Mechanism: sarama.SASLTypeSCRAMSHA512,324User: "user",325Password: flagext.SecretWithValue("pass"),326UseTLS: true,327TLSConfig: tlsConf,328},329})330assert.Nil(t, err)331assert.Equal(t, true, saslCfg.Net.TLS.Enable)332assert.Equal(t, true, saslCfg.Net.SASL.Enable)333assert.NotNil(t, saslCfg.Net.TLS.Config)334assert.Equal(t, "example.com", saslCfg.Net.TLS.Config.ServerName)335assert.Equal(t, true, saslCfg.Net.TLS.Config.InsecureSkipVerify)336assert.Equal(t, expectedTLSConf.Certificates, saslCfg.Net.TLS.Config.Certificates)337assert.NotNil(t, saslCfg.Net.TLS.Config.RootCAs)338assert.NoError(t, saslCfg.Validate())339}340341342