Path: blob/main/component/loki/source/syslog/syslog_test.go
4096 views
package syslog12import (3"context"4"fmt"5"io"6"net"7"testing"8"time"910"github.com/grafana/agent/component"11"github.com/grafana/agent/component/common/loki"12flow_relabel "github.com/grafana/agent/component/common/relabel"13"github.com/grafana/agent/pkg/util"14"github.com/grafana/regexp"15"github.com/phayes/freeport"16"github.com/prometheus/client_golang/prometheus"17"github.com/prometheus/common/model"18"github.com/stretchr/testify/require"19)2021func Test(t *testing.T) {22opts := component.Options{23Logger: util.TestFlowLogger(t),24Registerer: prometheus.NewRegistry(),25OnStateChange: func(e component.Exports) {},26}2728ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)29args := Arguments{}30tcpListenerAddr, udpListenerAddr := getFreeAddr(t), getFreeAddr(t)3132args.SyslogListeners = []ListenerConfig{33{34ListenAddress: tcpListenerAddr,35ListenProtocol: "tcp",36Labels: map[string]string{"protocol": "tcp"},37},38{39ListenAddress: udpListenerAddr,40ListenProtocol: "udp",41Labels: map[string]string{"protocol": "udp"},42},43}44args.ForwardTo = []loki.LogsReceiver{ch1, ch2}4546// Create and run the component.47c, err := New(opts, args)48require.NoError(t, err)4950go c.Run(context.Background())51time.Sleep(200 * time.Millisecond)5253// Create and send a Syslog message over TCP to the first listener.54msg := `<165>1 2023-01-05T09:13:17.001Z host1 app - id1 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] An application event log entry...`55con, err := net.Dial("tcp", tcpListenerAddr)56require.NoError(t, err)57writeMessageToStream(con, msg, fmtNewline)58err = con.Close()59require.NoError(t, err)6061wantLabelSet := model.LabelSet{"protocol": "tcp"}6263for i := 0; i < 2; i++ {64select {65case logEntry := <-ch1:66require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)67require.Equal(t, "An application event log entry...", logEntry.Line)68require.Equal(t, wantLabelSet, logEntry.Labels)69case logEntry := <-ch2:70require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)71require.Equal(t, "An application event log entry...", logEntry.Line)72require.Equal(t, wantLabelSet, logEntry.Labels)73case <-time.After(5 * time.Second):74require.FailNow(t, "failed waiting for log line")75}76}7778// Send a Syslog message over UDP to the second listener.79con, err = net.Dial("udp", udpListenerAddr)80require.NoError(t, err)81writeMessageToStream(con, msg, fmtOctetCounting)82err = con.Close()83require.NoError(t, err)8485wantLabelSet = model.LabelSet{"protocol": "udp"}8687for i := 0; i < 2; i++ {88select {89case logEntry := <-ch1:90require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)91require.Equal(t, "An application event log entry...", logEntry.Line)92require.Equal(t, wantLabelSet, logEntry.Labels)93case logEntry := <-ch2:94require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)95require.Equal(t, "An application event log entry...", logEntry.Line)96require.Equal(t, wantLabelSet, logEntry.Labels)97case <-time.After(5 * time.Second):98require.FailNow(t, "failed waiting for log line")99}100}101}102103func TestWithRelabelRules(t *testing.T) {104opts := component.Options{105Logger: util.TestFlowLogger(t),106Registerer: prometheus.NewRegistry(),107OnStateChange: func(e component.Exports) {},108}109110ch1 := make(chan loki.Entry)111args := Arguments{}112tcpListenerAddr := getFreeAddr(t)113114args.SyslogListeners = []ListenerConfig{115{116ListenAddress: tcpListenerAddr,117Labels: map[string]string{"protocol": "tcp"},118},119}120args.ForwardTo = []loki.LogsReceiver{ch1}121122// Create a handler which will be used to retrieve relabeling rules.123args.RelabelRules = []*flow_relabel.Config{124{125SourceLabels: []string{"__name__"},126Regex: mustNewRegexp("__syslog_(.*)"),127Action: flow_relabel.LabelMap,128Replacement: "syslog_${1}",129},130{131Regex: mustNewRegexp("syslog_connection_hostname"),132Action: flow_relabel.LabelDrop,133},134}135136// Create and run the component.137c, err := New(opts, args)138require.NoError(t, err)139140go c.Run(context.Background())141time.Sleep(200 * time.Millisecond)142143// Create and send a Syslog message over TCP to the first listener.144msg := `<165>1 2023-01-05T09:13:17.001Z host1 app - id1 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] An application event log entry...`145con, err := net.Dial("tcp", tcpListenerAddr)146require.NoError(t, err)147writeMessageToStream(con, msg, fmtNewline)148err = con.Close()149require.NoError(t, err)150151// The entry should've had the relabeling rules applied to it.152wantLabelSet := model.LabelSet{153"protocol": "tcp",154"syslog_connection_ip_address": "127.0.0.1",155"syslog_message_app_name": "app",156"syslog_message_facility": "local4",157"syslog_message_hostname": "host1",158"syslog_message_msg_id": "id1",159"syslog_message_severity": "notice",160}161162select {163case logEntry := <-ch1:164require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)165require.Equal(t, "An application event log entry...", logEntry.Line)166require.Equal(t, wantLabelSet, logEntry.Labels)167case <-time.After(5 * time.Second):168require.FailNow(t, "failed waiting for log line")169}170}171172func getFreeAddr(t *testing.T) string {173t.Helper()174175portNumber, err := freeport.GetFreePort()176require.NoError(t, err)177178return fmt.Sprintf("127.0.0.1:%d", portNumber)179}180181func writeMessageToStream(w io.Writer, msg string, formatter formatFunc) error {182_, err := fmt.Fprint(w, formatter(msg))183if err != nil {184return err185}186return nil187}188189type formatFunc func(string) string190191var (192fmtOctetCounting = func(s string) string { return fmt.Sprintf("%d %s", len(s), s) }193fmtNewline = func(s string) string { return s + "\n" }194)195196func mustNewRegexp(s string) flow_relabel.Regexp {197re, err := regexp.Compile("^(?:" + s + ")$")198if err != nil {199panic(err)200}201return flow_relabel.Regexp{Regexp: re}202}203204205