Path: blob/main/component/loki/source/heroku/heroku.go
4096 views
package heroku

import (
	"context"
	"reflect"
	"sync"

	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/common/loki"
	fnet "github.com/grafana/agent/component/common/net"
	flow_relabel "github.com/grafana/agent/component/common/relabel"
	ht "github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget"
	"github.com/grafana/agent/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"
)

// init registers the loki.source.heroku component so that it can be built
// from an Arguments value via New.
func init() {
	component.Register(component.Registration{
		Name: "loki.source.heroku",
		Args: Arguments{},

		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
			return New(opts, args.(Arguments))
		},
	})
}

// Arguments holds values which are used to configure the loki.source.heroku
// component.
type Arguments struct {
	// Server is the listener configuration; its fields are squashed into the
	// component's own block.
	Server *fnet.ServerConfig `river:",squash"`
	// Labels are converted to a model.LabelSet and handed to the target.
	Labels map[string]string `river:"labels,attr,optional"`
	// UseIncomingTimestamp is passed through unchanged to the target config.
	UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
	// ForwardTo lists the receivers every entry is fanned out to.
	ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
	// RelabelRules are converted to Prometheus relabel configs for the target.
	RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
}

// Component implements the loki.source.heroku component.
type Component struct {
	opts          component.Options
	metrics       *ht.Metrics              // Metrics about Heroku entries.
	serverMetrics *util.UncheckedCollector // Metrics about the HTTP server managed by the component.

	// mut guards args, fanout and target.
	mut    sync.RWMutex
	args   Arguments
	fanout []loki.LogsReceiver
	target *ht.HerokuTarget

	// handler receives entries from the target; Run drains it and forwards
	// each entry to every receiver in fanout.
	handler loki.LogsReceiver
}

// New creates a new loki.source.heroku component.
//
// It registers the server metrics collector with o.Registerer and then calls
// Update once so the listener is started with the initial arguments. An error
// from Update is returned and no component is produced.
func New(o component.Options, args Arguments) (*Component, error) {
	// mut, args and target are intentionally left at their zero values; Update
	// compares against the zero Arguments to decide whether to start a target.
	c := &Component{
		opts:          o,
		metrics:       ht.NewMetrics(o.Registerer),
		fanout:        args.ForwardTo,
		handler:       make(loki.LogsReceiver),
		serverMetrics: util.NewUncheckedCollector(nil),
	}

	o.Registerer.MustRegister(c.serverMetrics)

	// Call to Update() to start readers and set receivers once at the start.
	if err := c.Update(args); err != nil {
		return nil, err
	}

	return c, nil
}

// Run implements component.Component.
//
// It forwards every entry received on the component's handler to all
// receivers in the current fanout until ctx is done, then stops the target
// (if one exists) and returns nil.
func (c *Component) Run(ctx context.Context) error {
	defer func() {
		c.mut.Lock()
		defer c.mut.Unlock()

		level.Info(c.opts.Logger).Log("msg", "loki.source.heroku component shutting down, stopping listener")
		if c.target != nil {
			err := c.target.Stop()
			if err != nil {
				level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err)
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case entry := <-c.handler:
			// The read lock is held for the whole fan-out so that Update
			// cannot replace c.fanout in the middle of a delivery.
			c.mut.RLock()
			for _, receiver := range c.fanout {
				receiver <- entry
			}
			c.mut.RUnlock()
		}
	}
}

// Update implements component.Component.
//
// The fanout list is always replaced. The target is only recreated when the
// server config, relabel rules, labels or use_incoming_timestamp differ from
// the arguments the current target was built with. If creating the new target
// fails, the error is logged and returned, and the stored arguments are left
// unchanged.
func (c *Component) Update(args component.Arguments) error {
	c.mut.Lock()
	defer c.mut.Unlock()

	newArgs := args.(Arguments)
	c.fanout = newArgs.ForwardTo

	// len of a nil slice is 0, so no separate nil check is needed.
	var rcs []*relabel.Config
	if len(newArgs.RelabelRules) > 0 {
		rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
	}

	restartRequired := changed(c.args.Server, newArgs.Server) ||
		changed(c.args.RelabelRules, newArgs.RelabelRules) ||
		changed(c.args.Labels, newArgs.Labels) ||
		c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp
	if !restartRequired {
		return nil
	}

	if c.target != nil {
		if err := c.target.Stop(); err != nil {
			level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err)
		}
		// Drop the reference so that a failure below does not leave a target
		// that has already been stopped behind, which Run would otherwise
		// stop a second time on shutdown.
		c.target = nil
	}

	// [ht.NewHerokuTarget] registers new metrics every time it is called. To
	// avoid issues with re-registering metrics with the same name, we create a
	// new registry for the target every time we create one, and pass it to an
	// unchecked collector to bypass uniqueness checking.
	registry := prometheus.NewRegistry()
	c.serverMetrics.SetCollector(registry)

	entryHandler := loki.NewEntryHandler(c.handler, func() {})
	t, err := ht.NewHerokuTarget(c.metrics, c.opts.Logger, entryHandler, rcs, newArgs.Convert(), registry)
	if err != nil {
		level.Error(c.opts.Logger).Log("msg", "failed to create heroku listener with provided config", "err", err)
		return err
	}

	c.target = t
	c.args = newArgs

	return nil
}

// Convert is used to bridge between the River and Promtail types.
//
// It copies Labels into a model.LabelSet and passes Server and
// UseIncomingTimestamp through unchanged.
func (args *Arguments) Convert() *ht.HerokuDrainTargetConfig {
	lbls := make(model.LabelSet, len(args.Labels))
	for k, v := range args.Labels {
		lbls[model.LabelName(k)] = model.LabelValue(v)
	}

	return &ht.HerokuDrainTargetConfig{
		Server:               args.Server,
		Labels:               lbls,
		UseIncomingTimestamp: args.UseIncomingTimestamp,
	}
}

// DebugInfo returns information about the status of listener.
//
// When no target exists, the zero readerDebugInfo (not ready, empty address)
// is returned instead of dereferencing a nil target.
func (c *Component) DebugInfo() any {
	c.mut.RLock()
	defer c.mut.RUnlock()

	if c.target == nil {
		return readerDebugInfo{}
	}

	return readerDebugInfo{
		Ready:   c.target.Ready(),
		Address: c.target.HTTPListenAddress(),
	}
}

// readerDebugInfo is the value reported by DebugInfo.
type readerDebugInfo struct {
	Ready   bool   `river:"ready,attr"`
	Address string `river:"address,attr"`
}

// changed reports whether prev and next differ according to
// reflect.DeepEqual.
func changed(prev, next any) bool {
	return !reflect.DeepEqual(prev, next)
}