Path: blob/main/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go
4097 views
package syslogtarget

// This code is copied from Promtail. The syslogtarget package is used to
// configure and run the targets that can read syslog entries and forward them
// to other loki components.

import (
	"errors"
	"fmt"
	"net"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/influxdata/go-syslog/v3"
	"github.com/influxdata/go-syslog/v3/rfc5424"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"

	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
	"github.com/grafana/loki/clients/pkg/promtail/targets/target"

	"github.com/grafana/loki/pkg/logproto"
)

// Package-level defaults.
//
// DefaultProtocol is the transport used when the configuration leaves
// ListenProtocol empty (see transportProtocol). DefaultIdleTimeout and
// DefaultMaxMessageLength are not referenced in this file; presumably they are
// consumed by the transports defined elsewhere in this package.
var (
	DefaultIdleTimeout      = 120 * time.Second
	DefaultMaxMessageLength = 8192
	DefaultProtocol         = protocolTCP
)

// SyslogTarget listens to syslog messages.
// nolint:revive
type SyslogTarget struct {
	metrics       *Metrics
	logger        log.Logger
	handler       loki.EntryHandler
	config        *scrapeconfig.SyslogTargetConfig
	relabelConfig []*relabel.Config

	// transport receives syslog messages and reports them through
	// handleMessage / handleMessageError.
	transport Transport

	// messages carries converted entries from handleMessage to the
	// messageSender goroutine.
	messages chan message
	// messagesDone is signalled by messageSender once messages has been
	// closed and fully drained.
	messagesDone chan struct{}
}

// message is a syslog message converted into the pieces of a loki.Entry.
type message struct {
	labels    model.LabelSet
	message   string
	timestamp time.Time
}

// NewSyslogTarget configures a new SyslogTarget.
//
// It selects the TCP or UDP transport based on the configured listen protocol
// (falling back to DefaultProtocol), starts the goroutine that forwards
// messages to handler, and runs the transport. An error is returned when the
// protocol is neither TCP nor UDP, or when the transport fails to run.
func NewSyslogTarget(
	metrics *Metrics,
	logger log.Logger,
	handler loki.EntryHandler,
	relabel []*relabel.Config,
	config *scrapeconfig.SyslogTargetConfig,
) (*SyslogTarget, error) {

	t := &SyslogTarget{
		metrics:       metrics,
		logger:        logger,
		handler:       handler,
		config:        config,
		relabelConfig: relabel,
		messagesDone:  make(chan struct{}),
	}

	protocol := t.transportProtocol()
	switch protocol {
	case protocolTCP:
		t.transport = NewSyslogTCPTransport(
			config,
			t.handleMessage,
			t.handleMessageError,
			logger,
		)
	case protocolUDP:
		t.transport = NewSyslogUDPTransport(
			config,
			t.handleMessage,
			t.handleMessageError,
			logger,
		)
	default:
		return nil, fmt.Errorf("invalid transport protocol. expected 'tcp' or 'udp', got '%s'", protocol)
	}

	t.messages = make(chan message)
	go t.messageSender(handler.Chan())

	if err := t.transport.Run(); err != nil {
		// The target is discarded on failure, so nobody would ever call Stop
		// on it. Shut the sender goroutine down here; otherwise it stays
		// blocked on t.messages forever.
		// NOTE(review): assumes a transport whose Run failed no longer
		// invokes handleMessage — confirm against the transport
		// implementations.
		close(t.messages)
		<-t.messagesDone
		return nil, err
	}
	return t, nil
}

// handleMessageError is the transport's error callback. Network timeouts are
// logged at debug level only; every other error is logged as a warning and
// counted in the syslogParsingErrors metric.
func (t *SyslogTarget) handleMessageError(err error) {
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		level.Debug(t.logger).Log("msg", "connection timed out", "err", ne)
		return
	}
	level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
	t.metrics.syslogParsingErrors.Inc()
}

// handleMessage is the transport's message callback. It converts a parsed
// syslog message into a message value (labels, line, timestamp) and hands it
// to the messageSender goroutine through t.messages.
//
// connLabels are the labels supplied by the transport for the message; the
// syslog header fields are added to them as "__syslog_message_*" labels
// before the relabel rules run.
func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) {
	// Use the checked form of the assertion so that an unexpected
	// syslog.Message implementation is reported instead of panicking.
	rfc5424Msg, ok := msg.(*rfc5424.SyslogMessage)
	if !ok {
		level.Warn(t.logger).Log("msg", "unexpected syslog message type", "type", fmt.Sprintf("%T", msg))
		t.metrics.syslogParsingErrors.Inc()
		return
	}

	if rfc5424Msg.Message == nil {
		t.metrics.syslogEmptyMessages.Inc()
		return
	}

	lb := labels.NewBuilder(connLabels)
	if v := rfc5424Msg.SeverityLevel(); v != nil {
		lb.Set("__syslog_message_severity", *v)
	}
	if v := rfc5424Msg.FacilityLevel(); v != nil {
		lb.Set("__syslog_message_facility", *v)
	}
	if v := rfc5424Msg.Hostname; v != nil {
		lb.Set("__syslog_message_hostname", *v)
	}
	if v := rfc5424Msg.Appname; v != nil {
		lb.Set("__syslog_message_app_name", *v)
	}
	if v := rfc5424Msg.ProcID; v != nil {
		lb.Set("__syslog_message_proc_id", *v)
	}
	if v := rfc5424Msg.MsgID; v != nil {
		lb.Set("__syslog_message_msg_id", *v)
	}

	if t.config.LabelStructuredData && rfc5424Msg.StructuredData != nil {
		for id, params := range *rfc5424Msg.StructuredData {
			// "@" in the structured-data ID is replaced by "_" when building
			// the label name.
			id = strings.ReplaceAll(id, "@", "_")
			for name, value := range params {
				key := "__syslog_message_sd_" + id + "_" + name
				lb.Set(key, value)
			}
		}
	}

	// NOTE(review): the second result of relabel.Process is discarded here,
	// as in the code this file was copied from. Presumably it reports whether
	// the relabel rules kept the label set; confirm whether dropped messages
	// should be skipped instead of forwarded.
	processed, _ := relabel.Process(lb.Labels(nil), t.relabelConfig...)

	// Labels still prefixed with "__" after relabeling are not forwarded.
	filtered := make(model.LabelSet)
	for _, lbl := range processed {
		if strings.HasPrefix(lbl.Name, "__") {
			continue
		}
		filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
	}

	// Use the message's own timestamp only when configured to and present;
	// otherwise stamp the entry with the time it was handled.
	var timestamp time.Time
	if t.config.UseIncomingTimestamp && rfc5424Msg.Timestamp != nil {
		timestamp = *rfc5424Msg.Timestamp
	} else {
		timestamp = time.Now()
	}

	m := *rfc5424Msg.Message
	if t.config.UseRFC5424Message {
		fullMsg, err := rfc5424Msg.String()
		if err != nil {
			level.Debug(t.logger).Log("msg", "failed to convert rfc5424 message to string; using message field instead", "err", err)
		} else {
			m = fullMsg
		}
	}
	t.messages <- message{filtered, m, timestamp}
}

// messageSender forwards every message received on t.messages to entries as
// a loki.Entry and counts it in the syslogEntries metric. When t.messages is
// closed and drained it signals completion on t.messagesDone.
func (t *SyslogTarget) messageSender(entries chan<- loki.Entry) {
	for msg := range t.messages {
		entries <- loki.Entry{
			Labels: msg.labels,
			Entry: logproto.Entry{
				Timestamp: msg.timestamp,
				Line:      msg.message,
			},
		}
		t.metrics.syslogEntries.Inc()
	}
	t.messagesDone <- struct{}{}
}

// Type returns SyslogTargetType.
func (t *SyslogTarget) Type() target.TargetType {
	return target.SyslogTargetType
}

// Ready indicates whether or not the syslog target is ready to be read from.
func (t *SyslogTarget) Ready() bool {
	return t.transport.Ready()
}

// DiscoveredLabels returns the set of labels discovered by the syslog target, which
// is always nil. Implements Target.
func (t *SyslogTarget) DiscoveredLabels() model.LabelSet {
	return nil
}

// Labels returns the set of labels that statically apply to all log entries
// produced by the SyslogTarget.
func (t *SyslogTarget) Labels() model.LabelSet {
	return t.config.Labels
}

// Details returns target-specific details. The syslog target has none, so the
// returned map is always empty.
func (t *SyslogTarget) Details() interface{} {
	return map[string]string{}
}

// Stop shuts down the SyslogTarget. It closes the transport and waits for it,
// then closes the message channel, waits for the sender goroutine to drain
// it, and finally stops the entry handler. The error returned is the one
// reported by closing the transport.
func (t *SyslogTarget) Stop() error {
	err := t.transport.Close()
	t.transport.Wait()
	close(t.messages)
	// wait for all pending messages to be processed and sent to handler
	<-t.messagesDone
	t.handler.Stop()
	return err
}

// ListenAddress returns the address SyslogTarget is listening on.
func (t *SyslogTarget) ListenAddress() net.Addr {
	return t.transport.Addr()
}

// transportProtocol returns the configured listen protocol, or
// DefaultProtocol when none is configured.
func (t *SyslogTarget) transportProtocol() string {
	if t.config.ListenProtocol != "" {
		return t.config.ListenProtocol
	}
	return DefaultProtocol
}