Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/write/write_test.go
4096 views
1
package write
2
3
import (
4
"context"
5
"fmt"
6
"math"
7
"net/http"
8
"net/http/httptest"
9
"testing"
10
"time"
11
12
"github.com/grafana/agent/component/common/loki"
13
"github.com/grafana/agent/pkg/flow/componenttest"
14
"github.com/grafana/agent/pkg/river"
15
"github.com/grafana/agent/pkg/util"
16
"github.com/grafana/loki/pkg/logproto"
17
loki_util "github.com/grafana/loki/pkg/util"
18
"github.com/prometheus/common/model"
19
"github.com/stretchr/testify/require"
20
)
21
22
func TestRiverConfig(t *testing.T) {
23
var exampleRiverConfig = `
24
endpoint {
25
name = "test-url"
26
url = "http://0.0.0.0:11111/loki/api/v1/push"
27
remote_timeout = "100ms"
28
}
29
`
30
31
var args Arguments
32
err := river.Unmarshal([]byte(exampleRiverConfig), &args)
33
require.NoError(t, err)
34
}
35
36
func TestBadRiverConfig(t *testing.T) {
37
var exampleRiverConfig = `
38
endpoint {
39
name = "test-url"
40
url = "http://0.0.0.0:11111/loki/api/v1/push"
41
remote_timeout = "100ms"
42
bearer_token = "token"
43
bearer_token_file = "/path/to/file.token"
44
}
45
`
46
47
// Make sure the squashed HTTPClientConfig Validate function is being utilized correctly
48
var args Arguments
49
err := river.Unmarshal([]byte(exampleRiverConfig), &args)
50
require.ErrorContains(t, err, "at most one of bearer_token & bearer_token_file must be configured")
51
}
52
53
func Test(t *testing.T) {
54
// Set up the server that will receive the log entry, and expose it on ch.
55
ch := make(chan logproto.PushRequest)
56
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
57
var pushReq logproto.PushRequest
58
err := loki_util.ParseProtoReader(context.Background(), r.Body, int(r.ContentLength), math.MaxInt32, &pushReq, loki_util.RawSnappy)
59
if err != nil {
60
w.WriteHeader(http.StatusBadRequest)
61
return
62
}
63
tenantHeader := r.Header.Get("X-Scope-OrgID")
64
require.Equal(t, tenantHeader, "tenant-1")
65
66
ch <- pushReq
67
}))
68
defer srv.Close()
69
70
// Set up the component Arguments.
71
cfg := fmt.Sprintf(`
72
endpoint {
73
url = "%s"
74
batch_wait = "10ms"
75
tenant_id = "tenant-1"
76
}
77
`, srv.URL)
78
var args Arguments
79
require.NoError(t, river.Unmarshal([]byte(cfg), &args))
80
81
// Set up and start the component.
82
tc, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.write")
83
require.NoError(t, err)
84
go func() {
85
err = tc.Run(componenttest.TestContext(t), args)
86
require.NoError(t, err)
87
}()
88
require.NoError(t, tc.WaitExports(time.Second))
89
90
// Send two log entries to the component's receiver
91
logEntry := loki.Entry{
92
Labels: model.LabelSet{"foo": "bar"},
93
Entry: logproto.Entry{
94
Timestamp: time.Now(),
95
Line: "very important log",
96
},
97
}
98
99
exports := tc.Exports().(Exports)
100
exports.Receiver <- logEntry
101
exports.Receiver <- logEntry
102
103
// Wait for our exporter to finish and pass data to our HTTP server.
104
// Make sure the log entries were received correctly.
105
select {
106
case <-time.After(2 * time.Second):
107
require.FailNow(t, "failed waiting for logs")
108
case req := <-ch:
109
require.Len(t, req.Streams, 1)
110
require.Equal(t, req.Streams[0].Labels, logEntry.Labels.String())
111
require.Len(t, req.Streams[0].Entries, 2)
112
require.Equal(t, req.Streams[0].Entries[0].Line, logEntry.Line)
113
require.Equal(t, req.Streams[0].Entries[1].Line, logEntry.Line)
114
}
115
}
116
117