Path: blob/main/component/loki/source/syslog/internal/syslogtarget/transport.go
4097 views
package syslogtarget

// This code is copied from Promtail. The syslogtarget package is used to
// configure and run the targets that can read syslog entries and forward them
// to other loki components.

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/grafana/dskit/backoff"
	"github.com/mwitkow/go-conntrack"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/influxdata/go-syslog/v3"
	"github.com/prometheus/common/config"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
	"github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser"
)

// Network names passed to the net package when opening listeners, and used as
// the "protocol" value in log lines.
var (
	protocolUDP = "udp"
	protocolTCP = "tcp"
)

// Transport is the interface implemented by the TCP and UDP syslog listeners
// defined in this file.
type Transport interface {
	// Run opens the listener and starts the background receive goroutine.
	Run() error
	// Addr returns the local address the transport is listening on. It is only
	// valid after a successful Run.
	Addr() net.Addr
	// Ready reports whether the transport has not been closed yet.
	Ready() bool
	// Close cancels the transport's context and closes the listener.
	Close() error
	// Wait blocks until all goroutines started by the transport have returned.
	Wait()
}

// handleMessage is the callback invoked for every successfully parsed syslog
// message, together with the labels of the connection it arrived on.
type handleMessage func(labels.Labels, syslog.Message)

// handleMessageError is the callback invoked for every parse error reported by
// the syslog stream parser.
type handleMessageError func(error)

// baseTransport holds the state shared by TCPTransport and UDPTransport.
type baseTransport struct {
	config *scrapeconfig.SyslogTargetConfig
	logger log.Logger

	// openConnections counts the accept loop plus every per-connection (or
	// per-sender) goroutine, so Wait can block until all of them are done.
	openConnections *sync.WaitGroup

	handleMessage      handleMessage
	handleMessageError handleMessageError

	// ctx is cancelled by close; Ready reports false from then on.
	ctx       context.Context
	ctxCancel context.CancelFunc
}

// close cancels the transport's context.
func (t *baseTransport) close() {
	t.ctxCancel()
}

// Ready implements Transport. It reports whether the transport's context has
// not been cancelled.
func (t *baseTransport) Ready() bool {
	return t.ctx.Err() == nil
}

// idleTimeout returns the configured idle timeout, or DefaultIdleTimeout when
// the configured value is zero.
func (t *baseTransport) idleTimeout() time.Duration {
	if t.config.IdleTimeout != 0 {
		return t.config.IdleTimeout
	}
	return DefaultIdleTimeout
}

// maxMessageLength returns the configured maximum message length, or
// DefaultMaxMessageLength when the configured value is zero.
func (t *baseTransport) maxMessageLength() int {
	if t.config.MaxMessageLength != 0 {
		return t.config.MaxMessageLength
	}
	return DefaultMaxMessageLength
}

// connectionLabels builds the label set attached to every message received
// from ip: the statically configured labels plus the internal
// __syslog_connection_ip_address and __syslog_connection_hostname labels.
func (t *baseTransport) connectionLabels(ip string) labels.Labels {
	lb := labels.NewBuilder(nil)
	for k, v := range t.config.Labels {
		lb.Set(string(k), string(v))
	}

	lb.Set("__syslog_connection_ip_address", ip)
	lb.Set("__syslog_connection_hostname", lookupAddr(ip))

	return lb.Labels(nil)
}

// ipFromConn returns the remote IP of c when its remote address is a
// *net.TCPAddr, and nil otherwise.
func ipFromConn(c net.Conn) net.IP {
	if addr, ok := c.RemoteAddr().(*net.TCPAddr); ok {
		return addr.IP
	}
	return nil
}

// lookupAddr performs a reverse lookup of addr and returns the resulting names
// joined by commas.
func lookupAddr(addr string) string {
	// The lookup is best effort: on failure names is empty and the hostname
	// label is simply set to the empty string.
	names, _ := net.LookupAddr(addr)
	return strings.Join(names, ",")
}

// newBaseTransport creates the shared transport state with a fresh cancellable
// context and an empty wait group.
func newBaseTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) *baseTransport {
	ctx, cancel := context.WithCancel(context.Background())
	return &baseTransport{
		config:             config,
		logger:             logger,
		openConnections:    new(sync.WaitGroup),
		handleMessage:      handleMessage,
		handleMessageError: handleError,
		ctx:                ctx,
		ctxCancel:          cancel,
	}
}

// idleTimeoutConn wraps a net.Conn and pushes the connection deadline forward
// by idleTimeout before every Read and Write.
type idleTimeoutConn struct {
	net.Conn
	idleTimeout time.Duration
}

// Write extends the deadline and then writes p to the wrapped connection.
func (c *idleTimeoutConn) Write(p []byte) (int, error) {
	c.setDeadline()
	return c.Conn.Write(p)
}

// Read extends the deadline and then reads from the wrapped connection into b.
func (c *idleTimeoutConn) Read(b []byte) (int, error) {
	c.setDeadline()
	return c.Conn.Read(b)
}

// setDeadline moves the read/write deadline to idleTimeout from now.
func (c *idleTimeoutConn) setDeadline() {
	// A failure to set the deadline is ignored here; the subsequent Read or
	// Write on the wrapped connection still reports its own error.
	_ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout))
}

// ConnPipe pairs an in-memory pipe with the address of a remote sender. The
// UDP transport writes each sender's datagrams into the pipe and a parser
// goroutine reads them back as a stream.
type ConnPipe struct {
	addr net.Addr
	*io.PipeReader
	*io.PipeWriter
}

// NewConnPipe creates a ConnPipe for the sender at addr.
func NewConnPipe(addr net.Addr) *ConnPipe {
	pr, pw := io.Pipe()
	return &ConnPipe{
		addr:       addr,
		PipeReader: pr,
		PipeWriter: pw,
	}
}

// Close closes the write half of the pipe. Only the writer is closed; the
// read half is left open.
func (pipe *ConnPipe) Close() error {
	return pipe.PipeWriter.Close()
}

// TCPTransport receives syslog messages over TCP, optionally wrapped in TLS.
type TCPTransport struct {
	*baseTransport
	listener net.Listener
}

// NewSyslogTCPTransport creates a TCP transport. The listener is not opened
// until Run is called.
func NewSyslogTCPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport {
	return &TCPTransport{
		baseTransport: newBaseTransport(config, handleMessage, handleError, logger),
	}
}

// Run implements Transport. It binds the configured listen address, wraps the
// listener with connection tracking and (when any of CA, certificate or key is
// configured) TLS, and starts accepting connections in the background.
func (t *TCPTransport) Run() error {
	l, err := net.Listen(protocolTCP, t.config.ListenAddress)
	// Check the error before decorating the listener: on failure l is nil and
	// must not be handed to conntrack.
	if err != nil {
		return fmt.Errorf("error setting up syslog target: %w", err)
	}
	l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress))

	var (
		tlsConfig = t.config.TLSConfig

		configuredCA   = len(tlsConfig.CA) > 0 || len(tlsConfig.CAFile) > 0
		configuredCert = len(tlsConfig.Cert) > 0 || len(tlsConfig.CertFile) > 0
		configuredKey  = len(tlsConfig.Key) > 0 || len(tlsConfig.KeyFile) > 0

		tlsEnabled = configuredCA || configuredCert || configuredKey
	)

	if tlsEnabled {
		serverTLS, err := newTLSConfig(tlsConfig)
		if err != nil {
			// The socket is already bound; release it so a failed Run does
			// not leak the listener. The close error is secondary to the TLS
			// configuration error being returned.
			_ = l.Close()
			return fmt.Errorf("error setting up syslog target: %w", err)
		}
		l = tls.NewListener(l, serverTLS)
	}

	t.listener = l
	level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolTCP, "tls", tlsEnabled)

	t.openConnections.Add(1)
	go t.acceptConnections()

	return nil
}

// newTLSConfig creates TLS server settings from a [config.TLSConfig]. Use this
// function to create TLS server settings, and [config.NewTLSConfig] to create
// TLS client settings.
//
// A certificate and a key are mandatory; each may be given inline or as a
// file, and the file takes precedence when both are set. When a CA is also
// configured, client certificates are required and verified against it.
func newTLSConfig(config config.TLSConfig) (*tls.Config, error) {
	var (
		configuredCert = len(config.Cert) > 0 || len(config.CertFile) > 0
		configuredKey  = len(config.Key) > 0 || len(config.KeyFile) > 0
	)

	if !configuredCert || !configuredKey {
		return nil, fmt.Errorf("certificate and key must be configured")
	}

	var certBytes, keyBytes []byte

	if len(config.CertFile) > 0 {
		bb, err := os.ReadFile(config.CertFile)
		if err != nil {
			return nil, fmt.Errorf("unable to load server certificate: %w", err)
		}
		certBytes = bb
	} else if len(config.Cert) > 0 {
		certBytes = []byte(config.Cert)
	}

	if len(config.KeyFile) > 0 {
		bb, err := os.ReadFile(config.KeyFile)
		if err != nil {
			return nil, fmt.Errorf("unable to load server key: %w", err)
		}
		keyBytes = bb
	} else if len(config.Key) > 0 {
		keyBytes = []byte(config.Key)
	}

	certs, err := tls.X509KeyPair(certBytes, keyBytes)
	if err != nil {
		return nil, fmt.Errorf("unable to load server certificate or key: %w", err)
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{certs},
	}

	var caBytes []byte

	if len(config.CAFile) > 0 {
		bb, err := os.ReadFile(config.CAFile)
		if err != nil {
			return nil, fmt.Errorf("unable to load client CA certificate: %w", err)
		}
		caBytes = bb
	} else if len(config.CA) > 0 {
		caBytes = []byte(config.CA)
	}

	if len(caBytes) > 0 {
		caCertPool := x509.NewCertPool()
		if ok := caCertPool.AppendCertsFromPEM(caBytes); !ok {
			return nil, fmt.Errorf("unable to parse client CA certificate")
		}

		tlsConfig.ClientCAs = caCertPool
		tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
	}

	return tlsConfig, nil
}

// acceptConnections is the TCP accept loop. It starts one handleConnection
// goroutine per accepted connection and returns when the transport is closed
// or Accept fails with an error that is not a net.Error.
func (t *TCPTransport) acceptConnections() {
	defer t.openConnections.Done()

	l := log.With(t.logger, "address", t.listener.Addr().String())

	backoff := backoff.New(t.ctx, backoff.Config{
		MinBackoff: 5 * time.Millisecond,
		MaxBackoff: 1 * time.Second,
	})

	for {
		c, err := t.listener.Accept()
		if err != nil {
			// After Close the context is cancelled, so an Accept error at
			// that point is the expected shutdown signal.
			if !t.Ready() {
				level.Info(l).Log("msg", "syslog server shutting down", "protocol", protocolTCP, "err", t.ctx.Err())
				return
			}

			// Network errors are retried with backoff; anything else ends
			// the accept loop.
			if _, ok := err.(net.Error); ok {
				level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries())
				backoff.Wait()
				continue
			}

			level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err)
			return
		}
		backoff.Reset()

		t.openConnections.Add(1)
		go t.handleConnection(c)
	}
}

// handleConnection parses the syslog stream arriving on cn and dispatches each
// result to the message or error handler. It returns when the parser returns.
func (t *TCPTransport) handleConnection(cn net.Conn) {
	defer t.openConnections.Done()

	c := &idleTimeoutConn{cn, t.idleTimeout()}

	// Close the connection as soon as either the transport is shut down or
	// this handler returns; closing also unblocks a parser stuck in Read.
	handlerCtx, cancel := context.WithCancel(t.ctx)
	defer cancel()
	go func() {
		<-handlerCtx.Done()
		_ = c.Close()
	}()

	lbs := t.connectionLabels(ipFromConn(c).String())

	err := syslogparser.ParseStream(c, func(result *syslog.Result) {
		if err := result.Error; err != nil {
			t.handleMessageError(err)
			return
		}
		t.handleMessage(lbs.Copy(), result.Message)
	}, t.maxMessageLength())

	if err != nil {
		level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err)
	}
}

// Close implements Transport. It cancels the transport's context and closes
// the listener. It must only be called after a successful Run.
func (t *TCPTransport) Close() error {
	t.baseTransport.close()
	return t.listener.Close()
}

// Wait implements Transport. It blocks until the accept loop and all
// connection handlers have returned.
func (t *TCPTransport) Wait() {
	t.openConnections.Wait()
}

// Addr implements Transport. It returns the listener's address and must only
// be called after a successful Run.
func (t *TCPTransport) Addr() net.Addr {
	return t.listener.Addr()
}

// UDPTransport receives syslog messages over UDP.
type UDPTransport struct {
	*baseTransport
	udpConn *net.UDPConn
}

// NewSyslogUDPTransport creates a UDP transport. The socket is not opened
// until Run is called.
func NewSyslogUDPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport {
	return &UDPTransport{
		baseTransport: newBaseTransport(config, handleMessage, handleError, logger),
	}
}

// Run implements Transport. It resolves and binds the configured listen
// address and starts receiving packets in the background.
func (t *UDPTransport) Run() error {
	addr, err := net.ResolveUDPAddr(protocolUDP, t.config.ListenAddress)
	if err != nil {
		return fmt.Errorf("error resolving UDP address: %w", err)
	}
	t.udpConn, err = net.ListenUDP(protocolUDP, addr)
	if err != nil {
		return fmt.Errorf("error setting up syslog target: %w", err)
	}
	// Requesting a 1 MiB receive buffer is best effort; the transport still
	// works with whatever buffer size the socket ends up with.
	_ = t.udpConn.SetReadBuffer(1024 * 1024)
	level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolUDP)

	t.openConnections.Add(1)
	go t.acceptPackets()
	return nil
}

// Close implements Transport. It cancels the transport's context and closes
// the UDP socket. It must only be called after a successful Run.
func (t *UDPTransport) Close() error {
	t.baseTransport.close()
	return t.udpConn.Close()
}

// acceptPackets is the UDP receive loop. Datagrams are grouped by sender
// address: the first datagram from a sender creates a ConnPipe and a handleRcv
// goroutine, and every datagram is written into that sender's pipe. Pipes are
// kept until the transport shuts down, at which point all of them are closed.
func (t *UDPTransport) acceptPackets() {
	defer t.openConnections.Done()

	var (
		n    int
		addr net.Addr
		err  error
	)
	streams := make(map[string]*ConnPipe)
	buf := make([]byte, t.maxMessageLength())

	for {
		if !t.Ready() {
			level.Info(t.logger).Log("msg", "syslog server shutting down", "protocol", protocolUDP, "err", t.ctx.Err())
			for _, stream := range streams {
				if err = stream.Close(); err != nil {
					level.Error(t.logger).Log("msg", "failed to close pipe", "err", err)
				}
			}
			return
		}
		n, addr, err = t.udpConn.ReadFrom(buf)
		if n <= 0 && err != nil {
			level.Warn(t.logger).Log("msg", "failed to read packets", "addr", addr, "err", err)
			continue
		}

		stream, ok := streams[addr.String()]
		if !ok {
			stream = NewConnPipe(addr)
			streams[addr.String()] = stream
			t.openConnections.Add(1)
			go t.handleRcv(stream)
		}
		// The pipe write blocks until the parser goroutine has consumed the
		// data, so buf can be reused on the next iteration.
		if _, err := stream.Write(buf[:n]); err != nil {
			level.Warn(t.logger).Log("msg", "failed to write to stream", "addr", addr, "err", err)
		}
	}
}

// handleRcv parses the syslog stream read from c and dispatches each result to
// the message or error handler. It returns when the parser returns.
func (t *UDPTransport) handleRcv(c *ConnPipe) {
	defer t.openConnections.Done()

	lbs := t.connectionLabels(c.addr.String())
	err := syslogparser.ParseStream(c, func(result *syslog.Result) {
		if err := result.Error; err != nil {
			t.handleMessageError(err)
		} else {
			t.handleMessage(lbs.Copy(), result.Message)
		}
	}, t.maxMessageLength())

	if err != nil {
		level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
	}
}

// Wait implements Transport. It blocks until the receive loop and all stream
// handlers have returned.
func (t *UDPTransport) Wait() {
	t.openConnections.Wait()
}

// Addr implements Transport. It returns the socket's local address and must
// only be called after a successful Run.
func (t *UDPTransport) Addr() net.Addr {
	return t.udpConn.LocalAddr()
}