Path: blob/main/component/loki/source/internal/kafkatarget/kafkatarget_test.go
4096 views
package kafkatarget

// This code is copied from Promtail. The kafkatarget package is used to
// configure and run the targets that can read kafka entries and forward them
// to other loki components.

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/grafana/agent/component/common/loki/client/fake"

	"github.com/Shopify/sarama"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
)

// testConsumerGroupHandler is a consumer-group test double. It records the
// arguments of the most recent Consume call and reports, through consuming,
// whether a Consume call is currently blocked waiting on its context.
type testConsumerGroupHandler struct {
	handler sarama.ConsumerGroupHandler
	ctx     context.Context
	topics  []string

	// returnErr, when non-nil, is returned by Consume immediately.
	returnErr error

	consuming atomic.Bool
	// mut guards handler, ctx and topics.
	mut sync.RWMutex
}

// Consume returns c.returnErr right away when it is set. Otherwise it stores
// ctx, topics and handler under the write lock, marks the handler as consuming
// and blocks until ctx is done, after which it clears the flag and returns nil.
func (c *testConsumerGroupHandler) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
	if c.returnErr != nil {
		return c.returnErr
	}

	c.mut.Lock()

	c.ctx = ctx
	c.topics = topics
	c.handler = handler

	c.mut.Unlock()

	c.consuming.Store(true)
	<-ctx.Done()
	c.consuming.Store(false)
	return nil
}

// GetTopics returns the topics recorded by the last Consume call, read under
// the read lock. The returned slice is not copied.
func (c *testConsumerGroupHandler) GetTopics() []string {
	c.mut.RLock()
	defer c.mut.RUnlock()

	return c.topics
}

// Errors always returns a nil channel.
func (c *testConsumerGroupHandler) Errors() <-chan error {
	return nil
}

// Close is a no-op that always returns nil.
func (c *testConsumerGroupHandler) Close() error {
	return nil
}

// Pause is a no-op.
func (c *testConsumerGroupHandler) Pause(partitions map[string][]int32) {}

// Resume is a no-op.
func (c *testConsumerGroupHandler) Resume(partitions map[string][]int32) {}

// PauseAll is a no-op.
func (c *testConsumerGroupHandler) PauseAll() {}

// ResumeAll is a no-op.
func (c *testConsumerGroupHandler) ResumeAll() {}

// testSession is a consumer-group session test double that remembers every
// message passed to MarkMessage, in call order.
type testSession struct {
	markedMessage []*sarama.ConsumerMessage
}

// Claims always returns nil.
func (s *testSession) Claims() map[string][]int32 { return nil }

// MemberID returns the fixed value "foo".
func (s *testSession) MemberID() string { return "foo" }

// GenerationID returns the fixed value 10.
func (s *testSession) GenerationID() int32 { return 10 }

// MarkOffset is a no-op.
func (s *testSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {}

// Commit is a no-op.
func (s *testSession) Commit() {}

// ResetOffset is a no-op.
func (s *testSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {}

// MarkMessage appends msg to s.markedMessage; metadata is ignored.
func (s *testSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) {
	s.markedMessage = append(s.markedMessage, msg)
}

// Context returns context.Background().
func (s *testSession) Context() context.Context { return context.Background() }

// testClaim is a consumer-group claim test double whose messages are fed by
// the test through Send and terminated with Stop.
type testClaim struct {
	topic     string
	partition int32
	offset    int64
	messages  chan *sarama.ConsumerMessage
}

// newTestClaim builds a testClaim for the given topic, partition and initial
// offset. The message channel is unbuffered, so each Send blocks until a
// receiver takes the message.
func newTestClaim(topic string, partition int32, offset int64) *testClaim {
	return &testClaim{
		topic:     topic,
		partition: partition,
		offset:    offset,
		messages:  make(chan *sarama.ConsumerMessage),
	}
}

// Topic returns the topic the claim was created with.
func (t *testClaim) Topic() string { return t.topic }

// Partition returns the partition the claim was created with.
func (t *testClaim) Partition() int32 { return t.partition }

// InitialOffset returns the offset the claim was created with.
func (t *testClaim) InitialOffset() int64 { return t.offset }

// HighWaterMarkOffset always returns 0.
func (t *testClaim) HighWaterMarkOffset() int64 { return 0 }

// Messages exposes the receive side of the claim's message channel.
func (t *testClaim) Messages() <-chan *sarama.ConsumerMessage { return t.messages }

// Send pushes m onto the message channel, blocking until it is received.
func (t *testClaim) Send(m *sarama.ConsumerMessage) {
	t.messages <- m
}

// Stop closes the message channel. Calling Send after Stop panics.
func (t *testClaim) Stop() {
	close(t.messages)
}

// Test_TargetRun feeds ten messages through a Kafka target built with
// NewKafkaTarget and checks that every message is marked on the session, that
// every message reaches the fake client with the expected label set, and that
// the fake client's callback has fired by the time run returns.
func Test_TargetRun(t *testing.T) {
	tc := []struct {
		name           string
		inMessageKey   string
		inLS           model.LabelSet
		inDiscoveredLS model.LabelSet
		relabels       []*relabel.Config
		expectedLS     model.LabelSet
	}{
		{
			name:           "no relabel config",
			inMessageKey:   "foo",
			inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
			inLS:           model.LabelSet{"buzz": "bazz"},
			relabels:       nil,
			expectedLS:     model.LabelSet{"buzz": "bazz"},
		},
		{
			name:           "message key with relabel config",
			inMessageKey:   "foo",
			inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
			inLS:           model.LabelSet{"buzz": "bazz"},
			relabels: []*relabel.Config{
				{
					SourceLabels: model.LabelNames{"__meta_kafka_message_key"},
					Regex:        relabel.MustNewRegexp("(.*)"),
					TargetLabel:  "message_key",
					Replacement:  "$1",
					Action:       "replace",
				},
			},
			expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "foo"},
		},
		{
			// NOTE(review): the expected "none" value presumably comes from the
			// target substituting a placeholder for an empty message key;
			// confirm against the target implementation.
			name:           "no message key with relabel config",
			inMessageKey:   "",
			inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
			inLS:           model.LabelSet{"buzz": "bazz"},
			relabels: []*relabel.Config{
				{
					SourceLabels: model.LabelNames{"__meta_kafka_message_key"},
					Regex:        relabel.MustNewRegexp("(.*)"),
					TargetLabel:  "message_key",
					Replacement:  "$1",
					Action:       "replace",
				},
			},
			expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "none"},
		},
	}
	for _, tt := range tc {
		t.Run(tt.name, func(t *testing.T) {
			session, claim := &testSession{}, newTestClaim("footopic", 10, 12)
			var closed bool
			// The callback flips closed; presumably the fake client invokes it
			// when it is stopped (verify against the fake package).
			fc := fake.NewClient(
				func() {
					closed = true
				},
			)

			tg := NewKafkaTarget(nil, session, claim, tt.inDiscoveredLS, tt.inLS, tt.relabels, fc, true, &KafkaTargetMessageParser{})

			// Run the target in the background; the unbuffered claim channel
			// means each Send below only returns once a receiver has taken
			// the message.
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				tg.run()
			}()

			for i := 0; i < 10; i++ {
				claim.Send(&sarama.ConsumerMessage{
					Timestamp: time.Unix(0, int64(i)),
					Value:     []byte(fmt.Sprintf("%d", i)),
					Key:       []byte(tt.inMessageKey),
				})
			}
			// Closing the channel is expected to make run return, which
			// releases wg.Wait and orders the reads below after the
			// goroutine's writes.
			claim.Stop()
			wg.Wait()
			re := fc.Received()

			require.Len(t, session.markedMessage, 10)
			require.Len(t, re, 10)
			require.True(t, closed)
			for _, e := range re {
				require.Equal(t, tt.expectedLS.String(), e.Labels.String())
			}
		})
	}
}