Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gelf/internal/target/gelftarget.go
4100 views
1
package target
2
3
// This code is copied from Promtail. The target package is used to
// configure and run the targets that can read GELF entries and forward them
// to other Loki components.
6
7
import (
8
"bytes"
9
"context"
10
"strings"
11
"sync"
12
"time"
13
14
"github.com/grafana/agent/component/common/loki"
15
16
"github.com/go-kit/log"
17
"github.com/go-kit/log/level"
18
"github.com/grafana/go-gelf/v2/gelf"
19
"github.com/prometheus/common/model"
20
"github.com/prometheus/prometheus/model/labels"
21
"github.com/prometheus/prometheus/model/relabel"
22
23
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
24
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
25
26
"github.com/grafana/loki/pkg/logproto"
27
)
28
29
// SeverityLevels translates the numeric level carried by a GELF message into
// its textual severity name. Only levels 0 through 7 have an entry; looking up
// any other level yields the empty string.
var SeverityLevels = map[int32]string{
	0: "emergency",
	1: "alert",
	2: "critical",
	3: "error",
	4: "warning",
	5: "notice",
	6: "informational",
	7: "debug",
}
40
41
// Target listens to gelf messages on udp.
42
type Target struct {
43
metrics *Metrics
44
logger log.Logger
45
handler loki.EntryHandler
46
config *scrapeconfig.GelfTargetConfig
47
relabelConfig []*relabel.Config
48
gelfReader *gelf.Reader
49
encodeBuff *bytes.Buffer
50
wg sync.WaitGroup
51
52
ctx context.Context
53
ctxCancel context.CancelFunc
54
}
55
56
// NewTarget configures a new Gelf Target.
57
func NewTarget(
58
metrics *Metrics,
59
logger log.Logger,
60
handler loki.EntryHandler,
61
relabel []*relabel.Config,
62
config *scrapeconfig.GelfTargetConfig,
63
) (*Target, error) {
64
65
if config.ListenAddress == "" {
66
config.ListenAddress = ":12201"
67
}
68
69
gelfReader, err := gelf.NewReader(config.ListenAddress)
70
if err != nil {
71
return nil, err
72
}
73
ctx, cancel := context.WithCancel(context.Background())
74
75
t := &Target{
76
metrics: metrics,
77
logger: logger,
78
handler: handler,
79
config: config,
80
relabelConfig: relabel,
81
gelfReader: gelfReader,
82
encodeBuff: bytes.NewBuffer(make([]byte, 0, 1024)),
83
84
ctx: ctx,
85
ctxCancel: cancel,
86
}
87
88
t.run()
89
return t, err
90
}
91
92
func (t *Target) run() {
93
t.wg.Add(1)
94
go func() {
95
defer t.wg.Done()
96
level.Info(t.logger).Log("msg", "listening for GELF UDP messages", "listen_address", t.config.ListenAddress)
97
for {
98
select {
99
case <-t.ctx.Done():
100
level.Info(t.logger).Log("msg", "GELF UDP listener shutdown", "listen_address", t.config.ListenAddress)
101
return
102
default:
103
msg, err := t.gelfReader.ReadMessage()
104
if err != nil {
105
level.Error(t.logger).Log("msg", "error while reading gelf message", "listen_address", t.config.ListenAddress, "err", err)
106
t.metrics.gelfErrors.Inc()
107
continue
108
}
109
if msg != nil {
110
t.metrics.gelfEntries.Inc()
111
t.handleMessage(msg)
112
}
113
}
114
}
115
}()
116
}
117
118
func (t *Target) handleMessage(msg *gelf.Message) {
119
lb := labels.NewBuilder(nil)
120
121
// Add all labels from the config.
122
for k, v := range t.config.Labels {
123
lb.Set(string(k), string(v))
124
}
125
lb.Set("__gelf_message_level", SeverityLevels[msg.Level])
126
lb.Set("__gelf_message_host", msg.Host)
127
lb.Set("__gelf_message_version", msg.Version)
128
lb.Set("__gelf_message_facility", msg.Facility)
129
130
processed, _ := relabel.Process(lb.Labels(nil), t.relabelConfig...)
131
132
filtered := make(model.LabelSet)
133
for _, lbl := range processed {
134
if strings.HasPrefix(lbl.Name, "__") {
135
continue
136
}
137
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
138
}
139
140
var timestamp time.Time
141
if t.config.UseIncomingTimestamp && msg.TimeUnix != 0 {
142
// TimeUnix is the timestamp of the message, in seconds since the UNIX epoch with decimals for fractional seconds.
143
timestamp = secondsToUnixTimestamp(msg.TimeUnix)
144
} else {
145
timestamp = time.Now()
146
}
147
t.encodeBuff.Reset()
148
err := msg.MarshalJSONBuf(t.encodeBuff)
149
if err != nil {
150
level.Error(t.logger).Log("msg", "error while marshalling gelf message", "listen_address", t.config.ListenAddress, "err", err)
151
t.metrics.gelfErrors.Inc()
152
return
153
}
154
t.handler.Chan() <- loki.Entry{
155
Labels: filtered,
156
Entry: logproto.Entry{
157
Timestamp: timestamp,
158
Line: t.encodeBuff.String(),
159
},
160
}
161
}
162
163
// secondsToUnixTimestamp converts a Unix time given in (possibly fractional)
// seconds into a time.Time. The value is scaled to nanoseconds and truncated
// to a whole number of them.
func secondsToUnixTimestamp(seconds float64) time.Time {
	nanos := int64(seconds * float64(time.Second))
	return time.Unix(0, nanos)
}
166
167
// Type returns GelfTargetType.
168
func (t *Target) Type() target.TargetType {
169
return target.GelfTargetType
170
}
171
172
// Ready indicates whether or not the gelf target is ready to be read from.
173
func (t *Target) Ready() bool {
174
return true
175
}
176
177
// DiscoveredLabels returns the set of labels discovered by the gelf target, which
178
// is always nil. Implements Target.
179
func (t *Target) DiscoveredLabels() model.LabelSet {
180
return nil
181
}
182
183
// Labels returns the set of labels that statically apply to all log entries
184
// produced by the GelfTarget.
185
func (t *Target) Labels() model.LabelSet {
186
return t.config.Labels
187
}
188
189
// Details returns target-specific details.
190
func (t *Target) Details() interface{} {
191
return map[string]string{}
192
}
193
194
// Stop shuts down the GelfTarget.
195
func (t *Target) Stop() {
196
level.Info(t.logger).Log("msg", "Shutting down GELF UDP listener", "listen_address", t.config.ListenAddress)
197
t.ctxCancel()
198
if err := t.gelfReader.Close(); err != nil {
199
level.Error(t.logger).Log("msg", "error while closing gelf reader", "err", err)
200
}
201
t.wg.Wait()
202
t.handler.Stop()
203
}
204
205