Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/syslog/syslog_test.go
4096 views
1
package syslog
2
3
import (
4
"context"
5
"fmt"
6
"io"
7
"net"
8
"testing"
9
"time"
10
11
"github.com/grafana/agent/component"
12
"github.com/grafana/agent/component/common/loki"
13
flow_relabel "github.com/grafana/agent/component/common/relabel"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/grafana/regexp"
16
"github.com/phayes/freeport"
17
"github.com/prometheus/client_golang/prometheus"
18
"github.com/prometheus/common/model"
19
"github.com/stretchr/testify/require"
20
)
21
22
func Test(t *testing.T) {
23
opts := component.Options{
24
Logger: util.TestFlowLogger(t),
25
Registerer: prometheus.NewRegistry(),
26
OnStateChange: func(e component.Exports) {},
27
}
28
29
ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)
30
args := Arguments{}
31
tcpListenerAddr, udpListenerAddr := getFreeAddr(t), getFreeAddr(t)
32
33
args.SyslogListeners = []ListenerConfig{
34
{
35
ListenAddress: tcpListenerAddr,
36
ListenProtocol: "tcp",
37
Labels: map[string]string{"protocol": "tcp"},
38
},
39
{
40
ListenAddress: udpListenerAddr,
41
ListenProtocol: "udp",
42
Labels: map[string]string{"protocol": "udp"},
43
},
44
}
45
args.ForwardTo = []loki.LogsReceiver{ch1, ch2}
46
47
// Create and run the component.
48
c, err := New(opts, args)
49
require.NoError(t, err)
50
51
go c.Run(context.Background())
52
time.Sleep(200 * time.Millisecond)
53
54
// Create and send a Syslog message over TCP to the first listener.
55
msg := `<165>1 2023-01-05T09:13:17.001Z host1 app - id1 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] An application event log entry...`
56
con, err := net.Dial("tcp", tcpListenerAddr)
57
require.NoError(t, err)
58
writeMessageToStream(con, msg, fmtNewline)
59
err = con.Close()
60
require.NoError(t, err)
61
62
wantLabelSet := model.LabelSet{"protocol": "tcp"}
63
64
for i := 0; i < 2; i++ {
65
select {
66
case logEntry := <-ch1:
67
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
68
require.Equal(t, "An application event log entry...", logEntry.Line)
69
require.Equal(t, wantLabelSet, logEntry.Labels)
70
case logEntry := <-ch2:
71
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
72
require.Equal(t, "An application event log entry...", logEntry.Line)
73
require.Equal(t, wantLabelSet, logEntry.Labels)
74
case <-time.After(5 * time.Second):
75
require.FailNow(t, "failed waiting for log line")
76
}
77
}
78
79
// Send a Syslog message over UDP to the second listener.
80
con, err = net.Dial("udp", udpListenerAddr)
81
require.NoError(t, err)
82
writeMessageToStream(con, msg, fmtOctetCounting)
83
err = con.Close()
84
require.NoError(t, err)
85
86
wantLabelSet = model.LabelSet{"protocol": "udp"}
87
88
for i := 0; i < 2; i++ {
89
select {
90
case logEntry := <-ch1:
91
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
92
require.Equal(t, "An application event log entry...", logEntry.Line)
93
require.Equal(t, wantLabelSet, logEntry.Labels)
94
case logEntry := <-ch2:
95
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
96
require.Equal(t, "An application event log entry...", logEntry.Line)
97
require.Equal(t, wantLabelSet, logEntry.Labels)
98
case <-time.After(5 * time.Second):
99
require.FailNow(t, "failed waiting for log line")
100
}
101
}
102
}
103
104
func TestWithRelabelRules(t *testing.T) {
105
opts := component.Options{
106
Logger: util.TestFlowLogger(t),
107
Registerer: prometheus.NewRegistry(),
108
OnStateChange: func(e component.Exports) {},
109
}
110
111
ch1 := make(chan loki.Entry)
112
args := Arguments{}
113
tcpListenerAddr := getFreeAddr(t)
114
115
args.SyslogListeners = []ListenerConfig{
116
{
117
ListenAddress: tcpListenerAddr,
118
Labels: map[string]string{"protocol": "tcp"},
119
},
120
}
121
args.ForwardTo = []loki.LogsReceiver{ch1}
122
123
// Create a handler which will be used to retrieve relabeling rules.
124
args.RelabelRules = []*flow_relabel.Config{
125
{
126
SourceLabels: []string{"__name__"},
127
Regex: mustNewRegexp("__syslog_(.*)"),
128
Action: flow_relabel.LabelMap,
129
Replacement: "syslog_${1}",
130
},
131
{
132
Regex: mustNewRegexp("syslog_connection_hostname"),
133
Action: flow_relabel.LabelDrop,
134
},
135
}
136
137
// Create and run the component.
138
c, err := New(opts, args)
139
require.NoError(t, err)
140
141
go c.Run(context.Background())
142
time.Sleep(200 * time.Millisecond)
143
144
// Create and send a Syslog message over TCP to the first listener.
145
msg := `<165>1 2023-01-05T09:13:17.001Z host1 app - id1 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] An application event log entry...`
146
con, err := net.Dial("tcp", tcpListenerAddr)
147
require.NoError(t, err)
148
writeMessageToStream(con, msg, fmtNewline)
149
err = con.Close()
150
require.NoError(t, err)
151
152
// The entry should've had the relabeling rules applied to it.
153
wantLabelSet := model.LabelSet{
154
"protocol": "tcp",
155
"syslog_connection_ip_address": "127.0.0.1",
156
"syslog_message_app_name": "app",
157
"syslog_message_facility": "local4",
158
"syslog_message_hostname": "host1",
159
"syslog_message_msg_id": "id1",
160
"syslog_message_severity": "notice",
161
}
162
163
select {
164
case logEntry := <-ch1:
165
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
166
require.Equal(t, "An application event log entry...", logEntry.Line)
167
require.Equal(t, wantLabelSet, logEntry.Labels)
168
case <-time.After(5 * time.Second):
169
require.FailNow(t, "failed waiting for log line")
170
}
171
}
172
173
func getFreeAddr(t *testing.T) string {
174
t.Helper()
175
176
portNumber, err := freeport.GetFreePort()
177
require.NoError(t, err)
178
179
return fmt.Sprintf("127.0.0.1:%d", portNumber)
180
}
181
182
func writeMessageToStream(w io.Writer, msg string, formatter formatFunc) error {
183
_, err := fmt.Fprint(w, formatter(msg))
184
if err != nil {
185
return err
186
}
187
return nil
188
}
189
190
type formatFunc func(string) string
191
192
var (
193
fmtOctetCounting = func(s string) string { return fmt.Sprintf("%d %s", len(s), s) }
194
fmtNewline = func(s string) string { return s + "\n" }
195
)
196
197
func mustNewRegexp(s string) flow_relabel.Regexp {
198
re, err := regexp.Compile("^(?:" + s + ")$")
199
if err != nil {
200
panic(err)
201
}
202
return flow_relabel.Regexp{Regexp: re}
203
}
204
205