Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/statsd_exporter/statsd_exporter.go
5380 views
1
// Package statsd_exporter embeds https://github.com/prometheus/statsd_exporter
2
package statsd_exporter //nolint:golint
3
4
import (
5
"context"
6
"fmt"
7
"net"
8
"net/http"
9
"os"
10
"strconv"
11
"time"
12
13
"github.com/go-kit/log"
14
"github.com/go-kit/log/level"
15
"github.com/grafana/agent/pkg/build"
16
"github.com/grafana/agent/pkg/integrations"
17
"github.com/grafana/agent/pkg/integrations/config"
18
integrations_v2 "github.com/grafana/agent/pkg/integrations/v2"
19
"github.com/grafana/agent/pkg/integrations/v2/metricsutils"
20
"github.com/prometheus/client_golang/prometheus"
21
"github.com/prometheus/client_golang/prometheus/promhttp"
22
"github.com/prometheus/statsd_exporter/pkg/address"
23
"github.com/prometheus/statsd_exporter/pkg/event"
24
"github.com/prometheus/statsd_exporter/pkg/exporter"
25
"github.com/prometheus/statsd_exporter/pkg/line"
26
"github.com/prometheus/statsd_exporter/pkg/listener"
27
"github.com/prometheus/statsd_exporter/pkg/mapper"
28
"github.com/prometheus/statsd_exporter/pkg/mappercache/lru"
29
"github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement"
30
"gopkg.in/yaml.v2"
31
)
32
33
// DefaultConfig holds the default settings for the statsd_exporter integration.
34
var DefaultConfig = Config{
35
ListenUDP: ":9125",
36
ListenTCP: ":9125",
37
UnixSocketMode: "755",
38
39
CacheSize: 1000,
40
CacheType: "lru",
41
EventQueueSize: 10000,
42
EventFlushThreshold: 1000,
43
EventFlushInterval: 200 * time.Millisecond,
44
45
ParseDogStatsd: true,
46
ParseInfluxDB: true,
47
ParseLibrato: true,
48
ParseSignalFX: true,
49
}
50
51
// Config controls the statsd_exporter integration.
52
type Config struct {
53
ListenUDP string `yaml:"listen_udp,omitempty"`
54
ListenTCP string `yaml:"listen_tcp,omitempty"`
55
ListenUnixgram string `yaml:"listen_unixgram,omitempty"`
56
UnixSocketMode string `yaml:"unix_socket_mode,omitempty"`
57
MappingConfig *mapper.MetricMapper `yaml:"mapping_config,omitempty"`
58
59
ReadBuffer int `yaml:"read_buffer,omitempty"`
60
CacheSize int `yaml:"cache_size,omitempty"`
61
CacheType string `yaml:"cache_type,omitempty"`
62
EventQueueSize int `yaml:"event_queue_size,omitempty"`
63
EventFlushThreshold int `yaml:"event_flush_threshold,omitempty"`
64
EventFlushInterval time.Duration `yaml:"event_flush_interval,omitempty"`
65
66
ParseDogStatsd bool `yaml:"parse_dogstatsd_tags,omitempty"`
67
ParseInfluxDB bool `yaml:"parse_influxdb_tags,omitempty"`
68
ParseLibrato bool `yaml:"parse_librato_tags,omitempty"`
69
ParseSignalFX bool `yaml:"parse_signalfx_tags,omitempty"`
70
}
71
72
// UnmarshalYAML implements yaml.Unmarshaler for Config.
73
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
74
*c = DefaultConfig
75
76
type plain Config
77
return unmarshal((*plain)(c))
78
}
79
80
// Name returns the name of the integration that this config represents.
81
func (c *Config) Name() string {
82
return "statsd_exporter"
83
}
84
85
// InstanceKey returns the hostname:port of the agent.
86
func (c *Config) InstanceKey(agentKey string) (string, error) {
87
return agentKey, nil
88
}
89
90
// NewIntegration converts this config into an instance of an integration.
91
func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) {
92
return New(l, c)
93
}
94
95
func init() {
96
integrations.RegisterIntegration(&Config{})
97
integrations_v2.RegisterLegacy(&Config{}, integrations_v2.TypeSingleton, metricsutils.NewNamedShim("statsd"))
98
}
99
100
// Exporter defines the statsd_exporter integration.
101
type Exporter struct {
102
cfg *Config
103
reg *prometheus.Registry
104
metrics *Metrics
105
exporter *exporter.Exporter
106
log log.Logger
107
}
108
109
// New creates a new statsd_exporter integration. The integration scrapes
110
// metrics from a statsd process.
111
func New(log log.Logger, c *Config) (integrations.Integration, error) {
112
reg := prometheus.NewRegistry()
113
114
m, err := NewMetrics(reg)
115
if err != nil {
116
return nil, fmt.Errorf("failed to create metrics for network listeners: %w", err)
117
}
118
119
if c.ListenUDP == "" && c.ListenTCP == "" && c.ListenUnixgram == "" {
120
return nil, fmt.Errorf("at least one of UDP/TCP/Unixgram listeners must be used")
121
}
122
statsdMapper := &mapper.MetricMapper{
123
Registerer: reg,
124
MappingsCount: m.MappingsCount,
125
Logger: log,
126
}
127
128
if c.MappingConfig != nil {
129
cfgBytes, err := yaml.Marshal(c.MappingConfig)
130
if err != nil {
131
return nil, fmt.Errorf("failed to serialize mapping config: %w", err)
132
}
133
134
err = statsdMapper.InitFromYAMLString(string(cfgBytes))
135
if err != nil {
136
return nil, fmt.Errorf("failed to load mapping config: %w", err)
137
}
138
}
139
140
var cache mapper.MetricMapperCache
141
if c.CacheSize != 0 {
142
switch c.CacheType {
143
case "lru":
144
cache, err = lru.NewMetricMapperLRUCache(statsdMapper.Registerer, c.CacheSize)
145
case "random":
146
cache, err = randomreplacement.NewMetricMapperRRCache(statsdMapper.Registerer, c.CacheSize)
147
default:
148
err = fmt.Errorf("unsupported cache type %q", c.CacheType)
149
}
150
if err != nil {
151
return nil, err
152
}
153
}
154
if cache != nil {
155
statsdMapper.UseCache(cache)
156
}
157
158
e := exporter.NewExporter(reg, statsdMapper, log, m.EventsActions, m.EventsUnmapped, m.ErrorEventStats, m.EventStats, m.ConflictingEventStats, m.MetricsCount)
159
160
if err := reg.Register(build.NewCollector("statsd_exporter")); err != nil {
161
return nil, fmt.Errorf("couldn't register version metrics: %w", err)
162
}
163
164
return &Exporter{
165
cfg: c,
166
metrics: m,
167
exporter: e,
168
reg: reg,
169
log: log,
170
}, nil
171
}
172
173
// MetricsHandler returns the HTTP handler for the integration.
174
func (e *Exporter) MetricsHandler() (http.Handler, error) {
175
return promhttp.HandlerFor(e.reg, promhttp.HandlerOpts{
176
ErrorHandling: promhttp.ContinueOnError,
177
}), nil
178
}
179
180
// ScrapeConfigs satisfies Integration.ScrapeConfigs.
181
func (e *Exporter) ScrapeConfigs() []config.ScrapeConfig {
182
return []config.ScrapeConfig{{JobName: e.cfg.Name(), MetricsPath: "/metrics"}}
183
}
184
185
// Run satisfies Run.
186
func (e *Exporter) Run(ctx context.Context) error {
187
parser := line.NewParser()
188
if e.cfg.ParseDogStatsd {
189
parser.EnableDogstatsdParsing()
190
}
191
if e.cfg.ParseInfluxDB {
192
parser.EnableInfluxdbParsing()
193
}
194
if e.cfg.ParseLibrato {
195
parser.EnableLibratoParsing()
196
}
197
if e.cfg.ParseSignalFX {
198
parser.EnableSignalFXParsing()
199
}
200
201
events := make(chan event.Events, e.cfg.EventQueueSize)
202
defer close(events)
203
eventQueue := event.NewEventQueue(events, e.cfg.EventFlushThreshold, e.cfg.EventFlushInterval, e.metrics.EventsFlushed)
204
205
if e.cfg.ListenUDP != "" {
206
addr, err := address.UDPAddrFromString(e.cfg.ListenUDP)
207
if err != nil {
208
return fmt.Errorf("invalid UDP listen address %s: %w", e.cfg.ListenUDP, err)
209
}
210
uconn, err := net.ListenUDP("udp", addr)
211
if err != nil {
212
return fmt.Errorf("failed to start UDP listener: %w", err)
213
}
214
defer func() {
215
err := uconn.Close()
216
if err != nil {
217
level.Warn(e.log).Log("msg", "failed to close UDP listener", "err", err)
218
}
219
}()
220
221
if e.cfg.ReadBuffer != 0 {
222
err = uconn.SetReadBuffer(e.cfg.ReadBuffer)
223
if err != nil {
224
return fmt.Errorf("failed to set UDP read buffer: %w", err)
225
}
226
}
227
228
ul := &listener.StatsDUDPListener{
229
Conn: uconn,
230
EventHandler: eventQueue,
231
Logger: e.log,
232
LineParser: parser,
233
UDPPackets: e.metrics.UDPPackets,
234
LinesReceived: e.metrics.LinesReceived,
235
EventsFlushed: e.metrics.EventsFlushed,
236
SampleErrors: *e.metrics.SampleErrors,
237
SamplesReceived: e.metrics.SamplesReceived,
238
TagErrors: e.metrics.TagErrors,
239
TagsReceived: e.metrics.TagsReceived,
240
}
241
242
go ul.Listen()
243
}
244
245
if e.cfg.ListenTCP != "" {
246
addr, err := address.TCPAddrFromString(e.cfg.ListenTCP)
247
if err != nil {
248
return fmt.Errorf("invalid TCP listen address %s: %w", e.cfg.ListenTCP, err)
249
}
250
tconn, err := net.ListenTCP("tcp", addr)
251
if err != nil {
252
return fmt.Errorf("failed to start TCP listener: %w", err)
253
}
254
defer func() {
255
err := tconn.Close()
256
if err != nil {
257
level.Warn(e.log).Log("msg", "failed to close TCP listener", "err", err)
258
}
259
}()
260
261
tl := &listener.StatsDTCPListener{
262
Conn: tconn,
263
EventHandler: eventQueue,
264
Logger: e.log,
265
LineParser: parser,
266
LinesReceived: e.metrics.LinesReceived,
267
EventsFlushed: e.metrics.EventsFlushed,
268
SampleErrors: *e.metrics.SampleErrors,
269
SamplesReceived: e.metrics.SamplesReceived,
270
TagErrors: e.metrics.TagErrors,
271
TagsReceived: e.metrics.TagsReceived,
272
TCPConnections: e.metrics.TCPConnections,
273
TCPErrors: e.metrics.TCPErrors,
274
TCPLineTooLong: e.metrics.TCPLineTooLong,
275
}
276
277
go tl.Listen()
278
}
279
280
if e.cfg.ListenUnixgram != "" {
281
var err error
282
if _, err = os.Stat(e.cfg.ListenUnixgram); !os.IsNotExist(err) {
283
return fmt.Errorf("unixgram socket %s already exists: %w", e.cfg.ListenUnixgram, err)
284
}
285
uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
286
Net: "unixgram",
287
Name: e.cfg.ListenUnixgram,
288
})
289
if err != nil {
290
return fmt.Errorf("failed to listen on unixgram socket: %w", err)
291
}
292
defer func() {
293
err := uxgconn.Close()
294
if err != nil {
295
level.Warn(e.log).Log("msg", "failed to close unixgram listener", "err", err)
296
}
297
}()
298
299
if e.cfg.ReadBuffer != 0 {
300
err = uxgconn.SetReadBuffer(e.cfg.ReadBuffer)
301
if err != nil {
302
return fmt.Errorf("error setting unixgram read buffer: %w", err)
303
}
304
}
305
306
ul := &listener.StatsDUnixgramListener{
307
Conn: uxgconn,
308
EventHandler: eventQueue,
309
Logger: e.log,
310
LineParser: parser,
311
UnixgramPackets: e.metrics.UnixgramPackets,
312
LinesReceived: e.metrics.LinesReceived,
313
EventsFlushed: e.metrics.EventsFlushed,
314
SampleErrors: *e.metrics.SampleErrors,
315
SamplesReceived: e.metrics.SamplesReceived,
316
TagErrors: e.metrics.TagErrors,
317
TagsReceived: e.metrics.TagsReceived,
318
}
319
320
go ul.Listen()
321
322
// If it's an abstract unix domain socket, it won't exist on fs so we can't
323
// chmod it either.
324
if _, err := os.Stat(e.cfg.ListenUnixgram); !os.IsNotExist(err) {
325
defer os.Remove(e.cfg.ListenUnixgram)
326
327
// Convert the string to octet
328
perm, err := strconv.ParseInt("0"+e.cfg.UnixSocketMode, 8, 32)
329
if err != nil {
330
level.Warn(e.log).Log("msg", "bad permission on unixgram socket, ignoring", "permission", e.cfg.UnixSocketMode, "socket", e.cfg.ListenUnixgram, "err", err)
331
} else {
332
err = os.Chmod(e.cfg.ListenUnixgram, os.FileMode(perm))
333
if err != nil {
334
level.Warn(e.log).Log("msg", "failed to change unixgram socket permission", "socket", e.cfg.ListenUnixgram, "err", err)
335
}
336
}
337
}
338
}
339
340
go e.exporter.Listen(events)
341
342
<-ctx.Done()
343
return nil
344
}
345
346