Path: blob/main/component/loki/source/cloudflare/internal/cloudflaretarget/target.go
package cloudflaretarget

// This code is copied from Promtail. The cloudflaretarget package is used to
// configure and run a target that can read from the Cloudflare Logpull API and
// forward entries to other loki components.

import (
	"context"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/buger/jsonparser"
	"github.com/cloudflare/cloudflare-go"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/concurrency"
	"github.com/grafana/dskit/multierror"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	"go.uber.org/atomic"
)

// The minimum window size is 1 minute.
const minDelay = time.Minute

var cloudflareTooEarlyError = regexp.MustCompile(`too early: logs older than \S+ are not available`)

var defaultBackoff = backoff.Config{
	MinBackoff: 1 * time.Second,
	MaxBackoff: 10 * time.Second,
	MaxRetries: 5,
}

// Config defines how to connect to Cloudflare's Logpull API.
type Config struct {
	APIToken   string
	ZoneID     string
	Labels     model.LabelSet
	Workers    int
	PullRange  model.Duration
	FieldsType string
}

// Target enables pulling HTTP log messages from Cloudflare using the Logpull
// API.
type Target struct {
	logger    log.Logger
	handler   loki.EntryHandler
	positions positions.Positions
	config    *Config
	metrics   *Metrics

	client  Client
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	to      time.Time // the end of the next pull interval
	running *atomic.Bool
	err     error
}

// NewTarget creates and runs a Cloudflare target.
func NewTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, position positions.Positions, config *Config) (*Target, error) {
	fields, err := Fields(FieldsType(config.FieldsType))
	if err != nil {
		return nil, err
	}
	client, err := getClient(config.APIToken, config.ZoneID, fields)
	if err != nil {
		return nil, err
	}
	pos, err := position.Get(positions.CursorKey(config.ZoneID), config.Labels.String())
	if err != nil {
		return nil, err
	}
	to := time.Now()
	if pos != 0 {
		to = time.Unix(0, pos)
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &Target{
		logger:    logger,
		handler:   handler,
		positions: position,
		config:    config,
		metrics:   metrics,

		ctx:     ctx,
		cancel:  cancel,
		client:  client,
		to:      to,
		running: atomic.NewBool(false),
	}
	t.start()
	return t, nil
}

func (t *Target) start() {
	t.wg.Add(1)
	t.running.Store(true)
	go func() {
		defer func() {
			t.wg.Done()
			t.running.Store(false)
		}()
		for t.ctx.Err() == nil {
			end := t.to
			maxEnd := time.Now().Add(-minDelay)
			if end.After(maxEnd) {
				end = maxEnd
			}
			start := end.Add(-time.Duration(t.config.PullRange))
			requests := splitRequests(start, end, t.config.Workers)
			// Use a background context for workers as we don't want to cancel halfway through.
			// In case of errors we stop the target; each worker has its own retry logic.
			if err := concurrency.ForEachJob(context.Background(), len(requests), t.config.Workers, func(ctx context.Context, idx int) error {
				request := requests[idx]
				return t.pull(ctx, request.start, request.end)
			}); err != nil {
				level.Error(t.logger).Log("msg", "failed to pull logs", "err", err, "start", start, "end", end)
				t.err = err
				return
			}

			// Set the current timestamp metric, move to the next interval, and save the position.
			t.metrics.LastEnd.Set(float64(end.UnixNano()) / 1e9)
			t.to = end.Add(time.Duration(t.config.PullRange))
			t.positions.Put(positions.CursorKey(t.config.ZoneID), t.Labels().String(), t.to.UnixNano())

			// If the next window can be fetched, do it; if not, sleep for a while.
			// This is because Cloudflare logs should never be pulled between now-1m and now.
			diff := t.to.Sub(time.Now().Add(-minDelay))
			if diff > 0 {
				select {
				case <-time.After(diff):
				case <-t.ctx.Done():
				}
			}
		}
	}()
}

// pull pulls logs from Cloudflare for a given time range.
// It will retry on errors.
func (t *Target) pull(ctx context.Context, start, end time.Time) error {
	var (
		backoff = backoff.New(ctx, defaultBackoff)
		errs    = multierror.New()
		it      cloudflare.LogpullReceivedIterator
		err     error
	)

	for backoff.Ongoing() {
		it, err = t.client.LogpullReceived(ctx, start, end)
		if err != nil && cloudflareTooEarlyError.MatchString(err.Error()) {
			level.Warn(t.logger).Log("msg", "failed iterating over logs, out of cloudflare range, not retrying", "err", err, "start", start, "end", end, "retries", backoff.NumRetries())
			return nil
		} else if err != nil {
			if it != nil {
				it.Close()
			}
			errs.Add(err)
			backoff.Wait()
			continue
		}
		if err := func() error {
			defer it.Close()
			var lineRead int64
			for it.Next() {
				line := it.Line()
				ts, err := jsonparser.GetInt(line, "EdgeStartTimestamp")
				if err != nil {
					ts = time.Now().UnixNano()
				}
				t.handler.Chan() <- loki.Entry{
					Labels: t.config.Labels.Clone(),
					Entry: logproto.Entry{
						Timestamp: time.Unix(0, ts),
						Line:      string(line),
					},
				}
				lineRead++
				t.metrics.Entries.Inc()
			}
			if it.Err() != nil {
				level.Warn(t.logger).Log("msg", "failed iterating over logs", "err", it.Err(), "start", start, "end", end, "retries", backoff.NumRetries(), "lineRead", lineRead)
				return it.Err()
			}
			return nil
		}(); err != nil {
			errs.Add(err)
			backoff.Wait()
			continue
		}
		return nil
	}
	return errs.Err()
}

// Stop shuts down the target.
func (t *Target) Stop() {
	t.cancel()
	t.wg.Wait()
	t.handler.Stop()
}

// Labels returns the custom labels attached to log entries.
func (t *Target) Labels() model.LabelSet {
	return t.config.Labels
}

// Ready reports whether the target is ready.
func (t *Target) Ready() bool {
	return t.running.Load()
}

// Details returns debug details about the Cloudflare target.
func (t *Target) Details() map[string]string {
	fields, _ := Fields(FieldsType(t.config.FieldsType))
	var errMsg string
	if t.err != nil {
		errMsg = t.err.Error()
	}
	return map[string]string{
		"zone_id":        t.config.ZoneID,
		"error":          errMsg,
		"position":       t.positions.GetString(positions.CursorKey(t.config.ZoneID), t.config.Labels.String()),
		"last_timestamp": t.to.String(),
		"fields":         strings.Join(fields, ","),
	}
}

type pullRequest struct {
	start time.Time
	end   time.Time
}

func splitRequests(start, end time.Time, workers int) []pullRequest {
	perWorker := end.Sub(start) / time.Duration(workers)
	var requests []pullRequest
	for i := 0; i < workers; i++ {
		r := pullRequest{
			start: start.Add(time.Duration(i) * perWorker),
			end:   start.Add(time.Duration(i+1) * perWorker),
		}
		// If the last worker's chunk is smaller than the others, we need to make sure it still ends exactly at end.
		if i == workers-1 && r.end != end {
			r.end = end
		}
		requests = append(requests, r)
	}
	return requests
}
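A minimal sketch of how splitRequests divides a pull window across workers, written as a hypothetical in-package test (the function is unexported, so it can only be exercised from inside the cloudflaretarget package). The test name and assertions below are illustrative and not part of the repository.

package cloudflaretarget

import (
	"testing"
	"time"
)

// TestSplitRequestsSketch checks that the window is split into one request
// per worker and that the last request always ends exactly at `end`, even
// when dividing the duration by the worker count truncates.
func TestSplitRequestsSketch(t *testing.T) {
	end := time.Now()
	start := end.Add(-3 * time.Minute)

	reqs := splitRequests(start, end, 4)
	if len(reqs) != 4 {
		t.Fatalf("expected 4 requests, got %d", len(reqs))
	}
	if !reqs[0].start.Equal(start) {
		t.Fatalf("first request should start at %v, got %v", start, reqs[0].start)
	}
	if !reqs[len(reqs)-1].end.Equal(end) {
		t.Fatalf("last request should end at %v, got %v", end, reqs[len(reqs)-1].end)
	}
}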