Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/syslog/internal/syslogtarget/transport.go
4097 views
1
package syslogtarget
2
3
// This code is copied from Promtail. The syslogtarget package is used to
4
// configure and run the targets that can read syslog entries and forward them
5
// to other loki components.
6
7
import (
8
"context"
9
"crypto/tls"
10
"crypto/x509"
11
"fmt"
12
"io"
13
"net"
14
"os"
15
"strings"
16
"sync"
17
"time"
18
19
"github.com/grafana/dskit/backoff"
20
"github.com/mwitkow/go-conntrack"
21
22
"github.com/go-kit/log"
23
"github.com/go-kit/log/level"
24
"github.com/influxdata/go-syslog/v3"
25
"github.com/prometheus/common/config"
26
"github.com/prometheus/prometheus/model/labels"
27
28
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
29
"github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser"
30
)
31
32
// Network names handed to the net package and reported in log lines. They are
// constants so no code in the package can reassign them by accident.
const (
	protocolUDP = "udp"
	protocolTCP = "tcp"
)
36
37
// Transport is a network listener that receives syslog messages and hands
// them to the callbacks it was constructed with.
type Transport interface {
	// Run binds the listener and starts receiving in the background.
	Run() error
	// Close stops the transport and closes its listener.
	Close() error
	// Wait blocks until every goroutine started by the transport has returned.
	Wait()
	// Ready reports whether the transport has not been closed yet.
	Ready() bool
	// Addr returns the local address the transport listens on.
	Addr() net.Addr
}
44
45
type handleMessage func(labels.Labels, syslog.Message)
46
type handleMessageError func(error)
47
48
type baseTransport struct {
49
config *scrapeconfig.SyslogTargetConfig
50
logger log.Logger
51
52
openConnections *sync.WaitGroup
53
54
handleMessage handleMessage
55
handleMessageError handleMessageError
56
57
ctx context.Context
58
ctxCancel context.CancelFunc
59
}
60
61
// close cancels the transport context. Ready reports false afterwards and
// every context derived from it is cancelled as well.
func (t *baseTransport) close() {
	t.ctxCancel()
}
64
65
// Ready implements SyslogTransport
66
func (t *baseTransport) Ready() bool {
67
return t.ctx.Err() == nil
68
}
69
70
// idleTimeout returns the configured idle timeout, falling back to
// DefaultIdleTimeout when the configuration leaves it at zero.
func (t *baseTransport) idleTimeout() time.Duration {
	if configured := t.config.IdleTimeout; configured != 0 {
		return configured
	}
	return DefaultIdleTimeout
}
76
77
// maxMessageLength returns the configured maximum message length, falling
// back to DefaultMaxMessageLength when the configuration leaves it at zero.
func (t *baseTransport) maxMessageLength() int {
	if configured := t.config.MaxMessageLength; configured != 0 {
		return configured
	}
	return DefaultMaxMessageLength
}
83
84
// connectionLabels builds the label set attached to messages received from
// the peer identified by ip: the labels from the configuration plus the peer
// address and the names its reverse lookup returned.
func (t *baseTransport) connectionLabels(ip string) labels.Labels {
	builder := labels.NewBuilder(nil)
	for name, value := range t.config.Labels {
		builder.Set(string(name), string(value))
	}

	// Set after the configured labels so the connection labels are the last
	// values written for these names.
	builder.Set("__syslog_connection_ip_address", ip)
	hostname := lookupAddr(ip)
	builder.Set("__syslog_connection_hostname", hostname)

	return builder.Labels(nil)
}
95
96
// ipFromConn returns the remote IP of c when the peer address is a
// *net.TCPAddr, and nil for every other address type.
func ipFromConn(c net.Conn) net.IP {
	if tcpAddr, ok := c.RemoteAddr().(*net.TCPAddr); ok {
		return tcpAddr.IP
	}
	return nil
}
104
105
// lookupAddr performs a reverse lookup of addr and returns the resulting
// names joined by commas.
func lookupAddr(addr string) string {
	// The lookup is best effort: the error is deliberately dropped and
	// whatever names came back (possibly none) are used.
	hostnames, _ := net.LookupAddr(addr)
	return strings.Join(hostnames, ",")
}
109
110
func newBaseTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) *baseTransport {
111
ctx, cancel := context.WithCancel(context.Background())
112
return &baseTransport{
113
config: config,
114
logger: logger,
115
openConnections: new(sync.WaitGroup),
116
handleMessage: handleMessage,
117
handleMessageError: handleError,
118
ctx: ctx,
119
ctxCancel: cancel,
120
}
121
}
122
123
// idleTimeoutConn wraps a net.Conn and moves the connection deadline to
// idleTimeout from now before every Read and Write.
type idleTimeoutConn struct {
	net.Conn
	idleTimeout time.Duration
}

// Write refreshes the deadline and forwards p to the wrapped connection.
func (c *idleTimeoutConn) Write(p []byte) (int, error) {
	c.setDeadline()
	n, err := c.Conn.Write(p)
	return n, err
}

// Read refreshes the deadline and reads from the wrapped connection into b.
func (c *idleTimeoutConn) Read(b []byte) (int, error) {
	c.setDeadline()
	n, err := c.Conn.Read(b)
	return n, err
}

// setDeadline sets the deadline of the wrapped connection to the current time
// plus idleTimeout. The error from SetDeadline is intentionally ignored.
func (c *idleTimeoutConn) setDeadline() {
	deadline := time.Now().Add(c.idleTimeout)
	_ = c.Conn.SetDeadline(deadline)
}
141
142
// ConnPipe couples the two halves of an in-memory io.Pipe with the address
// of the peer whose data is written into it.
type ConnPipe struct {
	addr net.Addr
	*io.PipeReader
	*io.PipeWriter
}

// NewConnPipe returns a ConnPipe for addr backed by a fresh io.Pipe.
func NewConnPipe(addr net.Addr) *ConnPipe {
	reader, writer := io.Pipe()
	return &ConnPipe{addr: addr, PipeReader: reader, PipeWriter: writer}
}

// Close closes the write half of the pipe and returns the error from that
// close. Both embedded halves declare Close, so the method has to be defined
// explicitly to pick one.
func (pipe *ConnPipe) Close() error {
	return pipe.PipeWriter.Close()
}
163
164
type TCPTransport struct {
165
*baseTransport
166
listener net.Listener
167
}
168
169
func NewSyslogTCPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport {
170
return &TCPTransport{
171
baseTransport: newBaseTransport(config, handleMessage, handleError, logger),
172
}
173
}
174
175
// Run implements SyslogTransport
176
func (t *TCPTransport) Run() error {
177
l, err := net.Listen(protocolTCP, t.config.ListenAddress)
178
l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress))
179
if err != nil {
180
return fmt.Errorf("error setting up syslog target: %w", err)
181
}
182
183
var (
184
tlsConfig = t.config.TLSConfig
185
186
configuredCA = len(tlsConfig.CA) > 0 || len(tlsConfig.CAFile) > 0
187
configuredCert = len(tlsConfig.Cert) > 0 || len(tlsConfig.CertFile) > 0
188
configuredKey = len(tlsConfig.Key) > 0 || len(tlsConfig.KeyFile) > 0
189
190
tlsEnabled = configuredCA || configuredCert || configuredKey
191
)
192
193
if tlsEnabled {
194
tlsConfig, err := newTLSConfig(tlsConfig)
195
if err != nil {
196
return fmt.Errorf("error setting up syslog target: %w", err)
197
}
198
l = tls.NewListener(l, tlsConfig)
199
}
200
201
t.listener = l
202
level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolTCP, "tls", tlsEnabled)
203
204
t.openConnections.Add(1)
205
go t.acceptConnections()
206
207
return nil
208
}
209
210
// newTLSConfig creates TLS server settings from a [config.TLSConfig]. Use this
211
// function to create TLS server settings, and [config.NewTLSConfig] to create
212
// TLS client settings.
213
func newTLSConfig(config config.TLSConfig) (*tls.Config, error) {
214
var (
215
configuredCert = len(config.Cert) > 0 || len(config.CertFile) > 0
216
configuredKey = len(config.Key) > 0 || len(config.KeyFile) > 0
217
)
218
219
if !configuredCert || !configuredKey {
220
return nil, fmt.Errorf("certificate and key must be configured")
221
}
222
223
var certBytes, keyBytes []byte
224
225
if len(config.CertFile) > 0 {
226
bb, err := os.ReadFile(config.CertFile)
227
if err != nil {
228
return nil, fmt.Errorf("unable to load server certificate: %w", err)
229
}
230
certBytes = bb
231
} else if len(config.Cert) > 0 {
232
certBytes = []byte(config.Cert)
233
}
234
235
if len(config.KeyFile) > 0 {
236
bb, err := os.ReadFile(config.KeyFile)
237
if err != nil {
238
return nil, fmt.Errorf("unable to load server key: %w", err)
239
}
240
keyBytes = bb
241
} else if len(config.Key) > 0 {
242
keyBytes = []byte(config.Key)
243
}
244
245
certs, err := tls.X509KeyPair(certBytes, keyBytes)
246
if err != nil {
247
return nil, fmt.Errorf("unable to load server certificate or key: %w", err)
248
}
249
250
tlsConfig := &tls.Config{
251
Certificates: []tls.Certificate{certs},
252
}
253
254
var caBytes []byte
255
256
if len(config.CAFile) > 0 {
257
bb, err := os.ReadFile(config.CAFile)
258
if err != nil {
259
return nil, fmt.Errorf("unable to load client CA certificate: %w", err)
260
}
261
caBytes = bb
262
} else if len(config.CA) > 0 {
263
caBytes = []byte(config.CA)
264
}
265
266
if len(caBytes) > 0 {
267
caCertPool := x509.NewCertPool()
268
if ok := caCertPool.AppendCertsFromPEM(caBytes); !ok {
269
return nil, fmt.Errorf("unable to parse client CA certificate")
270
}
271
272
tlsConfig.ClientCAs = caCertPool
273
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
274
}
275
276
return tlsConfig, nil
277
}
278
279
func (t *TCPTransport) acceptConnections() {
280
defer t.openConnections.Done()
281
282
l := log.With(t.logger, "address", t.listener.Addr().String())
283
284
backoff := backoff.New(t.ctx, backoff.Config{
285
MinBackoff: 5 * time.Millisecond,
286
MaxBackoff: 1 * time.Second,
287
})
288
289
for {
290
c, err := t.listener.Accept()
291
if err != nil {
292
if !t.Ready() {
293
level.Info(l).Log("msg", "syslog server shutting down", "protocol", protocolTCP, "err", t.ctx.Err())
294
return
295
}
296
297
if _, ok := err.(net.Error); ok {
298
level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries())
299
backoff.Wait()
300
continue
301
}
302
303
level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err)
304
return
305
}
306
backoff.Reset()
307
308
t.openConnections.Add(1)
309
go t.handleConnection(c)
310
}
311
}
312
313
// handleConnection parses the syslog stream arriving on cn and forwards each
// result to the message or error callback. The connection is closed when the
// function returns or when the transport context is cancelled, whichever
// happens first.
func (t *TCPTransport) handleConnection(cn net.Conn) {
	defer t.openConnections.Done()

	conn := &idleTimeoutConn{Conn: cn, idleTimeout: t.idleTimeout()}

	// Closing the connection from a watcher goroutine is what interrupts a
	// parser blocked in Read when the transport shuts down.
	connCtx, stop := context.WithCancel(t.ctx)
	defer stop()
	go func() {
		<-connCtx.Done()
		_ = conn.Close()
	}()

	remoteIP := ipFromConn(conn)
	lbs := t.connectionLabels(remoteIP.String())

	onResult := func(result *syslog.Result) {
		if result.Error != nil {
			t.handleMessageError(result.Error)
			return
		}
		// Each message gets its own copy of the connection labels.
		t.handleMessage(lbs.Copy(), result.Message)
	}

	if err := syslogparser.ParseStream(conn, onResult, t.maxMessageLength()); err != nil {
		level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err)
	}
}
339
340
// Close implements SyslogTransport
341
func (t *TCPTransport) Close() error {
342
t.baseTransport.close()
343
return t.listener.Close()
344
}
345
346
// Wait implements SyslogTransport
347
func (t *TCPTransport) Wait() {
348
t.openConnections.Wait()
349
}
350
351
// Addr implements SyslogTransport
352
func (t *TCPTransport) Addr() net.Addr {
353
return t.listener.Addr()
354
}
355
356
type UDPTransport struct {
357
*baseTransport
358
udpConn *net.UDPConn
359
}
360
361
func NewSyslogUDPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport {
362
return &UDPTransport{
363
baseTransport: newBaseTransport(config, handleMessage, handleError, logger),
364
}
365
}
366
367
// Run implements SyslogTransport
368
func (t *UDPTransport) Run() error {
369
var err error
370
addr, err := net.ResolveUDPAddr(protocolUDP, t.config.ListenAddress)
371
if err != nil {
372
return fmt.Errorf("error resolving UDP address: %w", err)
373
}
374
t.udpConn, err = net.ListenUDP(protocolUDP, addr)
375
if err != nil {
376
return fmt.Errorf("error setting up syslog target: %w", err)
377
}
378
_ = t.udpConn.SetReadBuffer(1024 * 1024)
379
level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolUDP)
380
381
t.openConnections.Add(1)
382
go t.acceptPackets()
383
return nil
384
}
385
386
// Close implements SyslogTransport
387
func (t *UDPTransport) Close() error {
388
t.baseTransport.close()
389
return t.udpConn.Close()
390
}
391
392
func (t *UDPTransport) acceptPackets() {
393
defer t.openConnections.Done()
394
395
var (
396
n int
397
addr net.Addr
398
err error
399
)
400
streams := make(map[string]*ConnPipe)
401
buf := make([]byte, t.maxMessageLength())
402
403
for {
404
if !t.Ready() {
405
level.Info(t.logger).Log("msg", "syslog server shutting down", "protocol", protocolUDP, "err", t.ctx.Err())
406
for _, stream := range streams {
407
if err = stream.Close(); err != nil {
408
level.Error(t.logger).Log("msg", "failed to close pipe", "err", err)
409
}
410
}
411
return
412
}
413
n, addr, err = t.udpConn.ReadFrom(buf)
414
if n <= 0 && err != nil {
415
level.Warn(t.logger).Log("msg", "failed to read packets", "addr", addr, "err", err)
416
continue
417
}
418
419
stream, ok := streams[addr.String()]
420
if !ok {
421
stream = NewConnPipe(addr)
422
streams[addr.String()] = stream
423
t.openConnections.Add(1)
424
go t.handleRcv(stream)
425
}
426
if _, err := stream.Write(buf[:n]); err != nil {
427
level.Warn(t.logger).Log("msg", "failed to write to stream", "addr", addr, "err", err)
428
}
429
}
430
}
431
432
// handleRcv parses the syslog data written into c and forwards each result
// to the message or error callback. It returns when ParseStream returns.
func (t *UDPTransport) handleRcv(c *ConnPipe) {
	defer t.openConnections.Done()

	// Close the read half on the way out. A write to an io.Pipe blocks until
	// the data has been read, so if ParseStream returns while the write half
	// is still open, the next write for this peer would block forever and
	// stall the receive loop. With the reader closed those writes fail with
	// io.ErrClosedPipe instead.
	defer func() { _ = c.PipeReader.Close() }()

	// NOTE(review): for UDP peers c.addr.String() is presumably "host:port",
	// so the IP address label carries the port and the reverse lookup gets a
	// non-IP argument. Confirm whether that is intended before changing it.
	lbs := t.connectionLabels(c.addr.String())
	err := syslogparser.ParseStream(c, func(result *syslog.Result) {
		if err := result.Error; err != nil {
			t.handleMessageError(err)
			return
		}
		// Each message gets its own copy of the connection labels.
		t.handleMessage(lbs.Copy(), result.Message)
	}, t.maxMessageLength())

	if err != nil {
		level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
	}
}
448
449
// Wait implements SyslogTransport
450
func (t *UDPTransport) Wait() {
451
t.openConnections.Wait()
452
}
453
454
// Addr implements SyslogTransport
455
func (t *UDPTransport) Addr() net.Addr {
456
return t.udpConn.LocalAddr()
457
}
458
459