GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/cloudflare/internal/cloudflaretarget/target.go
package cloudflaretarget

// This code is copied from Promtail. The cloudflaretarget package is used to
// configure and run a target that can read from the Cloudflare Logpull API and
// forward entries to other loki components.

import (
	"context"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/buger/jsonparser"
	"github.com/cloudflare/cloudflare-go"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/positions"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/concurrency"
	"github.com/grafana/dskit/multierror"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
	"go.uber.org/atomic"
)

// The minimum window size is 1 minute.
const minDelay = time.Minute
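// cloudflareTooEarlyError matches the Logpull API error returned when the
// requested window is older than what Cloudflare still retains; pull treats a
// matching error as non-retryable and skips the range.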
var cloudflareTooEarlyError = regexp.MustCompile(`too early: logs older than \S+ are not available`)

var defaultBackoff = backoff.Config{
	MinBackoff: 1 * time.Second,
	MaxBackoff: 10 * time.Second,
	MaxRetries: 5,
}

// Config defines how to connect to Cloudflare's Logpull API.
type Config struct {
	APIToken   string
	ZoneID     string
	Labels     model.LabelSet
	Workers    int
	PullRange  model.Duration
	FieldsType string
}
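// A hypothetical configuration, for illustration only (the values below are
// assumptions, not defaults defined by this package):
//
//	cfg := &Config{
//		APIToken:   "api-token",
//		ZoneID:     "zone-id",
//		Labels:     model.LabelSet{"job": "cloudflare"},
//		Workers:    3,
//		PullRange:  model.Duration(time.Minute),
//		FieldsType: "default",
//	}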
// Target enables pulling HTTP log messages from Cloudflare using the Logpull
// API.
type Target struct {
	logger    log.Logger
	handler   loki.EntryHandler
	positions positions.Positions
	config    *Config
	metrics   *Metrics

	client  Client
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	to      time.Time // the end of the next pull interval
	running *atomic.Bool
	err     error
}

// NewTarget creates and runs a Cloudflare target.
func NewTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, position positions.Positions, config *Config) (*Target, error) {
	fields, err := Fields(FieldsType(config.FieldsType))
	if err != nil {
		return nil, err
	}
	client, err := getClient(config.APIToken, config.ZoneID, fields)
	if err != nil {
		return nil, err
	}
	pos, err := position.Get(positions.CursorKey(config.ZoneID), config.Labels.String())
	if err != nil {
		return nil, err
	}
	to := time.Now()
	if pos != 0 {
		to = time.Unix(0, pos)
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &Target{
		logger:    logger,
		handler:   handler,
		positions: position,
		config:    config,
		metrics:   metrics,

		ctx:     ctx,
		cancel:  cancel,
		client:  client,
		to:      to,
		running: atomic.NewBool(false),
	}
	t.start()
	return t, nil
}

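// start launches the pull loop in a goroutine. As an illustrative example
// (assumed values, not defaults): with PullRange set to 1m and t.to at
// 12:00:00, the first iteration pulls [11:59:00, 12:00:00) split across the
// configured workers, advances t.to to 12:01:00 and saves it as the position,
// then waits until the wall clock passes 12:02:00 so that the next window ends
// at least minDelay (one minute) in the past before it is requested.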
func (t *Target) start() {
	t.wg.Add(1)
	t.running.Store(true)
	go func() {
		defer func() {
			t.wg.Done()
			t.running.Store(false)
		}()
		for t.ctx.Err() == nil {
			end := t.to
			maxEnd := time.Now().Add(-minDelay)
			if end.After(maxEnd) {
				end = maxEnd
			}
			start := end.Add(-time.Duration(t.config.PullRange))
			requests := splitRequests(start, end, t.config.Workers)
			// Use a background context for the workers as we don't want to cancel halfway through.
			// In case of errors we stop the target; each worker has its own retry logic.
			if err := concurrency.ForEachJob(context.Background(), len(requests), t.config.Workers, func(ctx context.Context, idx int) error {
				request := requests[idx]
				return t.pull(ctx, request.start, request.end)
			}); err != nil {
				level.Error(t.logger).Log("msg", "failed to pull logs", "err", err, "start", start, "end", end)
				t.err = err
				return
			}

			// Set the current timestamp metric, move to the next interval, and save the position.
			t.metrics.LastEnd.Set(float64(end.UnixNano()) / 1e9)
			t.to = end.Add(time.Duration(t.config.PullRange))
			t.positions.Put(positions.CursorKey(t.config.ZoneID), t.Labels().String(), t.to.UnixNano())

			// If the next window can be fetched, do it; if not, sleep for a while.
			// This is because Cloudflare logs should never be pulled between now-1m and now.
			diff := t.to.Sub(time.Now().Add(-minDelay))
			if diff > 0 {
				select {
				case <-time.After(diff):
				case <-t.ctx.Done():
				}
			}
		}
	}()
}
// pull pulls logs from Cloudflare for a given time range.
// It will retry on errors.
func (t *Target) pull(ctx context.Context, start, end time.Time) error {
	var (
		backoff = backoff.New(ctx, defaultBackoff)
		errs    = multierror.New()
		it      cloudflare.LogpullReceivedIterator
		err     error
	)

	for backoff.Ongoing() {
		it, err = t.client.LogpullReceived(ctx, start, end)
		if err != nil && cloudflareTooEarlyError.MatchString(err.Error()) {
			level.Warn(t.logger).Log("msg", "failed iterating over logs, out of cloudflare range, not retrying", "err", err, "start", start, "end", end, "retries", backoff.NumRetries())
			return nil
		} else if err != nil {
			if it != nil {
				it.Close()
			}
			errs.Add(err)
			backoff.Wait()
			continue
		}
		if err := func() error {
			defer it.Close()
			var lineRead int64
			for it.Next() {
				line := it.Line()
				ts, err := jsonparser.GetInt(line, "EdgeStartTimestamp")
				if err != nil {
					ts = time.Now().UnixNano()
				}
				t.handler.Chan() <- loki.Entry{
					Labels: t.config.Labels.Clone(),
					Entry: logproto.Entry{
						Timestamp: time.Unix(0, ts),
						Line:      string(line),
					},
				}
				lineRead++
				t.metrics.Entries.Inc()
			}
			if it.Err() != nil {
				level.Warn(t.logger).Log("msg", "failed iterating over logs", "err", it.Err(), "start", start, "end", end, "retries", backoff.NumRetries(), "lineRead", lineRead)
				return it.Err()
			}
			return nil
		}(); err != nil {
			errs.Add(err)
			backoff.Wait()
			continue
		}
		return nil
	}
	return errs.Err()
}
// Stop shuts down the target.
func (t *Target) Stop() {
	t.cancel()
	t.wg.Wait()
	t.handler.Stop()
}

// Labels returns the custom labels attached to log entries.
func (t *Target) Labels() model.LabelSet {
	return t.config.Labels
}

// Ready reports whether the target is ready.
func (t *Target) Ready() bool {
	return t.running.Load()
}

// Details returns debug details about the Cloudflare target.
func (t *Target) Details() map[string]string {
	fields, _ := Fields(FieldsType(t.config.FieldsType))
	var errMsg string
	if t.err != nil {
		errMsg = t.err.Error()
	}
	return map[string]string{
		"zone_id":        t.config.ZoneID,
		"error":          errMsg,
		"position":       t.positions.GetString(positions.CursorKey(t.config.ZoneID), t.config.Labels.String()),
		"last_timestamp": t.to.String(),
		"fields":         strings.Join(fields, ","),
	}
}

type pullRequest struct {
	start time.Time
	end   time.Time
}

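// splitRequests divides the [start, end) window evenly across the given number
// of workers. For example (illustrative values), a 10-minute window split
// across 4 workers yields four 2m30s pull requests; the final request's end is
// always clamped to the window end, so no part of the range is dropped when
// the division truncates.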
func splitRequests(start, end time.Time, workers int) []pullRequest {
	perWorker := end.Sub(start) / time.Duration(workers)
	var requests []pullRequest
	for i := 0; i < workers; i++ {
		r := pullRequest{
			start: start.Add(time.Duration(i) * perWorker),
			end:   start.Add(time.Duration(i+1) * perWorker),
		}
		// Make sure the last request ends exactly at end, since the division above may not split the window evenly.
		if i == workers-1 && r.end != end {
			r.end = end
		}
		requests = append(requests, r)
	}
	return requests
}