Path: blob/main/component/loki/source/internal/kafkatarget/kafkatarget.go
4096 views
package kafkatarget12// This code is copied from Promtail. The kafkatarget package is used to3// configure and run the targets that can read kafka entries and forward them4// to other loki components.56import (7"fmt"8"time"910"github.com/Shopify/sarama"11"github.com/go-kit/log"12"github.com/go-kit/log/level"13"github.com/grafana/agent/component/common/loki"14"github.com/grafana/loki/clients/pkg/promtail/targets/target"15"github.com/prometheus/common/model"16"github.com/prometheus/prometheus/model/labels"17"github.com/prometheus/prometheus/model/relabel"18)1920type runnableDroppedTarget struct {21target.Target22runFn func()23}2425func (d *runnableDroppedTarget) run() {26d.runFn()27}2829type KafkaTarget struct {30logger log.Logger31discoveredLabels model.LabelSet32lbs model.LabelSet33details ConsumerDetails34claim sarama.ConsumerGroupClaim35session sarama.ConsumerGroupSession36client loki.EntryHandler37relabelConfig []*relabel.Config38useIncomingTimestamp bool39messageParser MessageParser40}4142func NewKafkaTarget(43logger log.Logger,44session sarama.ConsumerGroupSession,45claim sarama.ConsumerGroupClaim,46discoveredLabels, lbs model.LabelSet,47relabelConfig []*relabel.Config,48client loki.EntryHandler,49useIncomingTimestamp bool,50messageParser MessageParser,51) *KafkaTarget {5253return &KafkaTarget{54logger: logger,55discoveredLabels: discoveredLabels,56lbs: lbs,57details: newDetails(session, claim),58claim: claim,59session: session,60client: client,61relabelConfig: relabelConfig,62useIncomingTimestamp: useIncomingTimestamp,63messageParser: messageParser,64}65}6667const (68defaultKafkaMessageKey = "none"69labelKeyKafkaMessageKey = "__meta_kafka_message_key"70)7172func (t *KafkaTarget) run() {73defer t.client.Stop()74for message := range t.claim.Messages() {75mk := string(message.Key)76if len(mk) == 0 {77mk = defaultKafkaMessageKey78}7980// TODO: Possibly need to format after merging with discovered labels because we can specify multiple labels in source labels81// https://github.com/grafana/loki/pull/4745#discussion_r75002223482lbs := format([]labels.Label{{83Name: labelKeyKafkaMessageKey,84Value: mk,85}}, t.relabelConfig)8687out := t.lbs.Clone()88if len(lbs) > 0 {89out = out.Merge(lbs)90}91entries, err := t.messageParser.Parse(message, out, t.relabelConfig, t.useIncomingTimestamp)92if err != nil {93level.Error(t.logger).Log("msg", "message parsing error", "err", err)94} else {95for _, entry := range entries {96t.client.Chan() <- entry97}98}99100t.session.MarkMessage(message, "")101}102}103104func timestamp(useIncoming bool, incoming time.Time) time.Time {105if useIncoming {106return incoming107}108return time.Now()109}110111func (t *KafkaTarget) Type() target.TargetType {112return target.KafkaTargetType113}114115func (t *KafkaTarget) Ready() bool {116return true117}118119func (t *KafkaTarget) DiscoveredLabels() model.LabelSet {120return t.discoveredLabels121}122123func (t *KafkaTarget) Labels() model.LabelSet {124return t.lbs125}126127// Details returns target-specific details.128func (t *KafkaTarget) Details() interface{} {129return t.details130}131132type ConsumerDetails struct {133134// MemberID returns the cluster member ID.135MemberID string136137// GenerationID returns the current generation ID.138GenerationID int32139140Topic string141Partition int32142InitialOffset int64143}144145func (c ConsumerDetails) String() string {146return fmt.Sprintf("member_id=%s generation_id=%d topic=%s partition=%d initial_offset=%d", c.MemberID, c.GenerationID, c.Topic, c.Partition, c.InitialOffset)147}148149func newDetails(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) ConsumerDetails {150return ConsumerDetails{151MemberID: session.MemberID(),152GenerationID: session.GenerationID(),153Topic: claim.Topic(),154Partition: claim.Partition(),155InitialOffset: claim.InitialOffset(),156}157}158159160