Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/heroku/heroku_test.go
4096 views
1
package heroku
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"strings"
8
"testing"
9
"time"
10
11
"github.com/grafana/agent/component"
12
"github.com/grafana/agent/component/common/loki"
13
fnet "github.com/grafana/agent/component/common/net"
14
flow_relabel "github.com/grafana/agent/component/common/relabel"
15
"github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget"
16
"github.com/grafana/agent/pkg/util"
17
"github.com/grafana/regexp"
18
"github.com/phayes/freeport"
19
"github.com/prometheus/client_golang/prometheus"
20
"github.com/prometheus/common/model"
21
"github.com/stretchr/testify/require"
22
)
23
24
func TestPush(t *testing.T) {
25
opts := defaultOptions(t)
26
27
ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)
28
args := testArgsWith(t, func(args *Arguments) {
29
args.ForwardTo = []loki.LogsReceiver{ch1, ch2}
30
args.RelabelRules = rulesExport
31
args.Labels = map[string]string{"foo": "bar"}
32
})
33
// Create and run the component.
34
c, err := New(opts, args)
35
require.NoError(t, err)
36
37
go func() { require.NoError(t, c.Run(context.Background())) }()
38
waitForServerToBeReady(t, c)
39
40
// Create a Heroku Drain Request and send it to the launched server.
41
req, err := http.NewRequest(http.MethodPost, getEndpoint(c.target), strings.NewReader(testPayload))
42
require.NoError(t, err)
43
44
res, err := http.DefaultClient.Do(req)
45
require.NoError(t, err)
46
require.Equal(t, http.StatusNoContent, res.StatusCode)
47
48
// Check the received log entries
49
wantLabelSet := model.LabelSet{"foo": "bar", "host": "host", "app": "heroku", "proc": "router", "log_id": "-"}
50
wantLogLine := "at=info method=GET path=\"/\" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd=\"181.167.87.140\" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https\n"
51
52
for i := 0; i < 2; i++ {
53
select {
54
case logEntry := <-ch1:
55
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
56
require.Equal(t, wantLogLine, logEntry.Line)
57
require.Equal(t, wantLabelSet, logEntry.Labels)
58
case logEntry := <-ch2:
59
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
60
require.Equal(t, wantLogLine, logEntry.Line)
61
require.Equal(t, wantLabelSet, logEntry.Labels)
62
case <-time.After(5 * time.Second):
63
require.FailNow(t, "failed waiting for log line")
64
}
65
}
66
}
67
68
func TestUpdate_detectsWhenTargetRequiresARestart(t *testing.T) {
69
httpPort := getFreePort(t)
70
grpcPort := getFreePort(t)
71
tests := []struct {
72
name string
73
args Arguments
74
newArgs Arguments
75
restartRequired bool
76
}{
77
{
78
name: "identical args don't require server restart",
79
args: testArgsWithPorts(httpPort, grpcPort),
80
newArgs: testArgsWithPorts(httpPort, grpcPort),
81
restartRequired: false,
82
},
83
{
84
name: "change in address requires server restart",
85
args: testArgsWithPorts(httpPort, grpcPort),
86
newArgs: testArgsWith(t, func(args *Arguments) {
87
args.Server.HTTP.ListenAddress = "127.0.0.1"
88
args.Server.HTTP.ListenPort = httpPort
89
args.Server.GRPC.ListenPort = grpcPort
90
}),
91
restartRequired: true,
92
},
93
{
94
name: "change in port requires server restart",
95
args: testArgsWithPorts(httpPort, grpcPort),
96
newArgs: testArgsWithPorts(getFreePort(t), grpcPort),
97
restartRequired: true,
98
},
99
{
100
name: "change in forwardTo does not require server restart",
101
args: testArgsWithPorts(httpPort, grpcPort),
102
newArgs: testArgsWith(t, func(args *Arguments) {
103
args.ForwardTo = []loki.LogsReceiver{}
104
args.Server.HTTP.ListenPort = httpPort
105
args.Server.GRPC.ListenPort = grpcPort
106
}),
107
restartRequired: false,
108
},
109
{
110
name: "change in labels requires server restart",
111
args: testArgsWithPorts(httpPort, grpcPort),
112
newArgs: testArgsWith(t, func(args *Arguments) {
113
args.Labels = map[string]string{"some": "label"}
114
args.Server.HTTP.ListenPort = httpPort
115
args.Server.GRPC.ListenPort = grpcPort
116
}),
117
restartRequired: true,
118
},
119
{
120
name: "change in relabel rules requires server restart",
121
args: testArgsWithPorts(httpPort, grpcPort),
122
newArgs: testArgsWith(t, func(args *Arguments) {
123
args.RelabelRules = flow_relabel.Rules{}
124
args.Server.HTTP.ListenPort = httpPort
125
args.Server.GRPC.ListenPort = grpcPort
126
}),
127
restartRequired: true,
128
},
129
{
130
name: "change in use incoming timestamp requires server restart",
131
args: testArgsWithPorts(httpPort, grpcPort),
132
newArgs: testArgsWith(t, func(args *Arguments) {
133
args.UseIncomingTimestamp = !args.UseIncomingTimestamp
134
args.Server.HTTP.ListenPort = httpPort
135
args.Server.GRPC.ListenPort = grpcPort
136
}),
137
restartRequired: true,
138
},
139
}
140
for _, tc := range tests {
141
t.Run(tc.name, func(t *testing.T) {
142
comp, err := New(
143
defaultOptions(t),
144
tc.args,
145
)
146
require.NoError(t, err)
147
defer func() {
148
// in order to cleanly shutdown, we want to make sure the server is running first.
149
waitForServerToBeReady(t, comp)
150
require.NoError(t, comp.target.Stop())
151
}()
152
153
// in order to cleanly update, we want to make sure the server is running first.
154
waitForServerToBeReady(t, comp)
155
156
targetBefore := comp.target
157
err = comp.Update(tc.newArgs)
158
require.NoError(t, err)
159
160
restarted := targetBefore != comp.target
161
require.Equal(t, restarted, tc.restartRequired)
162
})
163
}
164
}
165
166
// testPayload is the request body posted to the drain endpoint in TestPush.
// It is one octet-counted frame: the leading "270" is the byte length of the
// message that follows the space, including the trailing newline. The raw
// string must therefore hold exactly the message line plus one newline.
const testPayload = `270 <158>1 2022-06-13T14:52:23.622778+00:00 host heroku router - at=info method=GET path="/" host=cryptic-cliffs-27764.herokuapp.com request_id=59da6323-2bc4-4143-8677-cc66ccfb115f fwd="181.167.87.140" dyno=web.1 connect=0ms service=3ms status=200 bytes=6979 protocol=https
`
168
169
var rulesExport = flow_relabel.Rules{
170
{
171
SourceLabels: []string{"__heroku_drain_host"},
172
Regex: newRegexp(),
173
Action: flow_relabel.Replace,
174
Replacement: "$1",
175
TargetLabel: "host",
176
},
177
{
178
SourceLabels: []string{"__heroku_drain_app"},
179
Regex: newRegexp(),
180
Action: flow_relabel.Replace,
181
Replacement: "$1",
182
TargetLabel: "app",
183
},
184
{
185
SourceLabels: []string{"__heroku_drain_proc"},
186
Regex: newRegexp(),
187
Action: flow_relabel.Replace,
188
Replacement: "$1",
189
TargetLabel: "proc",
190
},
191
{
192
SourceLabels: []string{"__heroku_drain_log_id"},
193
Regex: newRegexp(),
194
Action: flow_relabel.Replace,
195
Replacement: "$1",
196
TargetLabel: "log_id",
197
},
198
}
199
200
func defaultOptions(t *testing.T) component.Options {
201
return component.Options{
202
Logger: util.TestFlowLogger(t),
203
Registerer: prometheus.NewRegistry(),
204
OnStateChange: func(e component.Exports) {},
205
}
206
}
207
208
func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
209
return Arguments{
210
Server: &fnet.ServerConfig{
211
HTTP: &fnet.HTTPConfig{
212
ListenAddress: "localhost",
213
ListenPort: httpPort,
214
},
215
GRPC: &fnet.GRPCConfig{
216
ListenAddress: "localhost",
217
ListenPort: grpcPort,
218
},
219
},
220
ForwardTo: []loki.LogsReceiver{make(chan loki.Entry), make(chan loki.Entry)},
221
Labels: map[string]string{"foo": "bar", "fizz": "buzz"},
222
RelabelRules: flow_relabel.Rules{
223
{
224
SourceLabels: []string{"tag"},
225
Regex: flow_relabel.Regexp{Regexp: regexp.MustCompile("ignore")},
226
Action: flow_relabel.Drop,
227
},
228
},
229
UseIncomingTimestamp: false,
230
}
231
}
232
233
// testArgsWith builds the baseline Arguments on two newly reserved ports
// (HTTP first, then gRPC), hands a pointer to mutator so the caller can
// adjust any field, and returns the adjusted value.
func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments {
	httpPort := getFreePort(t)
	grpcPort := getFreePort(t)

	args := testArgsWithPorts(httpPort, grpcPort)
	mutator(&args)
	return args
}
238
239
func waitForServerToBeReady(t *testing.T, comp *Component) {
240
require.Eventuallyf(t, func() bool {
241
resp, err := http.Get(fmt.Sprintf(
242
"http://%v/wrong/url",
243
comp.target.HTTPListenAddress(),
244
))
245
return err == nil && resp.StatusCode == 404
246
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
247
}
248
249
func getFreePort(t *testing.T) int {
250
port, err := freeport.GetFreePort()
251
require.NoError(t, err)
252
return port
253
}
254
255
// newRegexp returns a fresh flow_relabel.Regexp for the pattern "^(?:(.*))$",
// which matches an entire value and captures it as group 1. The pattern is a
// constant, so MustCompile (which panics on an invalid pattern) is used in
// place of a hand-written Compile-and-panic.
func newRegexp() flow_relabel.Regexp {
	return flow_relabel.Regexp{Regexp: regexp.MustCompile("^(?:(.*))$")}
}
262
263
// getEndpoint returns the URL of the target's drain endpoint: "http://"
// followed by the target's HTTP listen address and its drain path.
func getEndpoint(target *herokutarget.HerokuTarget) string {
	addr := target.HTTPListenAddress()
	path := target.DrainEndpoint()
	return fmt.Sprintf("http://%s%s", addr, path)
}
266
267