Path: blob/main/component/common/loki/client/batch.go
4096 views
package client

// This code is copied from Promtail. The client package is used to configure
// and run the clients that can send log entries to a Loki instance.

import (
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
)

const (
	// errMaxStreamsLimitExceeded is the format string of the error returned by
	// batch.add when accepting an entry would create more streams than allowed.
	errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
)

// batch holds pending log streams waiting to be sent to Loki, and it's used
// to reduce the number of push requests to Loki aggregating multiple log streams
// and entries in a single batch request. In case of multi-tenant Promtail, log
// streams for each tenant are stored in a dedicated batch.
type batch struct {
	// streams maps the string form of a label set (see labelsMapToString)
	// to the stream accumulating the entries carrying those labels.
	streams map[string]*logproto.Stream
	// bytes is the sum of len(Line) over all entries stored in the batch.
	bytes int
	// createdAt is the time the batch was created; age is measured from it.
	createdAt time.Time

	// maxStreams caps the number of distinct streams; a value <= 0 disables
	// the limit.
	maxStreams int
}

// newBatch creates a batch limited to maxStreams distinct streams (no limit
// when maxStreams <= 0) and seeds it with the given entries.
func newBatch(maxStreams int, entries ...loki.Entry) *batch {
	b := &batch{
		streams:    map[string]*logproto.Stream{},
		bytes:      0,
		createdAt:  time.Now(),
		maxStreams: maxStreams,
	}

	// Add entries to the batch.
	for _, entry := range entries {
		// add only fails when the entries span more distinct streams than
		// maxStreams allows, which cannot happen for the first entry of an
		// empty batch. The constructor has no error return, so an entry
		// rejected for that reason is left out of the batch.
		_ = b.add(entry)
	}

	return b
}

// add appends an entry to the batch, creating a new stream for its label set
// when none exists yet. It returns an error, and leaves the batch unchanged,
// when a new stream would be required but the streams limit is already reached.
func (b *batch) add(entry loki.Entry) error {
	labels := labelsMapToString(entry.Labels, ReservedLabelTenantID)

	// Append the entry to an already existing stream (if any).
	if stream, ok := b.streams[labels]; ok {
		stream.Entries = append(stream.Entries, entry.Entry)
		b.bytes += len(entry.Line)
		return nil
	}

	streams := len(b.streams)
	if b.maxStreams > 0 && streams >= b.maxStreams {
		// The entry is rejected, so its size must not be counted: bytes has
		// to reflect only what is actually stored in the batch.
		return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
	}

	// Add the entry as a new stream.
	b.streams[labels] = &logproto.Stream{
		Labels:  labels,
		Entries: []logproto.Entry{entry.Entry},
	}
	b.bytes += len(entry.Line)
	return nil
}

// labelsMapToString renders a label set as `{name="value", ...}`, skipping
// every label whose name is listed in without. The formatted pairs are sorted
// so that equal label sets always produce the same string.
func labelsMapToString(ls model.LabelSet, without ...model.LabelName) string {
	lstrs := make([]string, 0, len(ls))
Outer:
	for l, v := range ls {
		for _, w := range without {
			if l == w {
				continue Outer
			}
		}
		lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v))
	}

	sort.Strings(lstrs)
	return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))
}

// sizeBytes returns the current batch size in bytes.
func (b *batch) sizeBytes() int {
	return b.bytes
}

// sizeBytesAfter returns the size of the batch after the input entry
// will be added to the batch itself.
func (b *batch) sizeBytesAfter(entry loki.Entry) int {
	return b.bytes + len(entry.Line)
}

// age returns the time elapsed since the batch was created.
func (b *batch) age() time.Duration {
	return time.Since(b.createdAt)
}

// encode marshals the batch as a protobuf push request and compresses it
// with snappy. It returns the encoded bytes and the number of encoded
// entries, or the marshalling error.
func (b *batch) encode() ([]byte, int, error) {
	req, entriesCount := b.createPushRequest()
	buf, err := proto.Marshal(req)
	if err != nil {
		return nil, 0, err
	}
	buf = snappy.Encode(nil, buf)
	return buf, entriesCount, nil
}

// createPushRequest builds a push request containing every stream of the
// batch and returns it together with the total number of entries. Streams are
// collected by ranging over a map, so their order in the request is not stable.
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
	req := logproto.PushRequest{
		Streams: make([]logproto.Stream, 0, len(b.streams)),
	}

	entriesCount := 0
	for _, stream := range b.streams {
		req.Streams = append(req.Streams, *stream)
		entriesCount += len(stream.Entries)
	}
	return &req, entriesCount
}