Path: blob/main/pkg/integrations/statsd_exporter/statsd_exporter.go
5380 views
// Package statsd_exporter embeds https://github.com/prometheus/statsd_exporter1package statsd_exporter //nolint:golint23import (4"context"5"fmt"6"net"7"net/http"8"os"9"strconv"10"time"1112"github.com/go-kit/log"13"github.com/go-kit/log/level"14"github.com/grafana/agent/pkg/build"15"github.com/grafana/agent/pkg/integrations"16"github.com/grafana/agent/pkg/integrations/config"17integrations_v2 "github.com/grafana/agent/pkg/integrations/v2"18"github.com/grafana/agent/pkg/integrations/v2/metricsutils"19"github.com/prometheus/client_golang/prometheus"20"github.com/prometheus/client_golang/prometheus/promhttp"21"github.com/prometheus/statsd_exporter/pkg/address"22"github.com/prometheus/statsd_exporter/pkg/event"23"github.com/prometheus/statsd_exporter/pkg/exporter"24"github.com/prometheus/statsd_exporter/pkg/line"25"github.com/prometheus/statsd_exporter/pkg/listener"26"github.com/prometheus/statsd_exporter/pkg/mapper"27"github.com/prometheus/statsd_exporter/pkg/mappercache/lru"28"github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement"29"gopkg.in/yaml.v2"30)3132// DefaultConfig holds the default settings for the statsd_exporter integration.33var DefaultConfig = Config{34ListenUDP: ":9125",35ListenTCP: ":9125",36UnixSocketMode: "755",3738CacheSize: 1000,39CacheType: "lru",40EventQueueSize: 10000,41EventFlushThreshold: 1000,42EventFlushInterval: 200 * time.Millisecond,4344ParseDogStatsd: true,45ParseInfluxDB: true,46ParseLibrato: true,47ParseSignalFX: true,48}4950// Config controls the statsd_exporter integration.51type Config struct {52ListenUDP string `yaml:"listen_udp,omitempty"`53ListenTCP string `yaml:"listen_tcp,omitempty"`54ListenUnixgram string `yaml:"listen_unixgram,omitempty"`55UnixSocketMode string `yaml:"unix_socket_mode,omitempty"`56MappingConfig *mapper.MetricMapper `yaml:"mapping_config,omitempty"`5758ReadBuffer int `yaml:"read_buffer,omitempty"`59CacheSize int `yaml:"cache_size,omitempty"`60CacheType string `yaml:"cache_type,omitempty"`61EventQueueSize int `yaml:"event_queue_size,omitempty"`62EventFlushThreshold int `yaml:"event_flush_threshold,omitempty"`63EventFlushInterval time.Duration `yaml:"event_flush_interval,omitempty"`6465ParseDogStatsd bool `yaml:"parse_dogstatsd_tags,omitempty"`66ParseInfluxDB bool `yaml:"parse_influxdb_tags,omitempty"`67ParseLibrato bool `yaml:"parse_librato_tags,omitempty"`68ParseSignalFX bool `yaml:"parse_signalfx_tags,omitempty"`69}7071// UnmarshalYAML implements yaml.Unmarshaler for Config.72func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {73*c = DefaultConfig7475type plain Config76return unmarshal((*plain)(c))77}7879// Name returns the name of the integration that this config represents.80func (c *Config) Name() string {81return "statsd_exporter"82}8384// InstanceKey returns the hostname:port of the agent.85func (c *Config) InstanceKey(agentKey string) (string, error) {86return agentKey, nil87}8889// NewIntegration converts this config into an instance of an integration.90func (c *Config) NewIntegration(l log.Logger) (integrations.Integration, error) {91return New(l, c)92}9394func init() {95integrations.RegisterIntegration(&Config{})96integrations_v2.RegisterLegacy(&Config{}, integrations_v2.TypeSingleton, metricsutils.NewNamedShim("statsd"))97}9899// Exporter defines the statsd_exporter integration.100type Exporter struct {101cfg *Config102reg *prometheus.Registry103metrics *Metrics104exporter *exporter.Exporter105log log.Logger106}107108// New creates a new statsd_exporter integration. The integration scrapes109// metrics from a statsd process.110func New(log log.Logger, c *Config) (integrations.Integration, error) {111reg := prometheus.NewRegistry()112113m, err := NewMetrics(reg)114if err != nil {115return nil, fmt.Errorf("failed to create metrics for network listeners: %w", err)116}117118if c.ListenUDP == "" && c.ListenTCP == "" && c.ListenUnixgram == "" {119return nil, fmt.Errorf("at least one of UDP/TCP/Unixgram listeners must be used")120}121statsdMapper := &mapper.MetricMapper{122Registerer: reg,123MappingsCount: m.MappingsCount,124Logger: log,125}126127if c.MappingConfig != nil {128cfgBytes, err := yaml.Marshal(c.MappingConfig)129if err != nil {130return nil, fmt.Errorf("failed to serialize mapping config: %w", err)131}132133err = statsdMapper.InitFromYAMLString(string(cfgBytes))134if err != nil {135return nil, fmt.Errorf("failed to load mapping config: %w", err)136}137}138139var cache mapper.MetricMapperCache140if c.CacheSize != 0 {141switch c.CacheType {142case "lru":143cache, err = lru.NewMetricMapperLRUCache(statsdMapper.Registerer, c.CacheSize)144case "random":145cache, err = randomreplacement.NewMetricMapperRRCache(statsdMapper.Registerer, c.CacheSize)146default:147err = fmt.Errorf("unsupported cache type %q", c.CacheType)148}149if err != nil {150return nil, err151}152}153if cache != nil {154statsdMapper.UseCache(cache)155}156157e := exporter.NewExporter(reg, statsdMapper, log, m.EventsActions, m.EventsUnmapped, m.ErrorEventStats, m.EventStats, m.ConflictingEventStats, m.MetricsCount)158159if err := reg.Register(build.NewCollector("statsd_exporter")); err != nil {160return nil, fmt.Errorf("couldn't register version metrics: %w", err)161}162163return &Exporter{164cfg: c,165metrics: m,166exporter: e,167reg: reg,168log: log,169}, nil170}171172// MetricsHandler returns the HTTP handler for the integration.173func (e *Exporter) MetricsHandler() (http.Handler, error) {174return promhttp.HandlerFor(e.reg, promhttp.HandlerOpts{175ErrorHandling: promhttp.ContinueOnError,176}), nil177}178179// ScrapeConfigs satisfies Integration.ScrapeConfigs.180func (e *Exporter) ScrapeConfigs() []config.ScrapeConfig {181return []config.ScrapeConfig{{JobName: e.cfg.Name(), MetricsPath: "/metrics"}}182}183184// Run satisfies Run.185func (e *Exporter) Run(ctx context.Context) error {186parser := line.NewParser()187if e.cfg.ParseDogStatsd {188parser.EnableDogstatsdParsing()189}190if e.cfg.ParseInfluxDB {191parser.EnableInfluxdbParsing()192}193if e.cfg.ParseLibrato {194parser.EnableLibratoParsing()195}196if e.cfg.ParseSignalFX {197parser.EnableSignalFXParsing()198}199200events := make(chan event.Events, e.cfg.EventQueueSize)201defer close(events)202eventQueue := event.NewEventQueue(events, e.cfg.EventFlushThreshold, e.cfg.EventFlushInterval, e.metrics.EventsFlushed)203204if e.cfg.ListenUDP != "" {205addr, err := address.UDPAddrFromString(e.cfg.ListenUDP)206if err != nil {207return fmt.Errorf("invalid UDP listen address %s: %w", e.cfg.ListenUDP, err)208}209uconn, err := net.ListenUDP("udp", addr)210if err != nil {211return fmt.Errorf("failed to start UDP listener: %w", err)212}213defer func() {214err := uconn.Close()215if err != nil {216level.Warn(e.log).Log("msg", "failed to close UDP listener", "err", err)217}218}()219220if e.cfg.ReadBuffer != 0 {221err = uconn.SetReadBuffer(e.cfg.ReadBuffer)222if err != nil {223return fmt.Errorf("failed to set UDP read buffer: %w", err)224}225}226227ul := &listener.StatsDUDPListener{228Conn: uconn,229EventHandler: eventQueue,230Logger: e.log,231LineParser: parser,232UDPPackets: e.metrics.UDPPackets,233LinesReceived: e.metrics.LinesReceived,234EventsFlushed: e.metrics.EventsFlushed,235SampleErrors: *e.metrics.SampleErrors,236SamplesReceived: e.metrics.SamplesReceived,237TagErrors: e.metrics.TagErrors,238TagsReceived: e.metrics.TagsReceived,239}240241go ul.Listen()242}243244if e.cfg.ListenTCP != "" {245addr, err := address.TCPAddrFromString(e.cfg.ListenTCP)246if err != nil {247return fmt.Errorf("invalid TCP listen address %s: %w", e.cfg.ListenTCP, err)248}249tconn, err := net.ListenTCP("tcp", addr)250if err != nil {251return fmt.Errorf("failed to start TCP listener: %w", err)252}253defer func() {254err := tconn.Close()255if err != nil {256level.Warn(e.log).Log("msg", "failed to close TCP listener", "err", err)257}258}()259260tl := &listener.StatsDTCPListener{261Conn: tconn,262EventHandler: eventQueue,263Logger: e.log,264LineParser: parser,265LinesReceived: e.metrics.LinesReceived,266EventsFlushed: e.metrics.EventsFlushed,267SampleErrors: *e.metrics.SampleErrors,268SamplesReceived: e.metrics.SamplesReceived,269TagErrors: e.metrics.TagErrors,270TagsReceived: e.metrics.TagsReceived,271TCPConnections: e.metrics.TCPConnections,272TCPErrors: e.metrics.TCPErrors,273TCPLineTooLong: e.metrics.TCPLineTooLong,274}275276go tl.Listen()277}278279if e.cfg.ListenUnixgram != "" {280var err error281if _, err = os.Stat(e.cfg.ListenUnixgram); !os.IsNotExist(err) {282return fmt.Errorf("unixgram socket %s already exists: %w", e.cfg.ListenUnixgram, err)283}284uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{285Net: "unixgram",286Name: e.cfg.ListenUnixgram,287})288if err != nil {289return fmt.Errorf("failed to listen on unixgram socket: %w", err)290}291defer func() {292err := uxgconn.Close()293if err != nil {294level.Warn(e.log).Log("msg", "failed to close unixgram listener", "err", err)295}296}()297298if e.cfg.ReadBuffer != 0 {299err = uxgconn.SetReadBuffer(e.cfg.ReadBuffer)300if err != nil {301return fmt.Errorf("error setting unixgram read buffer: %w", err)302}303}304305ul := &listener.StatsDUnixgramListener{306Conn: uxgconn,307EventHandler: eventQueue,308Logger: e.log,309LineParser: parser,310UnixgramPackets: e.metrics.UnixgramPackets,311LinesReceived: e.metrics.LinesReceived,312EventsFlushed: e.metrics.EventsFlushed,313SampleErrors: *e.metrics.SampleErrors,314SamplesReceived: e.metrics.SamplesReceived,315TagErrors: e.metrics.TagErrors,316TagsReceived: e.metrics.TagsReceived,317}318319go ul.Listen()320321// If it's an abstract unix domain socket, it won't exist on fs so we can't322// chmod it either.323if _, err := os.Stat(e.cfg.ListenUnixgram); !os.IsNotExist(err) {324defer os.Remove(e.cfg.ListenUnixgram)325326// Convert the string to octet327perm, err := strconv.ParseInt("0"+e.cfg.UnixSocketMode, 8, 32)328if err != nil {329level.Warn(e.log).Log("msg", "bad permission on unixgram socket, ignoring", "permission", e.cfg.UnixSocketMode, "socket", e.cfg.ListenUnixgram, "err", err)330} else {331err = os.Chmod(e.cfg.ListenUnixgram, os.FileMode(perm))332if err != nil {333level.Warn(e.log).Log("msg", "failed to change unixgram socket permission", "socket", e.cfg.ListenUnixgram, "err", err)334}335}336}337}338339go e.exporter.Listen(events)340341<-ctx.Done()342return nil343}344345346