Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/pull_target.go
4096 views
package gcplogtarget12// This code is copied from Promtail. The gcplogtarget package is used to3// configure and run the targets that can read log entries from cloud resource4// logs like bucket logs, load balancer logs, and Kubernetes cluster logs5// from GCP.67import (8"context"9"io"10"sync"11"time"1213"cloud.google.com/go/pubsub"14"github.com/go-kit/log"15"github.com/go-kit/log/level"16"github.com/grafana/agent/component/common/loki"17"github.com/grafana/dskit/backoff"18"github.com/prometheus/common/model"19"github.com/prometheus/prometheus/model/relabel"20"google.golang.org/api/option"21)2223// PullTarget represents a target that scrapes logs from a GCP project id and24// subscription and converts them to Loki log entries.25type PullTarget struct {26metrics *Metrics27logger log.Logger28handler loki.EntryHandler29config *PullConfig30relabelConfig []*relabel.Config31jobName string3233// lifecycle management34ctx context.Context35cancel context.CancelFunc36wg sync.WaitGroup37backoff *backoff.Backoff3839// pubsub40ps io.Closer41sub pubsubSubscription42msgs chan *pubsub.Message43}4445// TODO(@tpaschalis) Expose this as River configuration in the future.46var defaultBackoff = backoff.Config{47MinBackoff: 1 * time.Second,48MaxBackoff: 10 * time.Second,49MaxRetries: 0, // Retry forever50}5152// pubsubSubscription allows us to mock pubsub for testing53type pubsubSubscription interface {54Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error55}5657// NewPullTarget returns the new instance of PullTarget.58func NewPullTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *PullConfig, relabel []*relabel.Config, clientOptions ...option.ClientOption) (*PullTarget, error) {59ctx, cancel := context.WithCancel(context.Background())60ps, err := pubsub.NewClient(ctx, config.ProjectID, clientOptions...)61if err != nil {62cancel()63return nil, err64}6566target := &PullTarget{67metrics: metrics,68logger: logger,69handler: handler,70relabelConfig: relabel,71config: config,72jobName: jobName,73ctx: ctx,74cancel: cancel,75ps: ps,76sub: ps.SubscriptionInProject(config.Subscription, config.ProjectID),77backoff: backoff.New(ctx, defaultBackoff),78msgs: make(chan *pubsub.Message),79}8081go func() {82err := target.run()83if err != nil {84level.Error(logger).Log("msg", "loki.source.gcplog pull target shutdown with error", "err", err)85}86}()8788return target, nil89}9091func (t *PullTarget) run() error {92t.wg.Add(1)93defer t.wg.Done()9495go t.consumeSubscription()9697lbls := make(model.LabelSet, len(t.config.Labels))98for k, v := range t.config.Labels {99lbls[model.LabelName(k)] = model.LabelValue(v)100}101102for {103select {104case <-t.ctx.Done():105return t.ctx.Err()106case m := <-t.msgs:107entry, err := parseGCPLogsEntry(m.Data, lbls, nil, t.config.UseIncomingTimestamp, t.relabelConfig)108if err != nil {109level.Error(t.logger).Log("event", "error formating log entry", "cause", err)110m.Ack()111break112}113t.handler.Chan() <- entry114m.Ack() // Ack only after log is sent.115t.metrics.gcplogEntries.WithLabelValues(t.config.ProjectID).Inc()116}117}118}119120func (t *PullTarget) consumeSubscription() {121// NOTE(kavi): `cancel` the context as exiting from this goroutine should stop main `run` loop122// It makesense as no more messages will be received.123defer t.cancel()124125for t.backoff.Ongoing() {126err := t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) {127t.msgs <- m128t.backoff.Reset()129})130if err != nil {131level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", err)132t.metrics.gcplogErrors.WithLabelValues(t.config.ProjectID).Inc()133t.metrics.gcplogTargetLastSuccessScrape.WithLabelValues(t.config.ProjectID, t.config.Subscription).SetToCurrentTime()134t.backoff.Wait()135}136}137}138139// Labels return the model.LabelSet that the target applies to log entries.140func (t *PullTarget) Labels() model.LabelSet {141lbls := make(model.LabelSet, len(t.config.Labels))142for k, v := range t.config.Labels {143lbls[model.LabelName(k)] = model.LabelValue(v)144}145return lbls146}147148// Details returns some debug information about the target.149func (t *PullTarget) Details() map[string]string {150return map[string]string{151"strategy": "pull",152"labels": t.Labels().String(),153}154}155156// Stop shuts the target down.157func (t *PullTarget) Stop() error {158t.cancel()159t.wg.Wait()160t.handler.Stop()161t.ps.Close()162return nil163}164165166