Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/heroku/internal/herokutarget/target_test.go
4096 views
1
package herokutarget
2
3
// This code is copied from Promtail. The herokutarget package is used to
4
// configure and run the targets that can read heroku entries and forward them
5
// to other loki components.
6
7
import (
8
"fmt"
9
"net"
10
"net/http"
11
"net/url"
12
"os"
13
"strings"
14
"testing"
15
"time"
16
17
"github.com/grafana/agent/component/common/loki/client/fake"
18
19
"github.com/go-kit/log"
20
"github.com/google/uuid"
21
"github.com/prometheus/client_golang/prometheus"
22
"github.com/prometheus/common/model"
23
"github.com/prometheus/prometheus/model/relabel"
24
"github.com/stretchr/testify/require"
25
26
fnet "github.com/grafana/agent/component/common/net"
27
)
28
29
const localhost = "127.0.0.1"
30
31
const testPayload = `270 <158>1 2022-06-13T14:52:23.622778+00:00 host heroku router - at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
32
`
33
const testLogLine1 = `140 <190>1 2022-06-13T14:52:23.621815+00:00 host app web.1 - [GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
34
`
35
const testLogLine1Timestamp = "2022-06-13T14:52:23.621815+00:00"
36
const testLogLine2 = `156 <190>1 2022-06-13T14:52:23.827271+00:00 host app web.1 - [GIN] 2022/06/13 - 14:52:23 | 200 | 163.92µs | 181.167.87.140 | GET "/static/main.css"
37
`
38
39
func makeDrainRequest(host string, params map[string][]string, bodies ...string) (*http.Request, error) {
40
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/heroku/api/v1/drain", host), strings.NewReader(strings.Join(bodies, "")))
41
if err != nil {
42
return nil, err
43
}
44
45
drainToken := uuid.New().String()
46
frameID := uuid.New().String()
47
48
values := url.Values{}
49
for name, params := range params {
50
for _, p := range params {
51
values.Add(name, p)
52
}
53
}
54
req.URL.RawQuery = values.Encode()
55
56
req.Header.Set("Content-Type", "application/heroku_drain-1")
57
req.Header.Set("Logplex-Drain-Token", fmt.Sprintf("d.%s", drainToken))
58
req.Header.Set("Logplex-Frame-Id", frameID)
59
req.Header.Set("Logplex-Msg-Count", fmt.Sprintf("%d", len(bodies)))
60
61
return req, nil
62
}
63
64
func TestHerokuDrainTarget(t *testing.T) {
65
w := log.NewSyncWriter(os.Stderr)
66
logger := log.NewLogfmtLogger(w)
67
68
type expectedEntry struct {
69
labels model.LabelSet
70
line string
71
}
72
type args struct {
73
RequestBodies []string
74
RequestParams map[string][]string
75
RelabelConfigs []*relabel.Config
76
Labels model.LabelSet
77
}
78
79
cases := map[string]struct {
80
args args
81
expectedEntries []expectedEntry
82
}{
83
"heroku request with a single log line, internal labels dropped, and fixed are propagated": {
84
args: args{
85
RequestBodies: []string{testPayload},
86
RequestParams: map[string][]string{},
87
Labels: model.LabelSet{
88
"job": "some_job_name",
89
},
90
},
91
expectedEntries: []expectedEntry{
92
{
93
labels: model.LabelSet{
94
"job": "some_job_name",
95
},
96
line: `at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
97
`,
98
},
99
},
100
},
101
"heroku request with a single log line and query parameters, internal labels dropped, and fixed are propagated": {
102
args: args{
103
RequestBodies: []string{testPayload},
104
RequestParams: map[string][]string{
105
"some_query_param": {"app_123", "app_456"},
106
},
107
Labels: model.LabelSet{
108
"job": "some_job_name",
109
},
110
},
111
expectedEntries: []expectedEntry{
112
{
113
labels: model.LabelSet{
114
"job": "some_job_name",
115
},
116
line: `at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
117
`,
118
},
119
},
120
},
121
"heroku request with two log lines, internal labels dropped, and fixed are propagated": {
122
args: args{
123
RequestBodies: []string{testLogLine1, testLogLine2},
124
RequestParams: map[string][]string{},
125
Labels: model.LabelSet{
126
"job": "multiple_line_job",
127
},
128
},
129
expectedEntries: []expectedEntry{
130
{
131
labels: model.LabelSet{
132
"job": "multiple_line_job",
133
},
134
line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
135
`,
136
},
137
{
138
labels: model.LabelSet{
139
"job": "multiple_line_job",
140
},
141
line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 163.92µs | 181.167.87.140 | GET "/static/main.css"
142
`,
143
},
144
},
145
},
146
"heroku request with two log lines and query parameters, internal labels dropped, and fixed are propagated": {
147
args: args{
148
RequestBodies: []string{testLogLine1, testLogLine2},
149
RequestParams: map[string][]string{
150
"some_query_param": {"app_123", "app_456"},
151
},
152
Labels: model.LabelSet{
153
"job": "multiple_line_job",
154
},
155
},
156
expectedEntries: []expectedEntry{
157
{
158
labels: model.LabelSet{
159
"job": "multiple_line_job",
160
},
161
line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
162
`,
163
},
164
{
165
labels: model.LabelSet{
166
"job": "multiple_line_job",
167
},
168
line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 163.92µs | 181.167.87.140 | GET "/static/main.css"
169
`,
170
},
171
},
172
},
173
"heroku request with a single log line, with internal labels relabeled, and fixed labels": {
174
args: args{
175
RequestBodies: []string{testLogLine1},
176
RequestParams: map[string][]string{},
177
Labels: model.LabelSet{
178
"job": "relabeling_job",
179
},
180
RelabelConfigs: []*relabel.Config{
181
{
182
SourceLabels: model.LabelNames{"__heroku_drain_host"},
183
TargetLabel: "host",
184
Replacement: "$1",
185
Action: relabel.Replace,
186
Regex: relabel.MustNewRegexp("(.*)"),
187
},
188
{
189
SourceLabels: model.LabelNames{"__heroku_drain_app"},
190
TargetLabel: "app",
191
Replacement: "$1",
192
Action: relabel.Replace,
193
Regex: relabel.MustNewRegexp("(.*)"),
194
},
195
{
196
SourceLabels: model.LabelNames{"__heroku_drain_proc"},
197
TargetLabel: "procID",
198
Replacement: "$1",
199
Action: relabel.Replace,
200
Regex: relabel.MustNewRegexp("(.*)"),
201
},
202
},
203
},
204
expectedEntries: []expectedEntry{
205
{
206
line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
207
`,
208
labels: model.LabelSet{
209
"host": "host",
210
"app": "app",
211
"procID": "web.1",
212
},
213
},
214
},
215
},
216
"heroku request with a single log line and query parameters, with internal labels relabeled, and fixed labels": {
217
args: args{
218
RequestBodies: []string{testLogLine1},
219
RequestParams: map[string][]string{
220
"some_query_param": {"app_123", "app_456"},
221
},
222
Labels: model.LabelSet{
223
"job": "relabeling_job",
224
},
225
RelabelConfigs: []*relabel.Config{
226
{
227
SourceLabels: model.LabelNames{"__heroku_drain_host"},
228
TargetLabel: "host",
229
Replacement: "$1",
230
Action: relabel.Replace,
231
Regex: relabel.MustNewRegexp("(.*)"),
232
},
233
{
234
SourceLabels: model.LabelNames{"__heroku_drain_app"},
235
TargetLabel: "app",
236
Replacement: "$1",
237
Action: relabel.Replace,
238
Regex: relabel.MustNewRegexp("(.*)"),
239
},
240
{
241
SourceLabels: model.LabelNames{"__heroku_drain_proc"},
242
TargetLabel: "procID",
243
Replacement: "$1",
244
Action: relabel.Replace,
245
Regex: relabel.MustNewRegexp("(.*)"),
246
},
247
{
248
SourceLabels: model.LabelNames{"__heroku_drain_param_some_query_param"},
249
TargetLabel: "query_param",
250
Replacement: "$1",
251
Action: relabel.Replace,
252
Regex: relabel.MustNewRegexp("(.*)"),
253
},
254
},
255
},
256
expectedEntries: []expectedEntry{
257
{
258
line: `[GIN] 2022/06/13 - 14:52:23 | 200 | 1.428101ms | 181.167.87.140 | GET "/"
259
`,
260
labels: model.LabelSet{
261
"host": "host",
262
"app": "app",
263
"procID": "web.1",
264
"query_param": "app_123,app_456",
265
},
266
},
267
},
268
},
269
}
270
for name, tc := range cases {
271
t.Run(name, func(t *testing.T) {
272
// Create fake promtail client
273
eh := fake.NewClient(func() {})
274
defer eh.Stop()
275
276
serverConfig, port, err := getServerConfigWithAvailablePort()
277
require.NoError(t, err, "error generating server config or finding open port")
278
config := &HerokuDrainTargetConfig{
279
Server: serverConfig,
280
Labels: tc.args.Labels,
281
UseIncomingTimestamp: false,
282
}
283
284
prometheus.DefaultRegisterer = prometheus.NewRegistry()
285
metrics := NewMetrics(prometheus.DefaultRegisterer)
286
pt, err := NewHerokuTarget(metrics, logger, eh, tc.args.RelabelConfigs, config, prometheus.DefaultRegisterer)
287
require.NoError(t, err)
288
defer func() {
289
_ = pt.Stop()
290
}()
291
292
// Clear received lines after test case is ran
293
defer eh.Clear()
294
295
// Send some logs
296
ts := time.Now()
297
298
req, err := makeDrainRequest(fmt.Sprintf("http://%s:%d", localhost, port), tc.args.RequestParams, tc.args.RequestBodies...)
299
require.NoError(t, err, "expected test drain request to be successfully created")
300
res, err := http.DefaultClient.Do(req)
301
require.NoError(t, err)
302
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")
303
304
waitForMessages(eh)
305
306
// Make sure we didn't time out
307
require.Equal(t, len(tc.args.RequestBodies), len(eh.Received()))
308
309
require.Equal(t, len(eh.Received()), len(tc.expectedEntries), "expected to receive equal amount of expected label sets")
310
for i, expectedEntry := range tc.expectedEntries {
311
// TODO: Add assertion over propagated timestamp
312
actualEntry := eh.Received()[i]
313
314
require.Equal(t, expectedEntry.line, actualEntry.Line, "expected line to be equal for %d-th entry", i)
315
316
expectedLS := expectedEntry.labels
317
actualLS := actualEntry.Labels
318
for label, value := range expectedLS {
319
require.Equal(t, expectedLS[label], actualLS[label], "expected label %s to be equal to %s in %d-th entry", label, value, i)
320
}
321
322
// Timestamp is always set in the handler, we expect received timestamps to be slightly higher than the timestamp when we started sending logs.
323
require.GreaterOrEqual(t, actualEntry.Timestamp.Unix(), ts.Unix(), "expected %d-th entry to have a received timestamp greater than publish time", i)
324
}
325
})
326
}
327
}
328
329
func TestHerokuDrainTarget_UseIncomingTimestamp(t *testing.T) {
330
w := log.NewSyncWriter(os.Stderr)
331
logger := log.NewLogfmtLogger(w)
332
333
// Create fake promtail client
334
eh := fake.NewClient(func() {})
335
defer eh.Stop()
336
337
serverConfig, port, err := getServerConfigWithAvailablePort()
338
require.NoError(t, err, "error generating server config or finding open port")
339
config := &HerokuDrainTargetConfig{
340
Server: serverConfig,
341
Labels: nil,
342
UseIncomingTimestamp: true,
343
}
344
345
prometheus.DefaultRegisterer = prometheus.NewRegistry()
346
metrics := NewMetrics(prometheus.DefaultRegisterer)
347
pt, err := NewHerokuTarget(metrics, logger, eh, nil, config, prometheus.DefaultRegisterer)
348
require.NoError(t, err)
349
defer func() {
350
_ = pt.Stop()
351
}()
352
353
// Clear received lines after test case is ran
354
defer eh.Clear()
355
356
req, err := makeDrainRequest(fmt.Sprintf("http://%s:%d", localhost, port), make(map[string][]string), testLogLine1)
357
require.NoError(t, err, "expected test drain request to be successfully created")
358
res, err := http.DefaultClient.Do(req)
359
require.NoError(t, err)
360
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")
361
362
waitForMessages(eh)
363
364
// Make sure we didn't time out
365
require.Equal(t, 1, len(eh.Received()))
366
367
expectedTs, err := time.Parse(time.RFC3339Nano, testLogLine1Timestamp)
368
require.NoError(t, err, "expected expected timestamp to be parse correctly")
369
require.Equal(t, expectedTs, eh.Received()[0].Timestamp, "expected entry timestamp to be overridden by received one")
370
}
371
372
func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
373
w := log.NewSyncWriter(os.Stderr)
374
logger := log.NewLogfmtLogger(w)
375
376
// Create fake promtail client
377
eh := fake.NewClient(func() {})
378
defer eh.Stop()
379
380
serverConfig, port, err := getServerConfigWithAvailablePort()
381
require.NoError(t, err, "error generating server config or finding open port")
382
config := &HerokuDrainTargetConfig{
383
Server: serverConfig,
384
Labels: nil,
385
UseIncomingTimestamp: true,
386
}
387
388
prometheus.DefaultRegisterer = prometheus.NewRegistry()
389
metrics := NewMetrics(prometheus.DefaultRegisterer)
390
tenantIDRelabelConfig := []*relabel.Config{
391
{
392
SourceLabels: model.LabelNames{"__tenant_id__"},
393
TargetLabel: "tenant_id",
394
Replacement: "$1",
395
Action: relabel.Replace,
396
Regex: relabel.MustNewRegexp("(.*)"),
397
},
398
}
399
pt, err := NewHerokuTarget(metrics, logger, eh, tenantIDRelabelConfig, config, prometheus.DefaultRegisterer)
400
require.NoError(t, err)
401
defer func() {
402
_ = pt.Stop()
403
}()
404
405
// Clear received lines after test case is ran
406
defer eh.Clear()
407
408
req, err := makeDrainRequest(fmt.Sprintf("http://%s:%d", localhost, port), make(map[string][]string), testLogLine1)
409
require.NoError(t, err, "expected test drain request to be successfully created")
410
req.Header.Set("X-Scope-OrgID", "42")
411
res, err := http.DefaultClient.Do(req)
412
require.NoError(t, err)
413
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")
414
415
waitForMessages(eh)
416
417
// Make sure we didn't time out
418
require.Equal(t, 1, len(eh.Received()))
419
420
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels[ReservedLabelTenantID])
421
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels["tenant_id"])
422
}
423
424
func waitForMessages(eh *fake.Client) {
425
countdown := 1000
426
for len(eh.Received()) != 1 && countdown > 0 {
427
time.Sleep(1 * time.Millisecond)
428
countdown--
429
}
430
}
431
432
func getServerConfigWithAvailablePort() (cfg *fnet.ServerConfig, port int, err error) {
433
// Get a randomly available port by open and closing a TCP socket
434
addr, err := net.ResolveTCPAddr("tcp", localhost+":0")
435
if err != nil {
436
return
437
}
438
l, err := net.ListenTCP("tcp", addr)
439
if err != nil {
440
return
441
}
442
port = l.Addr().(*net.TCPAddr).Port
443
err = l.Close()
444
if err != nil {
445
return
446
}
447
448
cfg = &fnet.ServerConfig{
449
HTTP: &fnet.HTTPConfig{
450
ListenAddress: localhost,
451
ListenPort: port,
452
},
453
// assign random grpc port
454
GRPC: &fnet.GRPCConfig{ListenPort: 0},
455
}
456
457
return
458
}
459
460