Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gelf/gelf.go
4096 views
1
package gelf
2
3
import (
4
"context"
5
"sync"
6
7
"github.com/grafana/agent/component"
8
"github.com/grafana/agent/component/common/loki"
9
flow_relabel "github.com/grafana/agent/component/common/relabel"
10
"github.com/grafana/agent/component/loki/source/gelf/internal/target"
11
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
12
"github.com/prometheus/common/model"
13
"github.com/prometheus/prometheus/model/relabel"
14
)
15
16
func init() {
17
component.Register(component.Registration{
18
Name: "loki.source.gelf",
19
Args: Arguments{},
20
21
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
22
return New(opts, args.(Arguments))
23
},
24
})
25
}
26
27
var _ component.Component = (*Component)(nil)
28
29
// Component is a receiver for graylog formatted log files.
30
type Component struct {
31
mut sync.RWMutex
32
target *target.Target
33
o component.Options
34
metrics *target.Metrics
35
handler *handler
36
receivers []loki.LogsReceiver
37
}
38
39
// Run starts the component.
40
func (c *Component) Run(ctx context.Context) error {
41
defer func() {
42
c.target.Stop()
43
}()
44
for {
45
select {
46
case <-ctx.Done():
47
return nil
48
case entry := <-c.handler.c:
49
c.mut.RLock()
50
lokiEntry := loki.Entry{
51
Labels: entry.Labels,
52
Entry: entry.Entry,
53
}
54
if lokiEntry.Labels["job"] == "" {
55
lokiEntry.Labels["job"] = model.LabelValue(c.o.ID)
56
}
57
for _, r := range c.receivers {
58
r <- lokiEntry
59
}
60
c.mut.RUnlock()
61
}
62
}
63
}
64
65
// Update updates the fields of the component.
66
func (c *Component) Update(args component.Arguments) error {
67
newArgs := args.(Arguments)
68
69
c.mut.Lock()
70
defer c.mut.Unlock()
71
72
if c.target != nil {
73
c.target.Stop()
74
}
75
c.receivers = newArgs.Receivers
76
77
var rcs []*relabel.Config
78
if newArgs.RelabelRules != nil && len(newArgs.RelabelRules) > 0 {
79
rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
80
}
81
82
t, err := target.NewTarget(c.metrics, c.o.Logger, c.handler, rcs, convertConfig(newArgs))
83
if err != nil {
84
return err
85
}
86
c.target = t
87
return nil
88
}
89
90
// Arguments are the arguments for the component.
91
type Arguments struct {
92
// ListenAddress only supports UDP.
93
ListenAddress string `river:"listen_address,attr,optional"`
94
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
95
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
96
Receivers []loki.LogsReceiver `river:"forward_to,attr"`
97
}
98
99
func defaultArgs() Arguments {
100
return Arguments{
101
ListenAddress: "0.0.0.0:12201",
102
UseIncomingTimestamp: false,
103
}
104
}
105
106
// UnmarshalRiver implements river.Unmarshaler.
107
func (r *Arguments) UnmarshalRiver(f func(v interface{}) error) error {
108
*r = defaultArgs()
109
110
type arguments Arguments
111
if err := f((*arguments)(r)); err != nil {
112
return err
113
}
114
115
return nil
116
}
117
118
func convertConfig(a Arguments) *scrapeconfig.GelfTargetConfig {
119
return &scrapeconfig.GelfTargetConfig{
120
ListenAddress: a.ListenAddress,
121
Labels: nil,
122
UseIncomingTimestamp: a.UseIncomingTimestamp,
123
}
124
}
125
126
// New creates a new gelf component.
127
func New(o component.Options, args Arguments) (component.Component, error) {
128
metrics := target.NewMetrics(o.Registerer)
129
c := &Component{
130
o: o,
131
metrics: metrics,
132
handler: &handler{c: make(chan loki.Entry)},
133
}
134
// Call to Update() to start readers and set receivers once at the start.
135
if err := c.Update(args); err != nil {
136
return nil, err
137
}
138
return c, nil
139
}
140
141
type handler struct {
142
c chan loki.Entry
143
}
144
145
func (h *handler) Chan() chan<- loki.Entry {
146
return h.c
147
}
148
func (handler) Stop() {
149
// noop.
150
}
151
152