Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/push_target_test.go
4096 views
1
package gcplogtarget
2
3
import (
4
"fmt"
5
"net/http"
6
"os"
7
"strings"
8
"sync"
9
"testing"
10
"time"
11
12
"github.com/grafana/agent/component/common/loki/client/fake"
13
14
"github.com/grafana/agent/component/common/loki"
15
fnet "github.com/grafana/agent/component/common/net"
16
17
"github.com/go-kit/log"
18
"github.com/phayes/freeport"
19
"github.com/prometheus/client_golang/prometheus"
20
"github.com/prometheus/common/model"
21
"github.com/prometheus/prometheus/model/relabel"
22
"github.com/stretchr/testify/require"
23
)
24
25
// Shared fixtures for the push target tests.
const (
	// localhost is the address the tests dial to reach the push target.
	localhost = "127.0.0.1"

	// expectedMessageData is the log line the tests expect to receive after
	// pushing testPayload. It ends with a newline, so the closing backtick
	// must stay on its own line.
	expectedMessageData = `{"insertId":"4affa858-e5f2-47f7-9254-e609b5c014d0","labels":{},"logName":"projects/test-project/logs/cloudaudit.googleapis.com%2Fdata_access","receiveTimestamp":"2022-09-06T18:07:43.417714046Z","resource":{"labels":{"cluster_name":"dev-us-central-42","location":"us-central1","project_id":"test-project"},"type":"k8s_cluster"},"timestamp":"2022-09-06T18:07:42.363113Z"}
`

	// testPayload is the JSON body POSTed to the push endpoint. Its
	// message.data field is base64 text; the tests expect it to come out
	// as expectedMessageData.
	testPayload = `
{
"message": {
"attributes": {
"logging.googleapis.com/timestamp": "2022-07-25T22:19:09.903683708Z"
},
"data": "eyJpbnNlcnRJZCI6IjRhZmZhODU4LWU1ZjItNDdmNy05MjU0LWU2MDliNWMwMTRkMCIsImxhYmVscyI6e30sImxvZ05hbWUiOiJwcm9qZWN0cy90ZXN0LXByb2plY3QvbG9ncy9jbG91ZGF1ZGl0Lmdvb2dsZWFwaXMuY29tJTJGZGF0YV9hY2Nlc3MiLCJyZWNlaXZlVGltZXN0YW1wIjoiMjAyMi0wOS0wNlQxODowNzo0My40MTc3MTQwNDZaIiwicmVzb3VyY2UiOnsibGFiZWxzIjp7ImNsdXN0ZXJfbmFtZSI6ImRldi11cy1jZW50cmFsLTQyIiwibG9jYXRpb24iOiJ1cy1jZW50cmFsMSIsInByb2plY3RfaWQiOiJ0ZXN0LXByb2plY3QifSwidHlwZSI6Ims4c19jbHVzdGVyIn0sInRpbWVzdGFtcCI6IjIwMjItMDktMDZUMTg6MDc6NDIuMzYzMTEzWiJ9Cg==",
"messageId": "5187581549398349",
"message_id": "5187581549398349",
"publishTime": "2022-07-25T22:19:15.56Z",
"publish_time": "2022-07-25T22:19:15.56Z"
},
"subscription": "projects/test-project/subscriptions/test"
}`
)
43
44
// makeGCPPushRequest builds a POST request for the push endpoint
// ("/gcp/api/v1/push") under host, using body as the request payload.
// It returns a nil request and the error when the request cannot be built.
func makeGCPPushRequest(host string, body string) (*http.Request, error) {
	endpoint := fmt.Sprintf("%s/gcp/api/v1/push", host)
	// http.NewRequest already yields (nil, err) on failure, so its result
	// can be handed back directly.
	return http.NewRequest(http.MethodPost, endpoint, strings.NewReader(body))
}
51
52
func TestPushTarget(t *testing.T) {
53
w := log.NewSyncWriter(os.Stderr)
54
logger := log.NewLogfmtLogger(w)
55
56
type expectedEntry struct {
57
labels model.LabelSet
58
line string
59
}
60
type args struct {
61
RequestBody string
62
RelabelConfigs []*relabel.Config
63
Labels model.LabelSet
64
}
65
66
cases := map[string]struct {
67
args args
68
expectedEntries []expectedEntry
69
}{
70
"simplified cloud functions log line": {
71
args: args{
72
RequestBody: testPayload,
73
Labels: model.LabelSet{
74
"job": "some_job_name",
75
},
76
},
77
expectedEntries: []expectedEntry{
78
{
79
labels: model.LabelSet{
80
"job": "some_job_name",
81
},
82
line: expectedMessageData,
83
},
84
},
85
},
86
"simplified cloud functions log line, with relabeling custom attribute and message id": {
87
args: args{
88
RequestBody: testPayload,
89
Labels: model.LabelSet{
90
"job": "some_job_name",
91
},
92
RelabelConfigs: []*relabel.Config{
93
{
94
SourceLabels: model.LabelNames{"__gcp_attributes_logging_googleapis_com_timestamp"},
95
Regex: relabel.MustNewRegexp("(.*)"),
96
Replacement: "$1",
97
TargetLabel: "google_timestamp",
98
Action: relabel.Replace,
99
},
100
{
101
SourceLabels: model.LabelNames{"__gcp_message_id"},
102
Regex: relabel.MustNewRegexp("(.*)"),
103
Replacement: "$1",
104
TargetLabel: "message_id",
105
Action: relabel.Replace,
106
},
107
{
108
SourceLabels: model.LabelNames{"__gcp_subscription_name"},
109
Regex: relabel.MustNewRegexp("(.*)"),
110
Replacement: "$1",
111
TargetLabel: "subscription",
112
Action: relabel.Replace,
113
},
114
// Internal GCP Log entry attributes and labels
115
{
116
SourceLabels: model.LabelNames{"__gcp_logname"},
117
Regex: relabel.MustNewRegexp("(.*)"),
118
Replacement: "$1",
119
TargetLabel: "log_name",
120
Action: relabel.Replace,
121
},
122
{
123
SourceLabels: model.LabelNames{"__gcp_resource_type"},
124
Regex: relabel.MustNewRegexp("(.*)"),
125
Replacement: "$1",
126
TargetLabel: "resource_type",
127
Action: relabel.Replace,
128
},
129
{
130
SourceLabels: model.LabelNames{"__gcp_resource_labels_cluster_name"},
131
Regex: relabel.MustNewRegexp("(.*)"),
132
Replacement: "$1",
133
TargetLabel: "cluster",
134
Action: relabel.Replace,
135
},
136
},
137
},
138
expectedEntries: []expectedEntry{
139
{
140
labels: model.LabelSet{
141
"job": "some_job_name",
142
"google_timestamp": "2022-07-25T22:19:09.903683708Z",
143
"message_id": "5187581549398349",
144
"subscription": "projects/test-project/subscriptions/test",
145
"log_name": "projects/test-project/logs/cloudaudit.googleapis.com%2Fdata_access",
146
"resource_type": "k8s_cluster",
147
"cluster": "dev-us-central-42",
148
},
149
line: expectedMessageData,
150
},
151
},
152
},
153
}
154
for name, tc := range cases {
155
outerName := t.Name()
156
t.Run(name, func(t *testing.T) {
157
// Create fake promtail client
158
eh := fake.NewClient(func() {})
159
defer eh.Stop()
160
161
port, err := freeport.GetFreePort()
162
require.NoError(t, err)
163
lbls := make(map[string]string, len(tc.args.Labels))
164
for k, v := range tc.args.Labels {
165
lbls[string(k)] = string(v)
166
}
167
config := &PushConfig{
168
Labels: lbls,
169
UseIncomingTimestamp: false,
170
Server: &fnet.ServerConfig{
171
HTTP: &fnet.HTTPConfig{
172
ListenAddress: "localhost",
173
ListenPort: port,
174
},
175
// assign random grpc port
176
GRPC: &fnet.GRPCConfig{ListenPort: 0},
177
},
178
}
179
180
prometheus.DefaultRegisterer = prometheus.NewRegistry()
181
metrics := NewMetrics(prometheus.DefaultRegisterer)
182
pt, err := NewPushTarget(metrics, logger, eh, outerName+"_test_job", config, tc.args.RelabelConfigs, nil)
183
require.NoError(t, err)
184
defer func() {
185
_ = pt.Stop()
186
}()
187
188
// Clear received lines after test case is ran
189
defer eh.Clear()
190
191
// Send some logs
192
ts := time.Now()
193
194
req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), tc.args.RequestBody)
195
require.NoError(t, err, "expected request to be created successfully")
196
res, err := http.DefaultClient.Do(req)
197
require.NoError(t, err)
198
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")
199
200
waitForMessages(eh)
201
202
// Make sure we didn't time out
203
require.Equal(t, 1, len(eh.Received()))
204
205
require.Equal(t, len(eh.Received()), len(tc.expectedEntries), "expected to receive equal amount of expected label sets")
206
for i, expectedEntry := range tc.expectedEntries {
207
// TODO: Add assertion over propagated timestamp
208
actualEntry := eh.Received()[i]
209
210
require.Equal(t, expectedEntry.line, actualEntry.Line, "expected line to be equal for %d-th entry", i)
211
212
expectedLS := expectedEntry.labels
213
actualLS := actualEntry.Labels
214
for label, value := range expectedLS {
215
require.Equal(t, expectedLS[label], actualLS[label], "expected label %s to be equal to %s in %d-th entry", label, value, i)
216
}
217
218
// Timestamp is always set in the handler, we expect received timestamps to be slightly higher than the timestamp when we started sending logs.
219
require.GreaterOrEqual(t, actualEntry.Timestamp.Unix(), ts.Unix(), "expected %d-th entry to have a received timestamp greater than publish time", i)
220
}
221
})
222
}
223
}
224
225
func TestPushTarget_UseIncomingTimestamp(t *testing.T) {
226
w := log.NewSyncWriter(os.Stderr)
227
logger := log.NewLogfmtLogger(w)
228
229
// Create fake promtail client
230
eh := fake.NewClient(func() {})
231
defer eh.Stop()
232
233
port, err := freeport.GetFreePort()
234
require.NoError(t, err)
235
require.NoError(t, err, "error generating server config or finding open port")
236
config := &PushConfig{
237
Labels: nil,
238
UseIncomingTimestamp: true,
239
Server: &fnet.ServerConfig{
240
HTTP: &fnet.HTTPConfig{
241
ListenAddress: "localhost",
242
ListenPort: port,
243
},
244
// assign random grpc port
245
GRPC: &fnet.GRPCConfig{ListenPort: 0},
246
},
247
}
248
249
prometheus.DefaultRegisterer = prometheus.NewRegistry()
250
metrics := NewMetrics(prometheus.DefaultRegisterer)
251
pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, nil, nil)
252
require.NoError(t, err)
253
defer func() {
254
_ = pt.Stop()
255
}()
256
257
// Clear received lines after test case is ran
258
defer eh.Clear()
259
260
req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
261
require.NoError(t, err, "expected request to be created successfully")
262
res, err := http.DefaultClient.Do(req)
263
require.NoError(t, err)
264
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")
265
266
waitForMessages(eh)
267
268
// Make sure we didn't time out
269
require.Equal(t, 1, len(eh.Received()))
270
271
expectedTs, err := time.Parse(time.RFC3339Nano, "2022-09-06T18:07:42.363113Z")
272
require.NoError(t, err, "expected expected timestamp to be parse correctly")
273
require.Equal(t, expectedTs, eh.Received()[0].Timestamp, "expected entry timestamp to be overridden by received one")
274
}
275
276
func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
277
w := log.NewSyncWriter(os.Stderr)
278
logger := log.NewLogfmtLogger(w)
279
280
// Create fake promtail client
281
eh := fake.NewClient(func() {})
282
defer eh.Stop()
283
284
port, err := freeport.GetFreePort()
285
require.NoError(t, err)
286
config := &PushConfig{
287
Labels: nil,
288
UseIncomingTimestamp: true,
289
Server: &fnet.ServerConfig{
290
HTTP: &fnet.HTTPConfig{
291
ListenAddress: "localhost",
292
ListenPort: port,
293
},
294
// assign random grpc port
295
GRPC: &fnet.GRPCConfig{ListenPort: 0},
296
},
297
}
298
299
prometheus.DefaultRegisterer = prometheus.NewRegistry()
300
metrics := NewMetrics(prometheus.DefaultRegisterer)
301
tenantIDRelabelConfig := []*relabel.Config{
302
{
303
SourceLabels: model.LabelNames{"__tenant_id__"},
304
Regex: relabel.MustNewRegexp("(.*)"),
305
Replacement: "$1",
306
TargetLabel: "tenant_id",
307
Action: relabel.Replace,
308
},
309
}
310
pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, tenantIDRelabelConfig, nil)
311
require.NoError(t, err)
312
defer func() {
313
_ = pt.Stop()
314
}()
315
316
// Clear received lines after test case is ran
317
defer eh.Clear()
318
319
req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
320
require.NoError(t, err, "expected request to be created successfully")
321
req.Header.Set("X-Scope-OrgID", "42")
322
res, err := http.DefaultClient.Do(req)
323
require.NoError(t, err)
324
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")
325
326
waitForMessages(eh)
327
328
// Make sure we didn't time out
329
require.Equal(t, 1, len(eh.Received()))
330
331
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels[ReservedLabelTenantID])
332
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels["tenant_id"])
333
}
334
335
func TestPushTarget_ErroneousPayloadsAreRejected(t *testing.T) {
336
w := log.NewSyncWriter(os.Stderr)
337
logger := log.NewLogfmtLogger(w)
338
339
// Create fake promtail client
340
eh := fake.NewClient(func() {})
341
defer eh.Stop()
342
343
port, err := freeport.GetFreePort()
344
require.NoError(t, err)
345
config := &PushConfig{
346
Labels: nil,
347
Server: &fnet.ServerConfig{
348
HTTP: &fnet.HTTPConfig{
349
ListenAddress: "localhost",
350
ListenPort: port,
351
},
352
// assign random grpc port
353
GRPC: &fnet.GRPCConfig{ListenPort: 0},
354
},
355
}
356
357
prometheus.DefaultRegisterer = prometheus.NewRegistry()
358
metrics := NewMetrics(prometheus.DefaultRegisterer)
359
pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, nil, nil)
360
require.NoError(t, err)
361
defer func() {
362
_ = pt.Stop()
363
}()
364
365
// Clear received lines after test case is ran
366
defer eh.Clear()
367
368
for caseName, testPayload := range map[string]string{
369
"invalid JSON": "{",
370
"empty": "{}",
371
"missing subscription": `{
372
"message": {
373
"message_id": "123",
374
"data": "some data"
375
}
376
}`,
377
"missing message ID": `{
378
"subscription": "sub",
379
"message": {
380
"data": "data"
381
}
382
}`,
383
"missing data": `{
384
"subscription": "sub",
385
"message": {
386
"data": "",
387
"message_id":"123"
388
}
389
}`,
390
} {
391
t.Run(caseName, func(t *testing.T) {
392
req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
393
require.NoError(t, err, "expected request to be created successfully")
394
res, err := http.DefaultClient.Do(req)
395
res.Request.Body.Close()
396
require.NoError(t, err)
397
require.Equal(t, http.StatusBadRequest, res.StatusCode, "expected bad request status code")
398
})
399
}
400
}
401
402
// blockingEntryHandler implements an loki.EntryHandler that has no space in
403
// it's receive channel, blocking when an loki.Entry is sent down the pipe.
404
type blockingEntryHandler struct {
405
ch chan loki.Entry
406
once sync.Once
407
}
408
409
func newBlockingEntryHandler() *blockingEntryHandler {
410
filledChannel := make(chan loki.Entry)
411
return &blockingEntryHandler{ch: filledChannel}
412
}
413
414
func (t *blockingEntryHandler) Chan() chan<- loki.Entry {
415
return t.ch
416
}
417
418
func (t *blockingEntryHandler) Stop() {
419
t.once.Do(func() { close(t.ch) })
420
}
421
422
func TestPushTarget_UsePushTimeout(t *testing.T) {
423
w := log.NewSyncWriter(os.Stderr)
424
logger := log.NewLogfmtLogger(w)
425
426
eh := newBlockingEntryHandler()
427
defer eh.Stop()
428
429
port, err := freeport.GetFreePort()
430
require.NoError(t, err)
431
config := &PushConfig{
432
Labels: nil,
433
UseIncomingTimestamp: true,
434
PushTimeout: time.Second,
435
Server: &fnet.ServerConfig{
436
HTTP: &fnet.HTTPConfig{
437
ListenAddress: "localhost",
438
ListenPort: port,
439
},
440
// assign random grpc port
441
GRPC: &fnet.GRPCConfig{ListenPort: 0},
442
},
443
}
444
445
prometheus.DefaultRegisterer = prometheus.NewRegistry()
446
metrics := NewMetrics(prometheus.DefaultRegisterer)
447
tenantIDRelabelConfig := []*relabel.Config{
448
{
449
SourceLabels: model.LabelNames{"__tenant_id__"},
450
Regex: relabel.MustNewRegexp("(.*)"),
451
Replacement: "$1",
452
TargetLabel: "tenant_id",
453
Action: relabel.Replace,
454
},
455
}
456
pt, err := NewPushTarget(metrics, logger, eh, t.Name()+"_test_job", config, tenantIDRelabelConfig, nil)
457
require.NoError(t, err)
458
defer func() {
459
_ = pt.Stop()
460
}()
461
462
req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
463
require.NoError(t, err, "expected request to be created successfully")
464
res, err := http.DefaultClient.Do(req)
465
require.NoError(t, err)
466
require.Equal(t, http.StatusServiceUnavailable, res.StatusCode, "expected timeout response")
467
}
468
469
// waitForMessages polls the fake client until it holds exactly one received
// entry, sleeping one millisecond between polls. It gives up silently after
// 1000 sleeps, so callers must still assert on the received count.
func waitForMessages(eh *fake.Client) {
	for remaining := 1000; ; remaining-- {
		if len(eh.Received()) == 1 || remaining <= 0 {
			return
		}
		time.Sleep(1 * time.Millisecond)
	}
}
476
477