Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/file/decompresser_test.go
4096 views
1
package file
2
3
// This code is copied from Promtail to test their decompressor implementation
4
// of the reader interface.
5
6
import (
7
"os"
8
"sync"
9
"testing"
10
"time"
11
12
"github.com/grafana/agent/component/common/loki/client/fake"
13
14
"github.com/go-kit/log"
15
"github.com/grafana/agent/component/common/loki"
16
"github.com/prometheus/client_golang/prometheus"
17
"github.com/stretchr/testify/require"
18
"go.uber.org/atomic"
19
)
20
21
type noopClient struct {
22
noopChan chan loki.Entry
23
wg sync.WaitGroup
24
once sync.Once
25
}
26
27
// Chan returns the send-only view of the channel on which this client accepts
// entries.
func (n *noopClient) Chan() chan<- loki.Entry {
	var in chan<- loki.Entry = n.noopChan
	return in
}
30
31
// Stop closes the entry channel and then waits for the goroutine tracked by
// the client's WaitGroup (the drain loop started by newNoopClient) to exit.
//
// Stop may be called more than once: the channel is closed only by the first
// call. Nothing may be sent on Chan once Stop has been called, because sending
// on a closed channel panics.
func (n *noopClient) Stop() {
	n.once.Do(func() { close(n.noopChan) })

	// Join the drain goroutine so it is not still running when Stop returns.
	// If no goroutine was ever registered the counter is zero and Wait
	// returns immediately.
	n.wg.Wait()
}
34
35
// newNoopClient builds a noopClient backed by an unbuffered channel and starts
// a goroutine that receives and discards every entry until the channel is
// closed. The goroutine is registered on the client's WaitGroup.
func newNoopClient() *noopClient {
	client := &noopClient{
		noopChan: make(chan loki.Entry),
	}

	client.wg.Add(1)
	go func() {
		defer client.wg.Done()
		for {
			// Entries are dropped on the floor; leave once the channel
			// has been closed.
			if _, open := <-client.noopChan; !open {
				return
			}
		}
	}()

	return client
}
46
47
func BenchmarkReadlines(b *testing.B) {
48
entryHandler := newNoopClient()
49
50
scenarios := []struct {
51
name string
52
file string
53
}{
54
{
55
name: "2000 lines of log .tar.gz compressed",
56
file: "testdata/short-access.tar.gz",
57
},
58
{
59
name: "100000 lines of log .gz compressed",
60
file: "testdata/long-access.gz",
61
},
62
}
63
64
for _, tc := range scenarios {
65
b.Run(tc.name, func(b *testing.B) {
66
decBase := &decompressor{
67
logger: log.NewNopLogger(),
68
running: atomic.NewBool(false),
69
handler: entryHandler,
70
path: tc.file,
71
}
72
73
for i := 0; i < b.N; i++ {
74
newDec := decBase
75
newDec.metrics = newMetrics(prometheus.NewRegistry())
76
newDec.done = make(chan struct{})
77
newDec.readLines()
78
<-newDec.done
79
}
80
})
81
}
82
}
83
84
func TestGigantiqueGunzipFile(t *testing.T) {
85
file := "testdata/long-access.gz"
86
handler := fake.NewClient(func() {})
87
defer handler.Stop()
88
89
d := &decompressor{
90
logger: log.NewNopLogger(),
91
running: atomic.NewBool(false),
92
handler: handler,
93
path: file,
94
done: make(chan struct{}),
95
metrics: newMetrics(prometheus.NewRegistry()),
96
}
97
98
d.readLines()
99
100
<-d.done
101
time.Sleep(time.Millisecond * 200)
102
103
entries := handler.Received()
104
require.Equal(t, 100000, len(entries))
105
}
106
107
// TestOnelineFiles test the supported formats for log lines that only contain 1 line.
108
//
109
// Based on our experience, this is the scenario with the most edge cases.
110
func TestOnelineFiles(t *testing.T) {
111
fileContent, err := os.ReadFile("testdata/onelinelog.log")
112
require.NoError(t, err)
113
t.Run("gunzip file", func(t *testing.T) {
114
file := "testdata/onelinelog.log.gz"
115
handler := fake.NewClient(func() {})
116
defer handler.Stop()
117
118
d := &decompressor{
119
logger: log.NewNopLogger(),
120
running: atomic.NewBool(false),
121
handler: handler,
122
path: file,
123
done: make(chan struct{}),
124
metrics: newMetrics(prometheus.NewRegistry()),
125
}
126
127
d.readLines()
128
129
<-d.done
130
time.Sleep(time.Millisecond * 200)
131
132
entries := handler.Received()
133
require.Equal(t, 1, len(entries))
134
require.Equal(t, string(fileContent), entries[0].Line)
135
})
136
137
t.Run("bzip2 file", func(t *testing.T) {
138
file := "testdata/onelinelog.log.bz2"
139
handler := fake.NewClient(func() {})
140
defer handler.Stop()
141
142
d := &decompressor{
143
logger: log.NewNopLogger(),
144
running: atomic.NewBool(false),
145
handler: handler,
146
path: file,
147
done: make(chan struct{}),
148
metrics: newMetrics(prometheus.NewRegistry()),
149
}
150
151
d.readLines()
152
153
<-d.done
154
time.Sleep(time.Millisecond * 200)
155
156
entries := handler.Received()
157
require.Equal(t, 1, len(entries))
158
require.Equal(t, string(fileContent), entries[0].Line)
159
})
160
161
t.Run("tar.gz file", func(t *testing.T) {
162
file := "testdata/onelinelog.tar.gz"
163
handler := fake.NewClient(func() {})
164
defer handler.Stop()
165
166
d := &decompressor{
167
logger: log.NewNopLogger(),
168
running: atomic.NewBool(false),
169
handler: handler,
170
path: file,
171
done: make(chan struct{}),
172
metrics: newMetrics(prometheus.NewRegistry()),
173
}
174
175
d.readLines()
176
177
<-d.done
178
time.Sleep(time.Millisecond * 200)
179
180
entries := handler.Received()
181
require.Equal(t, 1, len(entries))
182
firstEntry := entries[0]
183
require.Contains(t, firstEntry.Line, "onelinelog.log") // contains .tar.gz headers
184
require.Contains(t, firstEntry.Line, `5.202.214.160 - - [26/Jan/2019:19:45:25 +0330] "GET / HTTP/1.1" 200 30975 "https://www.zanbil.ir/" "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0" "-"`)
185
})
186
}
187
188