Path: blob/main/component/loki/source/gelf/internal/target/gelftarget.go
4100 views
package target12// This code is copied from Promtail. The target package is used to3// configure and run the targets that can read gelf entries and forward them4// to other loki components.56import (7"bytes"8"context"9"strings"10"sync"11"time"1213"github.com/grafana/agent/component/common/loki"1415"github.com/go-kit/log"16"github.com/go-kit/log/level"17"github.com/grafana/go-gelf/v2/gelf"18"github.com/prometheus/common/model"19"github.com/prometheus/prometheus/model/labels"20"github.com/prometheus/prometheus/model/relabel"2122"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"23"github.com/grafana/loki/clients/pkg/promtail/targets/target"2425"github.com/grafana/loki/pkg/logproto"26)2728// SeverityLevels maps severity levels to severity string levels.29var SeverityLevels = map[int32]string{300: "emergency",311: "alert",322: "critical",333: "error",344: "warning",355: "notice",366: "informational",377: "debug",38}3940// Target listens to gelf messages on udp.41type Target struct {42metrics *Metrics43logger log.Logger44handler loki.EntryHandler45config *scrapeconfig.GelfTargetConfig46relabelConfig []*relabel.Config47gelfReader *gelf.Reader48encodeBuff *bytes.Buffer49wg sync.WaitGroup5051ctx context.Context52ctxCancel context.CancelFunc53}5455// NewTarget configures a new Gelf Target.56func NewTarget(57metrics *Metrics,58logger log.Logger,59handler loki.EntryHandler,60relabel []*relabel.Config,61config *scrapeconfig.GelfTargetConfig,62) (*Target, error) {6364if config.ListenAddress == "" {65config.ListenAddress = ":12201"66}6768gelfReader, err := gelf.NewReader(config.ListenAddress)69if err != nil {70return nil, err71}72ctx, cancel := context.WithCancel(context.Background())7374t := &Target{75metrics: metrics,76logger: logger,77handler: handler,78config: config,79relabelConfig: relabel,80gelfReader: gelfReader,81encodeBuff: bytes.NewBuffer(make([]byte, 0, 1024)),8283ctx: ctx,84ctxCancel: cancel,85}8687t.run()88return t, err89}9091func (t *Target) run() {92t.wg.Add(1)93go func() {94defer t.wg.Done()95level.Info(t.logger).Log("msg", "listening for GELF UDP messages", "listen_address", t.config.ListenAddress)96for {97select {98case <-t.ctx.Done():99level.Info(t.logger).Log("msg", "GELF UDP listener shutdown", "listen_address", t.config.ListenAddress)100return101default:102msg, err := t.gelfReader.ReadMessage()103if err != nil {104level.Error(t.logger).Log("msg", "error while reading gelf message", "listen_address", t.config.ListenAddress, "err", err)105t.metrics.gelfErrors.Inc()106continue107}108if msg != nil {109t.metrics.gelfEntries.Inc()110t.handleMessage(msg)111}112}113}114}()115}116117func (t *Target) handleMessage(msg *gelf.Message) {118lb := labels.NewBuilder(nil)119120// Add all labels from the config.121for k, v := range t.config.Labels {122lb.Set(string(k), string(v))123}124lb.Set("__gelf_message_level", SeverityLevels[msg.Level])125lb.Set("__gelf_message_host", msg.Host)126lb.Set("__gelf_message_version", msg.Version)127lb.Set("__gelf_message_facility", msg.Facility)128129processed, _ := relabel.Process(lb.Labels(nil), t.relabelConfig...)130131filtered := make(model.LabelSet)132for _, lbl := range processed {133if strings.HasPrefix(lbl.Name, "__") {134continue135}136filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)137}138139var timestamp time.Time140if t.config.UseIncomingTimestamp && msg.TimeUnix != 0 {141// TimeUnix is the timestamp of the message, in seconds since the UNIX epoch with decimals for fractional seconds.142timestamp = secondsToUnixTimestamp(msg.TimeUnix)143} else {144timestamp = time.Now()145}146t.encodeBuff.Reset()147err := msg.MarshalJSONBuf(t.encodeBuff)148if err != nil {149level.Error(t.logger).Log("msg", "error while marshalling gelf message", "listen_address", t.config.ListenAddress, "err", err)150t.metrics.gelfErrors.Inc()151return152}153t.handler.Chan() <- loki.Entry{154Labels: filtered,155Entry: logproto.Entry{156Timestamp: timestamp,157Line: t.encodeBuff.String(),158},159}160}161162func secondsToUnixTimestamp(seconds float64) time.Time {163return time.Unix(0, int64(seconds*float64(time.Second)))164}165166// Type returns GelfTargetType.167func (t *Target) Type() target.TargetType {168return target.GelfTargetType169}170171// Ready indicates whether or not the gelf target is ready to be read from.172func (t *Target) Ready() bool {173return true174}175176// DiscoveredLabels returns the set of labels discovered by the gelf target, which177// is always nil. Implements Target.178func (t *Target) DiscoveredLabels() model.LabelSet {179return nil180}181182// Labels returns the set of labels that statically apply to all log entries183// produced by the GelfTarget.184func (t *Target) Labels() model.LabelSet {185return t.config.Labels186}187188// Details returns target-specific details.189func (t *Target) Details() interface{} {190return map[string]string{}191}192193// Stop shuts down the GelfTarget.194func (t *Target) Stop() {195level.Info(t.logger).Log("msg", "Shutting down GELF UDP listener", "listen_address", t.config.ListenAddress)196t.ctxCancel()197if err := t.gelfReader.Close(); err != nil {198level.Error(t.logger).Log("msg", "error while closing gelf reader", "err", err)199}200t.wg.Wait()201t.handler.Stop()202}203204205