Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/common/loki/client/client.go
4096 views
1
package client
2
3
// This code is copied from Promtail. The client package is used to configure
4
// and run the clients that can send log entries to a Loki instance.
5
6
import (
7
"bufio"
8
"bytes"
9
"context"
10
"crypto/sha256"
11
"errors"
12
"fmt"
13
"io"
14
"net/http"
15
"strconv"
16
"sync"
17
"time"
18
19
"github.com/go-kit/log"
20
"github.com/go-kit/log/level"
21
"github.com/grafana/agent/component/common/loki"
22
"github.com/grafana/agent/pkg/build"
23
"github.com/grafana/dskit/backoff"
24
lokiutil "github.com/grafana/loki/pkg/util"
25
"github.com/prometheus/client_golang/prometheus"
26
"github.com/prometheus/common/config"
27
"github.com/prometheus/common/model"
28
"github.com/prometheus/prometheus/promql/parser"
29
)
30
31
const (
32
contentType = "application/x-protobuf"
33
maxErrMsgLen = 1024
34
35
// Label reserved to override the tenant ID while processing
36
// pipeline stages
37
ReservedLabelTenantID = "__tenant_id__"
38
39
LatencyLabel = "filename"
40
HostLabel = "host"
41
ClientLabel = "client"
42
)
43
44
var UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
45
46
type Metrics struct {
47
encodedBytes *prometheus.CounterVec
48
sentBytes *prometheus.CounterVec
49
droppedBytes *prometheus.CounterVec
50
sentEntries *prometheus.CounterVec
51
droppedEntries *prometheus.CounterVec
52
requestDuration *prometheus.HistogramVec
53
batchRetries *prometheus.CounterVec
54
countersWithHost []*prometheus.CounterVec
55
streamLag *prometheus.GaugeVec
56
}
57
58
func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
59
var m Metrics
60
61
m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
62
Name: "loki_write_encoded_bytes_total",
63
Help: "Number of bytes encoded and ready to send.",
64
}, []string{HostLabel})
65
m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
66
Name: "loki_write_sent_bytes_total",
67
Help: "Number of bytes sent.",
68
}, []string{HostLabel})
69
m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
70
Name: "loki_write_dropped_bytes_total",
71
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
72
}, []string{HostLabel})
73
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
74
Name: "loki_write_sent_entries_total",
75
Help: "Number of log entries sent to the ingester.",
76
}, []string{HostLabel})
77
m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
78
Name: "loki_write_dropped_entries_total",
79
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
80
}, []string{HostLabel})
81
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
82
Name: "loki_write_request_duration_seconds",
83
Help: "Duration of send requests.",
84
}, []string{"status_code", HostLabel})
85
m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
86
Name: "loki_write_batch_retries_total",
87
Help: "Number of times batches has had to be retried.",
88
}, []string{HostLabel})
89
90
m.countersWithHost = []*prometheus.CounterVec{
91
m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries,
92
}
93
94
streamLagLabelsMerged := []string{HostLabel, ClientLabel}
95
streamLagLabelsMerged = append(streamLagLabelsMerged, streamLagLabels...)
96
m.streamLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
97
Name: "loki_write_stream_lag_seconds",
98
Help: "Difference between current time and last batch timestamp for successful sends",
99
}, streamLagLabelsMerged)
100
101
if reg != nil {
102
m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
103
m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
104
m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
105
m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
106
m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
107
m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
108
m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
109
m.streamLag = mustRegisterOrGet(reg, m.streamLag).(*prometheus.GaugeVec)
110
}
111
112
return &m
113
}
114
115
func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
116
if err := reg.Register(c); err != nil {
117
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
118
return are.ExistingCollector
119
}
120
panic(err)
121
}
122
return c
123
}
124
125
// Client pushes entries to Loki and can be stopped
126
type Client interface {
127
loki.EntryHandler
128
// Stop goroutine sending batch of entries without retries.
129
StopNow()
130
Name() string
131
}
132
133
// Client for pushing logs in snappy-compressed protos over HTTP.
134
type client struct {
135
name string
136
metrics *Metrics
137
streamLagLabels []string
138
logger log.Logger
139
cfg Config
140
client *http.Client
141
entries chan loki.Entry
142
143
once sync.Once
144
wg sync.WaitGroup
145
146
externalLabels model.LabelSet
147
148
// ctx is used in any upstream calls from the `client`.
149
ctx context.Context
150
cancel context.CancelFunc
151
maxStreams int
152
}
153
154
// Tripperware can wrap a roundtripper.
155
type Tripperware func(http.RoundTripper) http.RoundTripper
156
157
// New makes a new Client.
158
func New(metrics *Metrics, cfg Config, streamLagLabels []string, maxStreams int, logger log.Logger) (Client, error) {
159
if cfg.StreamLagLabels.String() != "" {
160
return nil, fmt.Errorf("client config stream_lag_labels is deprecated in favour of the config file options block field, and will be ignored: %+v", cfg.StreamLagLabels.String())
161
}
162
return newClient(metrics, cfg, streamLagLabels, maxStreams, logger)
163
}
164
165
func newClient(metrics *Metrics, cfg Config, streamLagLabels []string, maxStreams int, logger log.Logger) (*client, error) {
166
if cfg.URL.URL == nil {
167
return nil, errors.New("client needs target URL")
168
}
169
170
ctx, cancel := context.WithCancel(context.Background())
171
172
c := &client{
173
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
174
cfg: cfg,
175
entries: make(chan loki.Entry),
176
metrics: metrics,
177
streamLagLabels: streamLagLabels,
178
name: asSha256(cfg),
179
180
externalLabels: cfg.ExternalLabels.LabelSet,
181
ctx: ctx,
182
cancel: cancel,
183
maxStreams: maxStreams,
184
}
185
if cfg.Name != "" {
186
c.name = cfg.Name
187
}
188
189
err := cfg.Client.Validate()
190
if err != nil {
191
return nil, err
192
}
193
194
c.client, err = config.NewClientFromConfig(cfg.Client, "GrafanaAgent", config.WithHTTP2Disabled())
195
if err != nil {
196
return nil, err
197
}
198
199
c.client.Timeout = cfg.Timeout
200
201
// Initialize counters to 0 so the metrics are exported before the first
202
// occurrence of incrementing to avoid missing metrics.
203
for _, counter := range c.metrics.countersWithHost {
204
counter.WithLabelValues(c.cfg.URL.Host).Add(0)
205
}
206
207
c.wg.Add(1)
208
go c.run()
209
return c, nil
210
}
211
212
// NewWithTripperware creates a new Loki client with a custom tripperware.
213
func NewWithTripperware(metrics *Metrics, cfg Config, streamLagLabels []string, maxStreams int, logger log.Logger, tp Tripperware) (Client, error) {
214
c, err := newClient(metrics, cfg, streamLagLabels, maxStreams, logger)
215
if err != nil {
216
return nil, err
217
}
218
219
if tp != nil {
220
c.client.Transport = tp(c.client.Transport)
221
}
222
223
return c, nil
224
}
225
226
func (c *client) run() {
227
batches := map[string]*batch{}
228
229
// Given the client handles multiple batches (1 per tenant) and each batch
230
// can be created at a different point in time, we look for batches whose
231
// max wait time has been reached every 10 times per BatchWait, so that the
232
// maximum delay we have sending batches is 10% of the max waiting time.
233
// We apply a cap of 10ms to the ticker, to avoid too frequent checks in
234
// case the BatchWait is very low.
235
minWaitCheckFrequency := 10 * time.Millisecond
236
maxWaitCheckFrequency := c.cfg.BatchWait / 10
237
if maxWaitCheckFrequency < minWaitCheckFrequency {
238
maxWaitCheckFrequency = minWaitCheckFrequency
239
}
240
241
maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)
242
243
defer func() {
244
maxWaitCheck.Stop()
245
// Send all pending batches
246
for tenantID, batch := range batches {
247
c.sendBatch(tenantID, batch)
248
}
249
250
c.wg.Done()
251
}()
252
253
for {
254
select {
255
case e, ok := <-c.entries:
256
if !ok {
257
return
258
}
259
e, tenantID := c.processEntry(e)
260
batch, ok := batches[tenantID]
261
262
// If the batch doesn't exist yet, we create a new one with the entry
263
if !ok {
264
batches[tenantID] = newBatch(c.maxStreams, e)
265
break
266
}
267
268
// If adding the entry to the batch will increase the size over the max
269
// size allowed, we do send the current batch and then create a new one
270
if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
271
c.sendBatch(tenantID, batch)
272
273
batches[tenantID] = newBatch(c.maxStreams, e)
274
break
275
}
276
277
// The max size of the batch isn't reached, so we can add the entry
278
err := batch.add(e)
279
if err != nil {
280
level.Error(c.logger).Log("msg", "batch add err", "error", err)
281
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Inc()
282
return
283
}
284
case <-maxWaitCheck.C:
285
// Send all batches whose max wait time has been reached
286
for tenantID, batch := range batches {
287
if batch.age() < c.cfg.BatchWait {
288
continue
289
}
290
291
c.sendBatch(tenantID, batch)
292
delete(batches, tenantID)
293
}
294
}
295
}
296
}
297
298
func (c *client) Chan() chan<- loki.Entry {
299
return c.entries
300
}
301
302
func asSha256(o interface{}) string {
303
h := sha256.New()
304
h.Write([]byte(fmt.Sprintf("%v", o)))
305
306
temp := fmt.Sprintf("%x", h.Sum(nil))
307
return temp[:6]
308
}
309
310
func (c *client) sendBatch(tenantID string, batch *batch) {
311
buf, entriesCount, err := batch.encode()
312
if err != nil {
313
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
314
return
315
}
316
bufBytes := float64(len(buf))
317
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
318
319
backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
320
var status int
321
for {
322
start := time.Now()
323
// send uses `timeout` internally, so `context.Background` is good enough.
324
status, err = c.send(context.Background(), tenantID, buf)
325
326
c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
327
328
if err == nil {
329
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
330
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
331
for _, s := range batch.streams {
332
lbls, err := parser.ParseMetric(s.Labels)
333
if err != nil {
334
// is this possible?
335
level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
336
return
337
}
338
339
//nolint:staticcheck
340
lblSet := make(prometheus.Labels)
341
for _, lbl := range c.streamLagLabels {
342
// label from streamLagLabels may not be found but we still need an empty value
343
// so that the prometheus client library doesn't panic on inconsistent label cardinality
344
value := ""
345
for i := range lbls {
346
if lbls[i].Name == lbl {
347
value = lbls[i].Value
348
}
349
}
350
lblSet[lbl] = value
351
}
352
353
//nolint:staticcheck
354
if lblSet != nil {
355
// always set host
356
lblSet[HostLabel] = c.cfg.URL.Host
357
// also set client name since if we have multiple
358
// loki_write clients configured we will run into a
359
// duplicate metric collected with same labels error when
360
// trying to hit the /metrics endpoint
361
lblSet[ClientLabel] = c.name
362
c.metrics.streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
363
}
364
}
365
return
366
}
367
368
// Only retry 429s, 500s and connection-level errors.
369
if status > 0 && status != 429 && status/100 != 5 {
370
break
371
}
372
373
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
374
c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
375
backoff.Wait()
376
377
// Make sure it sends at least once before checking for retry.
378
if !backoff.Ongoing() {
379
break
380
}
381
}
382
383
if err != nil {
384
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
385
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
386
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
387
}
388
}
389
390
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
391
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
392
defer cancel()
393
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
394
if err != nil {
395
return -1, err
396
}
397
req = req.WithContext(ctx)
398
req.Header.Set("Content-Type", contentType)
399
req.Header.Set("User-Agent", UserAgent)
400
401
// If the tenant ID is not empty, the component is running in multi-tenant
402
// mode, so we should send it to Loki
403
if tenantID != "" {
404
req.Header.Set("X-Scope-OrgID", tenantID)
405
}
406
407
resp, err := c.client.Do(req)
408
if err != nil {
409
return -1, err
410
}
411
defer lokiutil.LogError("closing response body", resp.Body.Close)
412
413
if resp.StatusCode/100 != 2 {
414
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
415
line := ""
416
if scanner.Scan() {
417
line = scanner.Text()
418
}
419
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
420
}
421
return resp.StatusCode, err
422
}
423
424
func (c *client) getTenantID(labels model.LabelSet) string {
425
// Check if it has been overridden while processing the pipeline stages
426
if value, ok := labels[ReservedLabelTenantID]; ok {
427
return string(value)
428
}
429
430
// Check if has been specified in the config
431
if c.cfg.TenantID != "" {
432
return c.cfg.TenantID
433
}
434
435
// Defaults to an empty string, which means the X-Scope-OrgID header
436
// will not be sent
437
return ""
438
}
439
440
// Stop the client.
441
func (c *client) Stop() {
442
c.once.Do(func() { close(c.entries) })
443
c.wg.Wait()
444
}
445
446
// StopNow stops the client without retries
447
func (c *client) StopNow() {
448
// cancel will stop retrying http requests.
449
c.cancel()
450
c.Stop()
451
}
452
453
func (c *client) processEntry(e loki.Entry) (loki.Entry, string) {
454
if len(c.externalLabels) > 0 {
455
e.Labels = c.externalLabels.Merge(e.Labels)
456
}
457
tenantID := c.getTenantID(e.Labels)
458
return e, tenantID
459
}
460
461
func (c *client) UnregisterLatencyMetric(labels prometheus.Labels) {
462
labels[HostLabel] = c.cfg.URL.Host
463
c.metrics.streamLag.Delete(labels)
464
}
465
466
func (c *client) Name() string {
467
return c.name
468
}
469
470