Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/gcplog_test.go
4096 views
1
package gcplog
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"strings"
8
"testing"
9
"time"
10
11
"github.com/grafana/agent/component"
12
"github.com/grafana/agent/component/common/loki"
13
fnet "github.com/grafana/agent/component/common/net"
14
flow_relabel "github.com/grafana/agent/component/common/relabel"
15
gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"
16
17
"github.com/grafana/agent/pkg/util"
18
"github.com/grafana/regexp"
19
"github.com/phayes/freeport"
20
"github.com/prometheus/client_golang/prometheus"
21
"github.com/prometheus/common/model"
22
"github.com/stretchr/testify/require"
23
)
24
25
// TestPull is a placeholder for the pull-target test.
//
// TODO (@tpaschalis) We can't test this easily as there's no way to inject
// the mock PubSub client inside the component, but we'll find a workaround.
func TestPull(t *testing.T) {
	// Skip explicitly instead of passing with an empty body, so the missing
	// coverage shows up in `go test -v` output rather than as a silent PASS.
	t.Skip("pull target is not testable yet: no way to inject a mock PubSub client into the component")
}
28
29
func TestPush(t *testing.T) {
30
opts := component.Options{
31
Logger: util.TestFlowLogger(t),
32
Registerer: prometheus.NewRegistry(),
33
OnStateChange: func(e component.Exports) {},
34
}
35
36
ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)
37
args := Arguments{}
38
39
port, err := freeport.GetFreePort()
40
require.NoError(t, err)
41
args.PushTarget = &gt.PushConfig{
42
Server: &fnet.ServerConfig{
43
HTTP: &fnet.HTTPConfig{
44
ListenAddress: "localhost",
45
ListenPort: port,
46
},
47
// assign random grpc port
48
GRPC: &fnet.GRPCConfig{ListenPort: 0},
49
},
50
Labels: map[string]string{
51
"foo": "bar",
52
},
53
}
54
args.ForwardTo = []loki.LogsReceiver{ch1, ch2}
55
args.RelabelRules = exportedRules
56
57
// Create and run the component.
58
c, err := New(opts, args)
59
require.NoError(t, err)
60
61
go c.Run(context.Background())
62
time.Sleep(200 * time.Millisecond)
63
64
// Create a GCP PushRequest and send it to the launched server.
65
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%d/gcp/api/v1/push", port), strings.NewReader(testPushPayload))
66
require.NoError(t, err)
67
68
res, err := http.DefaultClient.Do(req)
69
require.NoError(t, err)
70
require.Equal(t, http.StatusNoContent, res.StatusCode)
71
72
// Check the received log entries
73
wantLabelSet := model.LabelSet{"foo": "bar", "message_id": "5187581549398349", "resource_type": "k8s_cluster"}
74
wantLogLine := "{\"insertId\":\"4affa858-e5f2-47f7-9254-e609b5c014d0\",\"labels\":{},\"logName\":\"projects/test-project/logs/cloudaudit.googleapis.com%2Fdata_access\",\"receiveTimestamp\":\"2022-09-06T18:07:43.417714046Z\",\"resource\":{\"labels\":{\"cluster_name\":\"dev-us-central-42\",\"location\":\"us-central1\",\"project_id\":\"test-project\"},\"type\":\"k8s_cluster\"},\"timestamp\":\"2022-09-06T18:07:42.363113Z\"}\n"
75
76
for i := 0; i < 2; i++ {
77
select {
78
case logEntry := <-ch1:
79
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
80
require.Equal(t, wantLogLine, logEntry.Line)
81
require.Equal(t, wantLabelSet, logEntry.Labels)
82
case logEntry := <-ch2:
83
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
84
require.Equal(t, wantLogLine, logEntry.Line)
85
require.Equal(t, wantLabelSet, logEntry.Labels)
86
case <-time.After(5 * time.Second):
87
require.FailNow(t, "failed waiting for log line")
88
}
89
}
90
}
91
92
// testPushPayload is the JSON body that TestPush POSTs to the push endpoint.
// The "data" field is base64; it appears to decode to the JSON log line that
// TestPush expects to receive (verify if either side is changed).
const testPushPayload = `
{
	"message": {
		"attributes": {
			"logging.googleapis.com/timestamp": "2022-07-25T22:19:09.903683708Z"
		},
		"data": "eyJpbnNlcnRJZCI6IjRhZmZhODU4LWU1ZjItNDdmNy05MjU0LWU2MDliNWMwMTRkMCIsImxhYmVscyI6e30sImxvZ05hbWUiOiJwcm9qZWN0cy90ZXN0LXByb2plY3QvbG9ncy9jbG91ZGF1ZGl0Lmdvb2dsZWFwaXMuY29tJTJGZGF0YV9hY2Nlc3MiLCJyZWNlaXZlVGltZXN0YW1wIjoiMjAyMi0wOS0wNlQxODowNzo0My40MTc3MTQwNDZaIiwicmVzb3VyY2UiOnsibGFiZWxzIjp7ImNsdXN0ZXJfbmFtZSI6ImRldi11cy1jZW50cmFsLTQyIiwibG9jYXRpb24iOiJ1cy1jZW50cmFsMSIsInByb2plY3RfaWQiOiJ0ZXN0LXByb2plY3QifSwidHlwZSI6Ims4c19jbHVzdGVyIn0sInRpbWVzdGFtcCI6IjIwMjItMDktMDZUMTg6MDc6NDIuMzYzMTEzWiJ9Cg==",
		"messageId": "5187581549398349",
		"message_id": "5187581549398349",
		"publishTime": "2022-07-25T22:19:15.56Z",
		"publish_time": "2022-07-25T22:19:15.56Z"
	},
	"subscription": "projects/test-project/subscriptions/test"
}`
106
107
var exportedRules = flow_relabel.Rules{
108
{
109
SourceLabels: []string{"__gcp_message_id"},
110
Regex: mustNewRegexp("(.*)"),
111
Action: flow_relabel.Replace,
112
Replacement: "$1",
113
TargetLabel: "message_id",
114
},
115
{
116
SourceLabels: []string{"__gcp_resource_type"},
117
Regex: mustNewRegexp("(.*)"),
118
Action: flow_relabel.Replace,
119
Replacement: "$1",
120
TargetLabel: "resource_type",
121
},
122
}
123
124
// mustNewRegexp wraps s in "^(?:" ... ")$" so it must match the whole input,
// compiles it, and returns it as a flow_relabel.Regexp. It panics with the
// compile error if the pattern is invalid.
func mustNewRegexp(s string) flow_relabel.Regexp {
	anchored := "^(?:" + s + ")$"

	compiled, compileErr := regexp.Compile(anchored)
	if compileErr != nil {
		panic(compileErr)
	}

	return flow_relabel.Regexp{Regexp: compiled}
}
131
132