Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/common/loki/client/batch_test.go
4096 views
1
package client
2
3
// This code is copied from Promtail. The client package is used to configure
4
// and run the clients that can send log entries to a Loki instance.
5
6
import (
7
"fmt"
8
"testing"
9
"time"
10
11
"github.com/prometheus/common/model"
12
"github.com/stretchr/testify/assert"
13
"github.com/stretchr/testify/require"
14
15
"github.com/grafana/agent/component/common/loki"
16
17
"github.com/grafana/loki/pkg/logproto"
18
)
19
20
func TestBatch_MaxStreams(t *testing.T) {
21
maxStream := 2
22
23
var inputEntries = []loki.Entry{
24
{Labels: model.LabelSet{"app": "app-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
25
{Labels: model.LabelSet{"app": "app-2"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
26
{Labels: model.LabelSet{"app": "app-3"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
27
{Labels: model.LabelSet{"app": "app-4"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
28
}
29
30
b := newBatch(maxStream)
31
32
errCount := 0
33
for _, entry := range inputEntries {
34
err := b.add(entry)
35
if err != nil {
36
errCount++
37
assert.EqualError(t, err, fmt.Errorf(errMaxStreamsLimitExceeded, len(b.streams), b.maxStreams, entry.Labels).Error())
38
}
39
}
40
assert.Equal(t, errCount, 2)
41
}
42
43
func TestBatch_add(t *testing.T) {
44
t.Parallel()
45
46
tests := map[string]struct {
47
inputEntries []loki.Entry
48
expectedSizeBytes int
49
}{
50
"empty batch": {
51
inputEntries: []loki.Entry{},
52
expectedSizeBytes: 0,
53
},
54
"single stream with single log entry": {
55
inputEntries: []loki.Entry{
56
{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
57
},
58
expectedSizeBytes: len(logEntries[0].Entry.Line),
59
},
60
"single stream with multiple log entries": {
61
inputEntries: []loki.Entry{
62
{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
63
{Labels: model.LabelSet{}, Entry: logEntries[1].Entry},
64
},
65
expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line),
66
},
67
"multiple streams with multiple log entries": {
68
inputEntries: []loki.Entry{
69
{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry},
70
{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry},
71
{Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry},
72
},
73
expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line) + len(logEntries[2].Entry.Line),
74
},
75
}
76
77
for testName, testData := range tests {
78
testData := testData
79
80
t.Run(testName, func(t *testing.T) {
81
b := newBatch(0)
82
83
for _, entry := range testData.inputEntries {
84
err := b.add(entry)
85
assert.NoError(t, err)
86
}
87
88
assert.Equal(t, testData.expectedSizeBytes, b.sizeBytes())
89
})
90
}
91
}
92
93
func TestBatch_encode(t *testing.T) {
94
t.Parallel()
95
96
tests := map[string]struct {
97
inputBatch *batch
98
expectedEntriesCount int
99
}{
100
"empty batch": {
101
inputBatch: newBatch(0),
102
expectedEntriesCount: 0,
103
},
104
"single stream with single log entry": {
105
inputBatch: newBatch(0,
106
loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
107
),
108
expectedEntriesCount: 1,
109
},
110
"single stream with multiple log entries": {
111
inputBatch: newBatch(0,
112
loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
113
loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[1].Entry},
114
),
115
expectedEntriesCount: 2,
116
},
117
"multiple streams with multiple log entries": {
118
inputBatch: newBatch(0,
119
loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry},
120
loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry},
121
loki.Entry{Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry},
122
),
123
expectedEntriesCount: 3,
124
},
125
}
126
127
for testName, testData := range tests {
128
testData := testData
129
130
t.Run(testName, func(t *testing.T) {
131
t.Parallel()
132
133
_, entriesCount, err := testData.inputBatch.encode()
134
require.NoError(t, err)
135
assert.Equal(t, testData.expectedEntriesCount, entriesCount)
136
})
137
}
138
}
139
140
func TestHashCollisions(t *testing.T) {
141
b := newBatch(0)
142
143
ls1 := model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}
144
ls2 := model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}
145
146
require.False(t, ls1.Equal(ls2))
147
require.Equal(t, ls1.FastFingerprint(), ls2.FastFingerprint())
148
149
const entriesPerLabel = 10
150
151
for i := 0; i < entriesPerLabel; i++ {
152
_ = b.add(loki.Entry{Labels: ls1, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
153
154
_ = b.add(loki.Entry{Labels: ls2, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
155
}
156
157
// make sure that colliding labels are stored properly as independent streams
158
req, entries := b.createPushRequest()
159
assert.Len(t, req.Streams, 2)
160
assert.Equal(t, 2*entriesPerLabel, entries)
161
162
if req.Streams[0].Labels == ls1.String() {
163
assert.Equal(t, ls1.String(), req.Streams[0].Labels)
164
assert.Equal(t, ls2.String(), req.Streams[1].Labels)
165
} else {
166
assert.Equal(t, ls2.String(), req.Streams[0].Labels)
167
assert.Equal(t, ls1.String(), req.Streams[1].Labels)
168
}
169
}
170
171