Path: blob/main/component/loki/source/syslog/syslog.go
4096 views
package syslog12import (3"context"4"reflect"5"sync"67"github.com/go-kit/log/level"8"github.com/grafana/agent/component"9"github.com/grafana/agent/component/common/loki"10flow_relabel "github.com/grafana/agent/component/common/relabel"11st "github.com/grafana/agent/component/loki/source/syslog/internal/syslogtarget"12"github.com/prometheus/prometheus/model/relabel"13)1415func init() {16component.Register(component.Registration{17Name: "loki.source.syslog",18Args: Arguments{},1920Build: func(opts component.Options, args component.Arguments) (component.Component, error) {21return New(opts, args.(Arguments))22},23})24}2526// Arguments holds values which are used to configure the loki.source.syslog27// component.28type Arguments struct {29SyslogListeners []ListenerConfig `river:"listener,block"`30ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`31RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`32}3334// Component implements the loki.source.syslog component.35type Component struct {36opts component.Options37metrics *st.Metrics3839mut sync.RWMutex40args Arguments41fanout []loki.LogsReceiver42targets []*st.SyslogTarget4344handler loki.LogsReceiver45}4647// New creates a new loki.source.syslog component.48func New(o component.Options, args Arguments) (*Component, error) {49c := &Component{50opts: o,51metrics: st.NewMetrics(o.Registerer),52handler: make(loki.LogsReceiver),53fanout: args.ForwardTo,5455targets: []*st.SyslogTarget{},56}5758// Call to Update() to start readers and set receivers once at the start.59if err := c.Update(args); err != nil {60return nil, err61}6263return c, nil64}6566// Run implements component.Component.67func (c *Component) Run(ctx context.Context) error {68defer func() {69level.Info(c.opts.Logger).Log("msg", "loki.source.syslog component shutting down, stopping listeners")70for _, l := range c.targets {71err := l.Stop()72if err != nil {73level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err)74}75}76}()7778for {79select {80case <-ctx.Done():81return nil82case entry := <-c.handler:83c.mut.RLock()84for _, receiver := range c.fanout {85receiver <- entry86}87c.mut.RUnlock()88}89}90}9192// Update implements component.Component.93func (c *Component) Update(args component.Arguments) error {94c.mut.Lock()95defer c.mut.Unlock()9697newArgs := args.(Arguments)98c.fanout = newArgs.ForwardTo99100var rcs []*relabel.Config101if newArgs.RelabelRules != nil && len(newArgs.RelabelRules) > 0 {102rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)103}104105if listenersChanged(c.args.SyslogListeners, newArgs.SyslogListeners) || relabelRulesChanged(c.args.RelabelRules, newArgs.RelabelRules) {106for _, l := range c.targets {107err := l.Stop()108if err != nil {109level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err)110}111}112c.targets = make([]*st.SyslogTarget, 0)113entryHandler := loki.NewEntryHandler(c.handler, func() {})114115for _, cfg := range newArgs.SyslogListeners {116t, err := st.NewSyslogTarget(c.metrics, c.opts.Logger, entryHandler, rcs, cfg.Convert())117if err != nil {118level.Error(c.opts.Logger).Log("msg", "failed to create syslog listener with provided config", "err", err)119continue120}121c.targets = append(c.targets, t)122}123124c.args = newArgs125}126127return nil128}129130// DebugInfo returns information about the status of listeners.131func (c *Component) DebugInfo() interface{} {132var res readerDebugInfo133134for _, t := range c.targets {135res.ListenersInfo = append(res.ListenersInfo, listenerInfo{136Type: string(t.Type()),137Ready: t.Ready(),138ListenAddress: t.ListenAddress().String(),139Labels: t.Labels().String(),140})141}142return res143}144145type readerDebugInfo struct {146ListenersInfo []listenerInfo `river:"listeners_info,attr"`147}148149type listenerInfo struct {150Type string `river:"type,attr"`151Ready bool `river:"ready,attr"`152ListenAddress string `river:"listen_address,attr"`153Labels string `river:"labels,attr"`154}155156func listenersChanged(prev, next []ListenerConfig) bool {157return !reflect.DeepEqual(prev, next)158}159func relabelRulesChanged(prev, next flow_relabel.Rules) bool {160return !reflect.DeepEqual(prev, next)161}162163164