Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/heroku/heroku.go
4096 views
1
package heroku
2
3
import (
4
"context"
5
"reflect"
6
"sync"
7
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/common/loki"
11
fnet "github.com/grafana/agent/component/common/net"
12
flow_relabel "github.com/grafana/agent/component/common/relabel"
13
ht "github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget"
14
"github.com/grafana/agent/pkg/util"
15
"github.com/prometheus/client_golang/prometheus"
16
"github.com/prometheus/common/model"
17
"github.com/prometheus/prometheus/model/relabel"
18
)
19
20
func init() {
21
component.Register(component.Registration{
22
Name: "loki.source.heroku",
23
Args: Arguments{},
24
25
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
26
return New(opts, args.(Arguments))
27
},
28
})
29
}
30
31
// Arguments holds values which are used to configure the loki.source.heroku
32
// component.
33
type Arguments struct {
34
Server *fnet.ServerConfig `river:",squash"`
35
Labels map[string]string `river:"labels,attr,optional"`
36
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
37
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
38
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
39
}
40
41
// Component implements the loki.source.heroku component.
42
type Component struct {
43
opts component.Options
44
metrics *ht.Metrics // Metrics about Heroku entries.
45
serverMetrics *util.UncheckedCollector // Metircs about the HTTP server managed by the component.
46
47
mut sync.RWMutex
48
args Arguments
49
fanout []loki.LogsReceiver
50
target *ht.HerokuTarget
51
52
handler loki.LogsReceiver
53
}
54
55
// New creates a new loki.source.heroku component.
56
func New(o component.Options, args Arguments) (*Component, error) {
57
c := &Component{
58
opts: o,
59
metrics: ht.NewMetrics(o.Registerer),
60
mut: sync.RWMutex{},
61
args: Arguments{},
62
fanout: args.ForwardTo,
63
target: nil,
64
handler: make(loki.LogsReceiver),
65
serverMetrics: util.NewUncheckedCollector(nil),
66
}
67
68
o.Registerer.MustRegister(c.serverMetrics)
69
70
// Call to Update() to start readers and set receivers once at the start.
71
if err := c.Update(args); err != nil {
72
return nil, err
73
}
74
75
return c, nil
76
}
77
78
// Run implements component.Component.
79
func (c *Component) Run(ctx context.Context) error {
80
defer func() {
81
c.mut.Lock()
82
defer c.mut.Unlock()
83
84
level.Info(c.opts.Logger).Log("msg", "loki.source.heroku component shutting down, stopping listener")
85
if c.target != nil {
86
err := c.target.Stop()
87
if err != nil {
88
level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err)
89
}
90
}
91
}()
92
93
for {
94
select {
95
case <-ctx.Done():
96
return nil
97
case entry := <-c.handler:
98
c.mut.RLock()
99
for _, receiver := range c.fanout {
100
receiver <- entry
101
}
102
c.mut.RUnlock()
103
}
104
}
105
}
106
107
// Update implements component.Component.
108
func (c *Component) Update(args component.Arguments) error {
109
c.mut.Lock()
110
defer c.mut.Unlock()
111
112
newArgs := args.(Arguments)
113
c.fanout = newArgs.ForwardTo
114
115
var rcs []*relabel.Config
116
if newArgs.RelabelRules != nil && len(newArgs.RelabelRules) > 0 {
117
rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
118
}
119
120
restartRequired := changed(c.args.Server, newArgs.Server) ||
121
changed(c.args.RelabelRules, newArgs.RelabelRules) ||
122
changed(c.args.Labels, newArgs.Labels) ||
123
c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp
124
if restartRequired {
125
if c.target != nil {
126
err := c.target.Stop()
127
if err != nil {
128
level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err)
129
}
130
}
131
132
// [ht.NewHerokuTarget] registers new metrics every time it is called. To
133
// avoid issues with re-registering metrics with the same name, we create a
134
// new registry for the target every time we create one, and pass it to an
135
// unchecked collector to bypass uniqueness checking.
136
registry := prometheus.NewRegistry()
137
c.serverMetrics.SetCollector(registry)
138
139
entryHandler := loki.NewEntryHandler(c.handler, func() {})
140
t, err := ht.NewHerokuTarget(c.metrics, c.opts.Logger, entryHandler, rcs, newArgs.Convert(), registry)
141
if err != nil {
142
level.Error(c.opts.Logger).Log("msg", "failed to create heroku listener with provided config", "err", err)
143
return err
144
}
145
146
c.target = t
147
c.args = newArgs
148
}
149
150
return nil
151
}
152
153
// Convert is used to bridge between the River and Promtail types.
154
func (args *Arguments) Convert() *ht.HerokuDrainTargetConfig {
155
lbls := make(model.LabelSet, len(args.Labels))
156
for k, v := range args.Labels {
157
lbls[model.LabelName(k)] = model.LabelValue(v)
158
}
159
160
return &ht.HerokuDrainTargetConfig{
161
Server: args.Server,
162
Labels: lbls,
163
UseIncomingTimestamp: args.UseIncomingTimestamp,
164
}
165
}
166
167
// DebugInfo returns information about the status of listener.
168
func (c *Component) DebugInfo() interface{} {
169
c.mut.RLock()
170
defer c.mut.RUnlock()
171
172
var res = readerDebugInfo{
173
Ready: c.target.Ready(),
174
Address: c.target.HTTPListenAddress(),
175
}
176
177
return res
178
}
179
180
type readerDebugInfo struct {
181
Ready bool `river:"ready,attr"`
182
Address string `river:"address,attr"`
183
}
184
185
func changed(prev, next any) bool {
186
return !reflect.DeepEqual(prev, next)
187
}
188
189