Path: blob/main/component/loki/source/internal/kafkatarget/consumer_test.go
4096 views
package kafkatarget12// This code is copied from Promtail. The kafkatarget package is used to3// configure and run the targets that can read kafka entries and forward them4// to other loki components.56import (7"context"8"errors"9"testing"10"time"1112"github.com/Shopify/sarama"13"github.com/go-kit/log"14"github.com/prometheus/common/model"15"github.com/stretchr/testify/require"1617"github.com/grafana/loki/clients/pkg/promtail/targets/target"18)1920type DiscovererFn func(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)2122func (d DiscovererFn) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {23return d(session, claim)24}2526type fakeTarget struct {27ctx context.Context28lbs model.LabelSet29}3031func (f *fakeTarget) run() { <-f.ctx.Done() }32func (f *fakeTarget) Type() target.TargetType { return "" }33func (f *fakeTarget) DiscoveredLabels() model.LabelSet { return nil }34func (f *fakeTarget) Labels() model.LabelSet { return f.lbs }35func (f *fakeTarget) Ready() bool { return true }36func (f *fakeTarget) Details() interface{} { return nil }3738func Test_ConsumerConsume(t *testing.T) {39var (40group = &testConsumerGroupHandler{}41session = &testSession{}42ctx, cancel = context.WithCancel(context.Background())43c = &consumer{44logger: log.NewNopLogger(),45ctx: context.Background(),46cancel: func() {},47ConsumerGroup: group,48discoverer: DiscovererFn(func(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {49if claim.Topic() != "dropped" {50return &fakeTarget{51ctx: ctx,52lbs: model.LabelSet{"topic": model.LabelValue(claim.Topic())},53}, nil54}55return &fakeTarget{56ctx: ctx,57}, nil58}),59}60)6162c.start(ctx, []string{"foo"})63require.Eventually(t, group.consuming.Load, 5*time.Second, 100*time.Microsecond)64require.NoError(t, group.handler.Setup(session))65go func() {66err := group.handler.ConsumeClaim(session, newTestClaim("foo", 1, 2))67require.NoError(t, err)68}()69go func() {70err := group.handler.ConsumeClaim(session, newTestClaim("bar", 1, 2))71require.NoError(t, err)72}()73go func() {74err := group.handler.ConsumeClaim(session, newTestClaim("dropped", 1, 2))75require.NoError(t, err)76}()77require.Eventually(t, func() bool {78return len(c.getActiveTargets()) == 279}, 2*time.Second, 100*time.Millisecond)80require.Eventually(t, func() bool {81return len(c.getDroppedTargets()) == 182}, 2*time.Second, 100*time.Millisecond)83err := group.handler.Cleanup(session)84require.NoError(t, err)85cancel()86c.stop()87}8889func Test_ConsumerRetry(t *testing.T) {90var (91group = &testConsumerGroupHandler{92returnErr: errors.New("foo"),93}94ctx, cancel = context.WithCancel(context.Background())95c = &consumer{96logger: log.NewNopLogger(),97ctx: context.Background(),98cancel: func() {},99ConsumerGroup: group,100}101)102defer cancel()103c.start(ctx, []string{"foo"})104<-time.After(2 * time.Second)105c.stop()106}107108109