Path: blob/main/component/common/loki/client/batch_test.go
4096 views
package client

// This code is copied from Promtail. The client package is used to configure
// and run the clients that can send log entries to a Loki instance.

import (
	"fmt"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/grafana/agent/component/common/loki"

	"github.com/grafana/loki/pkg/logproto"
)

// TestBatch_MaxStreams adds four entries, each with a distinct "app" label, to
// a batch created with a stream limit of two. It expects exactly two adds to be
// rejected, each with the errMaxStreamsLimitExceeded message.
func TestBatch_MaxStreams(t *testing.T) {
	maxStream := 2

	inputEntries := []loki.Entry{
		{Labels: model.LabelSet{"app": "app-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
		{Labels: model.LabelSet{"app": "app-2"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
		{Labels: model.LabelSet{"app": "app-3"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
		{Labels: model.LabelSet{"app": "app-4"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
	}

	b := newBatch(maxStream)

	errCount := 0
	for _, entry := range inputEntries {
		err := b.add(entry)
		if err != nil {
			errCount++
			// The expected message is built from the batch state after the
			// rejected add, using the same format string as the production code.
			assert.EqualError(t, err, fmt.Errorf(errMaxStreamsLimitExceeded, len(b.streams), b.maxStreams, entry.Labels).Error())
		}
	}

	// testify expects (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	assert.Equal(t, 2, errCount)
}

// TestBatch_add checks that, after adding a set of entries to a fresh batch,
// sizeBytes reports the sum of the lengths of the added log lines.
//
// NOTE(review): logEntries is a fixture defined elsewhere in the package.
func TestBatch_add(t *testing.T) {
	t.Parallel()

	tests := map[string]struct {
		inputEntries      []loki.Entry
		expectedSizeBytes int
	}{
		"empty batch": {
			inputEntries:      []loki.Entry{},
			expectedSizeBytes: 0,
		},
		"single stream with single log entry": {
			inputEntries: []loki.Entry{
				{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
			},
			expectedSizeBytes: len(logEntries[0].Entry.Line),
		},
		"single stream with multiple log entries": {
			inputEntries: []loki.Entry{
				{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
				{Labels: model.LabelSet{}, Entry: logEntries[1].Entry},
			},
			expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line),
		},
		"multiple streams with multiple log entries": {
			inputEntries: []loki.Entry{
				{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry},
				{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry},
				{Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry},
			},
			expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line) + len(logEntries[2].Entry.Line),
		},
	}

	for testName, testData := range tests {
		// Shadow the loop variable so the closure captures this iteration's value.
		testData := testData

		t.Run(testName, func(t *testing.T) {
			// The cases below expect every add to succeed with a limit of 0.
			b := newBatch(0)

			for _, entry := range testData.inputEntries {
				err := b.add(entry)
				assert.NoError(t, err)
			}

			assert.Equal(t, testData.expectedSizeBytes, b.sizeBytes())
		})
	}
}

// TestBatch_encode checks that encode succeeds on pre-populated batches and
// reports the expected number of entries.
func TestBatch_encode(t *testing.T) {
	t.Parallel()

	tests := map[string]struct {
		inputBatch           *batch
		expectedEntriesCount int
	}{
		"empty batch": {
			inputBatch:           newBatch(0),
			expectedEntriesCount: 0,
		},
		"single stream with single log entry": {
			inputBatch: newBatch(0,
				loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
			),
			expectedEntriesCount: 1,
		},
		"single stream with multiple log entries": {
			inputBatch: newBatch(0,
				loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
				loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[1].Entry},
			),
			expectedEntriesCount: 2,
		},
		"multiple streams with multiple log entries": {
			inputBatch: newBatch(0,
				loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry},
				loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry},
				loki.Entry{Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry},
			),
			expectedEntriesCount: 3,
		},
	}

	for testName, testData := range tests {
		// Shadow the loop variable so the parallel subtest sees its own case.
		testData := testData

		t.Run(testName, func(t *testing.T) {
			t.Parallel()

			_, entriesCount, err := testData.inputBatch.encode()
			require.NoError(t, err)
			assert.Equal(t, testData.expectedEntriesCount, entriesCount)
		})
	}
}

// TestHashCollisions uses two label sets that differ but share the same
// FastFingerprint, and checks that the batch still emits them as two separate
// streams in the push request.
func TestHashCollisions(t *testing.T) {
	b := newBatch(0)

	ls1 := model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}
	ls2 := model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}

	// Preconditions: the label sets are distinct yet collide on the fingerprint.
	require.False(t, ls1.Equal(ls2))
	require.Equal(t, ls1.FastFingerprint(), ls2.FastFingerprint())

	const entriesPerLabel = 10

	for i := 0; i < entriesPerLabel; i++ {
		// Fail at the add that breaks rather than at the later count assertions.
		require.NoError(t, b.add(loki.Entry{Labels: ls1, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}))

		require.NoError(t, b.add(loki.Entry{Labels: ls2, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}))
	}

	// make sure that colliding labels are stored properly as independent streams
	req, entries := b.createPushRequest()
	// require (not assert) so the indexing below cannot panic on a short slice.
	require.Len(t, req.Streams, 2)
	assert.Equal(t, 2*entriesPerLabel, entries)

	// The two streams may come back in either order; pick the expected order
	// from the first stream and verify both positions.
	first, second := ls1.String(), ls2.String()
	if req.Streams[0].Labels != first {
		first, second = second, first
	}
	assert.Equal(t, first, req.Streams[0].Labels)
	assert.Equal(t, second, req.Streams[1].Labels)
}