Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/file/file_test.go
4096 views
1
//go:build !race
2
3
package file
4
5
import (
6
"context"
7
"errors"
8
"log"
9
"os"
10
"path/filepath"
11
"testing"
12
"time"
13
14
"github.com/grafana/agent/component"
15
"github.com/grafana/agent/component/common/loki"
16
"github.com/grafana/agent/component/discovery"
17
"github.com/grafana/agent/pkg/flow/componenttest"
18
"github.com/grafana/agent/pkg/util"
19
"github.com/prometheus/client_golang/prometheus"
20
"github.com/prometheus/common/model"
21
"github.com/stretchr/testify/require"
22
"go.uber.org/goleak"
23
)
24
25
func Test(t *testing.T) {
26
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
27
28
ctx, cancel := context.WithCancel(componenttest.TestContext(t))
29
defer cancel()
30
31
// Create file to log to.
32
f, err := os.CreateTemp(t.TempDir(), "example")
33
require.NoError(t, err)
34
defer f.Close()
35
36
ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
37
require.NoError(t, err)
38
39
ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)
40
41
go func() {
42
err := ctrl.Run(ctx, Arguments{
43
Targets: []discovery.Target{{
44
"__path__": f.Name(),
45
"foo": "bar",
46
}},
47
ForwardTo: []loki.LogsReceiver{ch1, ch2},
48
})
49
require.NoError(t, err)
50
}()
51
52
ctrl.WaitRunning(time.Minute)
53
54
_, err = f.Write([]byte("writing some text\n"))
55
require.NoError(t, err)
56
57
wantLabelSet := model.LabelSet{
58
"filename": model.LabelValue(f.Name()),
59
"foo": "bar",
60
}
61
62
for i := 0; i < 2; i++ {
63
select {
64
case logEntry := <-ch1:
65
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
66
require.Equal(t, "writing some text", logEntry.Line)
67
require.Equal(t, wantLabelSet, logEntry.Labels)
68
case logEntry := <-ch2:
69
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
70
require.Equal(t, "writing some text", logEntry.Line)
71
require.Equal(t, wantLabelSet, logEntry.Labels)
72
case <-time.After(5 * time.Second):
73
require.FailNow(t, "failed waiting for log line")
74
}
75
}
76
}
77
78
// Test that updating the component does not leak goroutines.
79
func TestUpdate_NoLeak(t *testing.T) {
80
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
81
82
ctx, cancel := context.WithCancel(componenttest.TestContext(t))
83
defer cancel()
84
85
// Create file to tail.
86
f, err := os.CreateTemp(t.TempDir(), "example")
87
require.NoError(t, err)
88
defer f.Close()
89
90
ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
91
require.NoError(t, err)
92
93
args := Arguments{
94
Targets: []discovery.Target{{
95
"__path__": f.Name(),
96
"foo": "bar",
97
}},
98
ForwardTo: []loki.LogsReceiver{},
99
}
100
101
go func() {
102
err := ctrl.Run(ctx, args)
103
require.NoError(t, err)
104
}()
105
106
ctrl.WaitRunning(time.Minute)
107
108
// Update a bunch of times to ensure that no goroutines get leaked between
109
// updates.
110
for i := 0; i < 10; i++ {
111
err := ctrl.Update(args)
112
require.NoError(t, err)
113
}
114
}
115
116
func TestTwoTargets(t *testing.T) {
117
// Create opts for component
118
opts := component.Options{
119
Logger: util.TestFlowLogger(t),
120
Registerer: prometheus.NewRegistry(),
121
OnStateChange: func(e component.Exports) {},
122
DataPath: t.TempDir(),
123
}
124
125
f, err := os.CreateTemp(opts.DataPath, "example")
126
if err != nil {
127
log.Fatal(err)
128
}
129
f2, err := os.CreateTemp(opts.DataPath, "example2")
130
if err != nil {
131
log.Fatal(err)
132
}
133
defer f.Close()
134
defer f2.Close()
135
136
ch1 := make(chan loki.Entry)
137
args := Arguments{}
138
args.Targets = []discovery.Target{
139
{"__path__": f.Name(), "foo": "bar"},
140
{"__path__": f2.Name(), "foo": "bar2"},
141
}
142
args.ForwardTo = []loki.LogsReceiver{ch1}
143
144
c, err := New(opts, args)
145
require.NoError(t, err)
146
147
ctx, cancel := context.WithCancel(context.Background())
148
go c.Run(ctx)
149
time.Sleep(100 * time.Millisecond)
150
151
_, err = f.Write([]byte("text\n"))
152
require.NoError(t, err)
153
154
_, err = f2.Write([]byte("text2\n"))
155
require.NoError(t, err)
156
157
foundF1, foundF2 := false, false
158
for i := 0; i < 2; i++ {
159
select {
160
case logEntry := <-ch1:
161
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
162
if logEntry.Line == "text" {
163
foundF1 = true
164
} else if logEntry.Line == "text2" {
165
foundF2 = true
166
}
167
168
case <-time.After(5 * time.Second):
169
require.FailNow(t, "failed waiting for log line")
170
}
171
}
172
require.True(t, foundF1)
173
require.True(t, foundF2)
174
cancel()
175
// Verify that positions.yml is written. NOTE: if we didn't wait for it, there would be a race condition between
176
// temporary directory being cleaned up and this file being created.
177
require.Eventually(
178
t,
179
func() bool {
180
if _, err := os.Stat(filepath.Join(opts.DataPath, "positions.yml")); errors.Is(err, os.ErrNotExist) {
181
return false
182
}
183
return true
184
},
185
5*time.Second,
186
10*time.Millisecond,
187
"expected positions.yml file to be written eventually",
188
)
189
}
190
191