Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/syslog/internal/syslogtarget/syslogtarget.go
4097 views
1
package syslogtarget
2
3
// This code is copied from Promtail. The syslogtarget package is used to
4
// configure and run the targets that can read syslog entries and forward them
5
// to other loki components.
6
7
import (
8
"errors"
9
"fmt"
10
"net"
11
"strings"
12
"time"
13
14
"github.com/go-kit/log"
15
"github.com/go-kit/log/level"
16
"github.com/influxdata/go-syslog/v3"
17
"github.com/influxdata/go-syslog/v3/rfc5424"
18
"github.com/prometheus/common/model"
19
"github.com/prometheus/prometheus/model/labels"
20
"github.com/prometheus/prometheus/model/relabel"
21
22
"github.com/grafana/agent/component/common/loki"
23
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
24
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
25
26
"github.com/grafana/loki/pkg/logproto"
27
)
28
29
var (
30
DefaultIdleTimeout = 120 * time.Second
31
DefaultMaxMessageLength = 8192
32
DefaultProtocol = protocolTCP
33
)
34
35
// SyslogTarget listens to syslog messages.
36
// nolint:revive
37
type SyslogTarget struct {
38
metrics *Metrics
39
logger log.Logger
40
handler loki.EntryHandler
41
config *scrapeconfig.SyslogTargetConfig
42
relabelConfig []*relabel.Config
43
44
transport Transport
45
46
messages chan message
47
messagesDone chan struct{}
48
}
49
50
type message struct {
51
labels model.LabelSet
52
message string
53
timestamp time.Time
54
}
55
56
// NewSyslogTarget configures a new SyslogTarget.
57
func NewSyslogTarget(
58
metrics *Metrics,
59
logger log.Logger,
60
handler loki.EntryHandler,
61
relabel []*relabel.Config,
62
config *scrapeconfig.SyslogTargetConfig,
63
) (*SyslogTarget, error) {
64
65
t := &SyslogTarget{
66
metrics: metrics,
67
logger: logger,
68
handler: handler,
69
config: config,
70
relabelConfig: relabel,
71
messagesDone: make(chan struct{}),
72
}
73
74
switch t.transportProtocol() {
75
case protocolTCP:
76
t.transport = NewSyslogTCPTransport(
77
config,
78
t.handleMessage,
79
t.handleMessageError,
80
logger,
81
)
82
case protocolUDP:
83
t.transport = NewSyslogUDPTransport(
84
config,
85
t.handleMessage,
86
t.handleMessageError,
87
logger,
88
)
89
default:
90
return nil, fmt.Errorf("invalid transport protocol. expected 'tcp' or 'udp', got '%s'", t.transportProtocol())
91
}
92
93
t.messages = make(chan message)
94
go t.messageSender(handler.Chan())
95
96
err := t.transport.Run()
97
if err != nil {
98
return nil, err
99
}
100
return t, nil
101
}
102
103
// handleMessageError is the error callback handed to the transport.
//
// Network timeouts are logged at debug level only. Every other error is logged
// as a warning and counted in the syslogParsingErrors metric.
func (t *SyslogTarget) handleMessageError(err error) {
	var netErr net.Error
	timedOut := errors.As(err, &netErr) && netErr.Timeout()
	if !timedOut {
		level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
		t.metrics.syslogParsingErrors.Inc()
		return
	}
	level.Debug(t.logger).Log("msg", "connection timed out", "err", netErr)
}
112
113
// handleMessage is the message callback handed to the transport. It converts
// one parsed syslog message into a message value and queues it for
// messageSender.
//
// connLabels are the labels supplied by the transport for the connection; the
// fields of the syslog message are added to them as "__syslog_message_*"
// labels before the relabel rules run. Labels whose name still starts with
// "__" after relabeling are dropped.
//
// Messages that are not a non-nil *rfc5424.SyslogMessage are logged, counted
// as parsing errors and discarded. Messages without a body are counted as
// empty and discarded.
func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) {
	// A checked assertion keeps an unexpected or nil message from panicking
	// the goroutine that delivers it.
	rfc5424Msg, ok := msg.(*rfc5424.SyslogMessage)
	if !ok || rfc5424Msg == nil {
		level.Warn(t.logger).Log("msg", "dropping syslog message of unexpected type", "type", fmt.Sprintf("%T", msg))
		t.metrics.syslogParsingErrors.Inc()
		return
	}

	if rfc5424Msg.Message == nil {
		t.metrics.syslogEmptyMessages.Inc()
		return
	}

	lb := labels.NewBuilder(connLabels)
	if v := rfc5424Msg.SeverityLevel(); v != nil {
		lb.Set("__syslog_message_severity", *v)
	}
	if v := rfc5424Msg.FacilityLevel(); v != nil {
		lb.Set("__syslog_message_facility", *v)
	}
	if v := rfc5424Msg.Hostname; v != nil {
		lb.Set("__syslog_message_hostname", *v)
	}
	if v := rfc5424Msg.Appname; v != nil {
		lb.Set("__syslog_message_app_name", *v)
	}
	if v := rfc5424Msg.ProcID; v != nil {
		lb.Set("__syslog_message_proc_id", *v)
	}
	if v := rfc5424Msg.MsgID; v != nil {
		lb.Set("__syslog_message_msg_id", *v)
	}

	if t.config.LabelStructuredData && rfc5424Msg.StructuredData != nil {
		for id, params := range *rfc5424Msg.StructuredData {
			// "@" is replaced so the structured-data ID can be embedded in a
			// label name.
			id = strings.ReplaceAll(id, "@", "_")
			for name, value := range params {
				key := "__syslog_message_sd_" + id + "_" + name
				lb.Set(key, value)
			}
		}
	}

	// NOTE(review): the second result of relabel.Process is discarded, so an
	// entry is queued even when the relabel rules reject the label set —
	// confirm this is intended.
	processed, _ := relabel.Process(lb.Labels(nil), t.relabelConfig...)

	filtered := make(model.LabelSet)
	for _, lbl := range processed {
		if strings.HasPrefix(lbl.Name, "__") {
			continue
		}
		filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
	}

	// Use the sender's timestamp only when configured and present.
	timestamp := time.Now()
	if t.config.UseIncomingTimestamp && rfc5424Msg.Timestamp != nil {
		timestamp = *rfc5424Msg.Timestamp
	}

	m := *rfc5424Msg.Message
	if t.config.UseRFC5424Message {
		fullMsg, err := rfc5424Msg.String()
		if err != nil {
			level.Debug(t.logger).Log("msg", "failed to convert rfc5424 message to string; using message field instead", "err", err)
		} else {
			m = fullMsg
		}
	}

	t.messages <- message{labels: filtered, message: m, timestamp: timestamp}
}
179
180
func (t *SyslogTarget) messageSender(entries chan<- loki.Entry) {
181
for msg := range t.messages {
182
entries <- loki.Entry{
183
Labels: msg.labels,
184
Entry: logproto.Entry{
185
Timestamp: msg.timestamp,
186
Line: msg.message,
187
},
188
}
189
t.metrics.syslogEntries.Inc()
190
}
191
t.messagesDone <- struct{}{}
192
}
193
194
// Type returns SyslogTargetType.
195
func (t *SyslogTarget) Type() target.TargetType {
196
return target.SyslogTargetType
197
}
198
199
// Ready indicates whether or not the syslog target is ready to be read from.
200
func (t *SyslogTarget) Ready() bool {
201
return t.transport.Ready()
202
}
203
204
// DiscoveredLabels returns the set of labels discovered by the syslog target, which
205
// is always nil. Implements Target.
206
func (t *SyslogTarget) DiscoveredLabels() model.LabelSet {
207
return nil
208
}
209
210
// Labels returns the set of labels that statically apply to all log entries
211
// produced by the SyslogTarget.
212
func (t *SyslogTarget) Labels() model.LabelSet {
213
return t.config.Labels
214
}
215
216
// Details returns target-specific details.
217
func (t *SyslogTarget) Details() interface{} {
218
return map[string]string{}
219
}
220
221
// Stop shuts down the SyslogTarget.
222
func (t *SyslogTarget) Stop() error {
223
err := t.transport.Close()
224
t.transport.Wait()
225
close(t.messages)
226
// wait for all pending messages to be processed and sent to handler
227
<-t.messagesDone
228
t.handler.Stop()
229
return err
230
}
231
232
// ListenAddress returns the address SyslogTarget is listening on.
233
func (t *SyslogTarget) ListenAddress() net.Addr {
234
return t.transport.Addr()
235
}
236
237
// transportProtocol returns the configured listen protocol, or DefaultProtocol
// when the configuration leaves ListenProtocol empty.
func (t *SyslogTarget) transportProtocol() string {
	if configured := t.config.ListenProtocol; configured != "" {
		return configured
	}
	return DefaultProtocol
}
243
244