Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/logs/logs_test.go
4096 views
1
//go:build !race
2
3
package logs
4
5
import (
6
"fmt"
7
"net"
8
"net/http"
9
"os"
10
"path/filepath"
11
"strings"
12
"testing"
13
"time"
14
15
"github.com/grafana/loki/pkg/loghttp/push"
16
17
"github.com/go-kit/log"
18
"github.com/grafana/agent/pkg/util"
19
"github.com/grafana/loki/pkg/logproto"
20
"github.com/prometheus/client_golang/prometheus"
21
"github.com/stretchr/testify/require"
22
"gopkg.in/yaml.v2"
23
)
24
25
func TestLogs_NilConfig(t *testing.T) {
26
l, err := New(prometheus.NewRegistry(), nil, util.TestLogger(t), false)
27
require.NoError(t, err)
28
require.NoError(t, l.ApplyConfig(nil, false))
29
30
defer l.Stop()
31
}
32
33
func TestLogs(t *testing.T) {
34
//
35
// Create a temporary file to tail
36
//
37
positionsDir := t.TempDir()
38
39
tmpFile, err := os.CreateTemp(os.TempDir(), "*.log")
40
require.NoError(t, err)
41
t.Cleanup(func() {
42
_ = os.RemoveAll(tmpFile.Name())
43
})
44
45
//
46
// Listen for push requests and pass them through to a channel
47
//
48
pushes := make(chan *logproto.PushRequest)
49
50
lis, err := net.Listen("tcp", "127.0.0.1:0")
51
require.NoError(t, err)
52
t.Cleanup(func() {
53
require.NoError(t, lis.Close())
54
})
55
go func() {
56
_ = http.Serve(lis, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
57
req, err := push.ParseRequest(log.NewNopLogger(), "user_id", r, nil)
58
require.NoError(t, err)
59
60
pushes <- req
61
_, _ = rw.Write(nil)
62
}))
63
}()
64
65
//
66
// Launch Loki so it starts tailing the file and writes to our server.
67
//
68
cfgText := util.Untab(fmt.Sprintf(`
69
positions_directory: %s
70
configs:
71
- name: default
72
clients:
73
- url: http://%s/loki/api/v1/push
74
batchwait: 50ms
75
batchsize: 1
76
scrape_configs:
77
- job_name: system
78
static_configs:
79
- targets: [localhost]
80
labels:
81
job: test
82
__path__: %s
83
`, positionsDir, lis.Addr().String(), tmpFile.Name()))
84
85
var cfg Config
86
dec := yaml.NewDecoder(strings.NewReader(cfgText))
87
dec.SetStrict(true)
88
require.NoError(t, dec.Decode(&cfg))
89
require.NoError(t, cfg.ApplyDefaults())
90
logger := log.NewSyncLogger(log.NewNopLogger())
91
l, err := New(prometheus.NewRegistry(), &cfg, logger, false)
92
require.NoError(t, err)
93
defer l.Stop()
94
95
//
96
// Write a log line and wait for it to come through.
97
//
98
fmt.Fprintf(tmpFile, "Hello, world!\n")
99
select {
100
case <-time.After(time.Second * 30):
101
require.FailNow(t, "timed out waiting for data to be pushed")
102
case req := <-pushes:
103
require.Equal(t, "Hello, world!", req.Streams[0].Entries[0].Line)
104
}
105
106
//
107
// Apply a new config and write a new line.
108
//
109
cfgText = util.Untab(fmt.Sprintf(`
110
positions_directory: %s
111
configs:
112
- name: default
113
clients:
114
- url: http://%s/loki/api/v1/push
115
batchwait: 50ms
116
batchsize: 5
117
scrape_configs:
118
- job_name: system
119
static_configs:
120
- targets: [localhost]
121
labels:
122
job: test-2
123
__path__: %s
124
`, positionsDir, lis.Addr().String(), tmpFile.Name()))
125
126
var newCfg Config
127
dec = yaml.NewDecoder(strings.NewReader(cfgText))
128
dec.SetStrict(true)
129
require.NoError(t, dec.Decode(&newCfg))
130
require.NoError(t, newCfg.ApplyDefaults())
131
require.NoError(t, l.ApplyConfig(&newCfg, false))
132
133
fmt.Fprintf(tmpFile, "Hello again!\n")
134
select {
135
case <-time.After(time.Second * 30):
136
require.FailNow(t, "timed out waiting for data to be pushed")
137
case req := <-pushes:
138
require.Equal(t, "Hello again!", req.Streams[0].Entries[0].Line)
139
}
140
141
t.Run("update to nil", func(t *testing.T) {
142
// Applying a nil config should remove all instances.
143
err := l.ApplyConfig(nil, false)
144
require.NoError(t, err)
145
require.Len(t, l.instances, 0)
146
})
147
148
t.Run("re-apply previous config", func(t *testing.T) {
149
// Applying a nil config should remove all instances.
150
l.ApplyConfig(nil, false)
151
152
// Re-Apply the previous config and write a new line.
153
var newCfg Config
154
dec = yaml.NewDecoder(strings.NewReader(cfgText))
155
dec.SetStrict(true)
156
require.NoError(t, dec.Decode(&newCfg))
157
require.NoError(t, newCfg.ApplyDefaults())
158
require.NoError(t, l.ApplyConfig(&newCfg, false))
159
160
fmt.Fprintf(tmpFile, "Hello again!\n")
161
select {
162
case <-time.After(time.Second * 30):
163
require.FailNow(t, "timed out waiting for data to be pushed")
164
case req := <-pushes:
165
require.Equal(t, "Hello again!", req.Streams[0].Entries[0].Line)
166
}
167
})
168
}
169
170
func TestLogs_PositionsDirectory(t *testing.T) {
171
//
172
// Create a temporary file to tail
173
//
174
positionsDir := t.TempDir()
175
176
//
177
// Launch Loki so it starts tailing the file and writes to our server.
178
//
179
cfgText := util.Untab(fmt.Sprintf(`
180
positions_directory: %[1]s/positions
181
configs:
182
- name: instance-a
183
clients:
184
- url: http://127.0.0.1:80/loki/api/v1/push
185
- name: instance-b
186
positions:
187
filename: %[1]s/other-positions/instance.yml
188
clients:
189
- url: http://127.0.0.1:80/loki/api/v1/push
190
`, positionsDir))
191
192
var cfg Config
193
dec := yaml.NewDecoder(strings.NewReader(cfgText))
194
dec.SetStrict(true)
195
require.NoError(t, dec.Decode(&cfg))
196
require.NoError(t, cfg.ApplyDefaults())
197
logger := util.TestLogger(t)
198
l, err := New(prometheus.NewRegistry(), &cfg, logger, false)
199
require.NoError(t, err)
200
defer l.Stop()
201
202
_, err = os.Stat(filepath.Join(positionsDir, "positions"))
203
require.NoError(t, err, "default shared positions directory did not get created")
204
_, err = os.Stat(filepath.Join(positionsDir, "other-positions"))
205
require.NoError(t, err, "instance-specific positions directory did not get created")
206
}
207
208