Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/syslog/syslog.go
4096 views
1
package syslog
2
3
import (
4
"context"
5
"reflect"
6
"sync"
7
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/common/loki"
11
flow_relabel "github.com/grafana/agent/component/common/relabel"
12
st "github.com/grafana/agent/component/loki/source/syslog/internal/syslogtarget"
13
"github.com/prometheus/prometheus/model/relabel"
14
)
15
16
func init() {
17
component.Register(component.Registration{
18
Name: "loki.source.syslog",
19
Args: Arguments{},
20
21
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
22
return New(opts, args.(Arguments))
23
},
24
})
25
}
26
27
// Arguments holds values which are used to configure the loki.source.syslog
28
// component.
29
type Arguments struct {
30
SyslogListeners []ListenerConfig `river:"listener,block"`
31
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
32
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
33
}
34
35
// Component implements the loki.source.syslog component.
36
type Component struct {
37
opts component.Options
38
metrics *st.Metrics
39
40
mut sync.RWMutex
41
args Arguments
42
fanout []loki.LogsReceiver
43
targets []*st.SyslogTarget
44
45
handler loki.LogsReceiver
46
}
47
48
// New creates a new loki.source.syslog component.
49
func New(o component.Options, args Arguments) (*Component, error) {
50
c := &Component{
51
opts: o,
52
metrics: st.NewMetrics(o.Registerer),
53
handler: make(loki.LogsReceiver),
54
fanout: args.ForwardTo,
55
56
targets: []*st.SyslogTarget{},
57
}
58
59
// Call to Update() to start readers and set receivers once at the start.
60
if err := c.Update(args); err != nil {
61
return nil, err
62
}
63
64
return c, nil
65
}
66
67
// Run implements component.Component.
68
func (c *Component) Run(ctx context.Context) error {
69
defer func() {
70
level.Info(c.opts.Logger).Log("msg", "loki.source.syslog component shutting down, stopping listeners")
71
for _, l := range c.targets {
72
err := l.Stop()
73
if err != nil {
74
level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err)
75
}
76
}
77
}()
78
79
for {
80
select {
81
case <-ctx.Done():
82
return nil
83
case entry := <-c.handler:
84
c.mut.RLock()
85
for _, receiver := range c.fanout {
86
receiver <- entry
87
}
88
c.mut.RUnlock()
89
}
90
}
91
}
92
93
// Update implements component.Component.
94
func (c *Component) Update(args component.Arguments) error {
95
c.mut.Lock()
96
defer c.mut.Unlock()
97
98
newArgs := args.(Arguments)
99
c.fanout = newArgs.ForwardTo
100
101
var rcs []*relabel.Config
102
if newArgs.RelabelRules != nil && len(newArgs.RelabelRules) > 0 {
103
rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
104
}
105
106
if listenersChanged(c.args.SyslogListeners, newArgs.SyslogListeners) || relabelRulesChanged(c.args.RelabelRules, newArgs.RelabelRules) {
107
for _, l := range c.targets {
108
err := l.Stop()
109
if err != nil {
110
level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err)
111
}
112
}
113
c.targets = make([]*st.SyslogTarget, 0)
114
entryHandler := loki.NewEntryHandler(c.handler, func() {})
115
116
for _, cfg := range newArgs.SyslogListeners {
117
t, err := st.NewSyslogTarget(c.metrics, c.opts.Logger, entryHandler, rcs, cfg.Convert())
118
if err != nil {
119
level.Error(c.opts.Logger).Log("msg", "failed to create syslog listener with provided config", "err", err)
120
continue
121
}
122
c.targets = append(c.targets, t)
123
}
124
125
c.args = newArgs
126
}
127
128
return nil
129
}
130
131
// DebugInfo returns information about the status of listeners.
132
func (c *Component) DebugInfo() interface{} {
133
var res readerDebugInfo
134
135
for _, t := range c.targets {
136
res.ListenersInfo = append(res.ListenersInfo, listenerInfo{
137
Type: string(t.Type()),
138
Ready: t.Ready(),
139
ListenAddress: t.ListenAddress().String(),
140
Labels: t.Labels().String(),
141
})
142
}
143
return res
144
}
145
146
type readerDebugInfo struct {
147
ListenersInfo []listenerInfo `river:"listeners_info,attr"`
148
}
149
150
// listenerInfo describes a single syslog target in the debug output.
type listenerInfo struct {
	// Type is the string form of the target's Type().
	Type string `river:"type,attr"`
	// Ready is the value reported by the target's Ready().
	Ready bool `river:"ready,attr"`
	// ListenAddress is the string form of the target's listen address.
	ListenAddress string `river:"listen_address,attr"`
	// Labels is the string form of the target's labels.
	Labels string `river:"labels,attr"`
}
156
157
// listenersChanged reports whether the two listener configurations differ,
// using a deep comparison.
func listenersChanged(prev, next []ListenerConfig) bool {
	same := reflect.DeepEqual(prev, next)
	return !same
}
160
// relabelRulesChanged reports whether the two sets of relabel rules differ,
// using a deep comparison.
func relabelRulesChanged(prev, next flow_relabel.Rules) bool {
	same := reflect.DeepEqual(prev, next)
	return !same
}
163
164