Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/common/loki/client/batch.go
4096 views
1
package client
2
3
// This code is copied from Promtail. The client package is used to configure
4
// and run the clients that can send log entries to a Loki instance.
5
6
import (
7
"fmt"
8
"sort"
9
"strings"
10
"time"
11
12
"github.com/gogo/protobuf/proto"
13
"github.com/golang/snappy"
14
"github.com/grafana/agent/component/common/loki"
15
"github.com/grafana/loki/pkg/logproto"
16
"github.com/prometheus/common/model"
17
)
18
19
// errMaxStreamsLimitExceeded is the format string of the error produced when
// opening a new stream would go past the batch's stream limit. Its verbs are,
// in order: the current number of streams, the configured limit, and the
// label string of the stream that was rejected.
const errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
22
23
// batch holds pending log streams waiting to be sent to Loki, and it's used
24
// to reduce the number of push requests to Loki aggregating multiple log streams
25
// and entries in a single batch request. In case of multi-tenant Promtail, log
26
// streams for each tenant are stored in a dedicated batch.
27
type batch struct {
28
streams map[string]*logproto.Stream
29
bytes int
30
createdAt time.Time
31
32
maxStreams int
33
}
34
35
func newBatch(maxStreams int, entries ...loki.Entry) *batch {
36
b := &batch{
37
streams: map[string]*logproto.Stream{},
38
bytes: 0,
39
createdAt: time.Now(),
40
maxStreams: maxStreams,
41
}
42
43
// Add entries to the batch
44
for _, entry := range entries {
45
//never error here
46
_ = b.add(entry)
47
}
48
49
return b
50
}
51
52
// add an entry to the batch
53
func (b *batch) add(entry loki.Entry) error {
54
b.bytes += len(entry.Line)
55
56
// Append the entry to an already existing stream (if any)
57
labels := labelsMapToString(entry.Labels, ReservedLabelTenantID)
58
if stream, ok := b.streams[labels]; ok {
59
stream.Entries = append(stream.Entries, entry.Entry)
60
return nil
61
}
62
63
streams := len(b.streams)
64
if b.maxStreams > 0 && streams >= b.maxStreams {
65
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
66
}
67
// Add the entry as a new stream
68
b.streams[labels] = &logproto.Stream{
69
Labels: labels,
70
Entries: []logproto.Entry{entry.Entry},
71
}
72
return nil
73
}
74
75
// labelsMapToString renders ls as a string of the form {name="value", ...}.
// The pairs are sorted so that equal label sets always yield the same string,
// and any label whose name is listed in without is left out.
func labelsMapToString(ls model.LabelSet, without ...model.LabelName) string {
	pairs := make([]string, 0, len(ls))

	for name, value := range ls {
		excluded := false
		for _, skip := range without {
			if name == skip {
				excluded = true
				break
			}
		}
		if excluded {
			continue
		}

		pairs = append(pairs, fmt.Sprintf("%s=%q", name, value))
	}

	// Map iteration order is unspecified; sorting makes the output stable.
	sort.Strings(pairs)
	return "{" + strings.Join(pairs, ", ") + "}"
}
90
91
// sizeBytes returns the current batch size in bytes
92
func (b *batch) sizeBytes() int {
93
return b.bytes
94
}
95
96
// sizeBytesAfter returns the size of the batch after the input entry
97
// will be added to the batch itself
98
func (b *batch) sizeBytesAfter(entry loki.Entry) int {
99
return b.bytes + len(entry.Line)
100
}
101
102
// age of the batch since its creation
103
func (b *batch) age() time.Duration {
104
return time.Since(b.createdAt)
105
}
106
107
// encode the batch as snappy-compressed push request, and returns
108
// the encoded bytes and the number of encoded entries
109
func (b *batch) encode() ([]byte, int, error) {
110
req, entriesCount := b.createPushRequest()
111
buf, err := proto.Marshal(req)
112
if err != nil {
113
return nil, 0, err
114
}
115
buf = snappy.Encode(nil, buf)
116
return buf, entriesCount, nil
117
}
118
119
// creates push request and returns it, together with number of entries
120
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
121
req := logproto.PushRequest{
122
Streams: make([]logproto.Stream, 0, len(b.streams)),
123
}
124
125
entriesCount := 0
126
for _, stream := range b.streams {
127
req.Streams = append(req.Streams, *stream)
128
entriesCount += len(stream.Entries)
129
}
130
return &req, entriesCount
131
}
132
133