Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/exporter/prometheus/prometheus.go
4096 views
1
// Package prometheus provides an otelcol.exporter.prometheus component.
2
package prometheus
3
4
import (
5
"context"
6
"fmt"
7
"sync"
8
"time"
9
10
"github.com/go-kit/log"
11
"github.com/grafana/agent/component"
12
"github.com/grafana/agent/component/otelcol"
13
"github.com/grafana/agent/component/otelcol/exporter/prometheus/internal/convert"
14
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
15
"github.com/grafana/agent/component/prometheus"
16
"github.com/grafana/agent/pkg/river"
17
"github.com/prometheus/prometheus/storage"
18
)
19
20
func init() {
21
component.Register(component.Registration{
22
Name: "otelcol.exporter.prometheus",
23
Args: Arguments{},
24
Exports: otelcol.ConsumerExports{},
25
26
Build: func(o component.Options, a component.Arguments) (component.Component, error) {
27
return New(o, a.(Arguments))
28
},
29
})
30
}
31
32
// Arguments configures the otelcol.exporter.prometheus component.
33
type Arguments struct {
34
IncludeTargetInfo bool `river:"include_target_info,attr,optional"`
35
IncludeScopeInfo bool `river:"include_scope_info,attr,optional"`
36
GCFrequency time.Duration `river:"gc_frequency,attr,optional"`
37
ForwardTo []storage.Appendable `river:"forward_to,attr"`
38
}
39
40
// Compile-time check that *Arguments implements river.Unmarshaler.
var _ river.Unmarshaler = (*Arguments)(nil)
41
42
// DefaultArguments holds defaults values.
43
var DefaultArguments = Arguments{
44
IncludeTargetInfo: true,
45
IncludeScopeInfo: true,
46
GCFrequency: 5 * time.Minute,
47
}
48
49
// UnmarshalRiver implements river.Unmarshaler and applies defaults.
50
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
51
*args = DefaultArguments
52
53
type arguments Arguments
54
if err := f((*arguments)(args)); err != nil {
55
return err
56
}
57
58
if args.GCFrequency == 0 {
59
return fmt.Errorf("gc_frequency must be greater than 0")
60
}
61
62
return nil
63
}
64
65
// Component is the otelcol.exporter.prometheus component.
66
type Component struct {
67
log log.Logger
68
opts component.Options
69
70
fanout *prometheus.Fanout
71
converter *convert.Converter
72
73
mut sync.RWMutex
74
cfg Arguments
75
}
76
77
// Compile-time check that *Component implements component.Component.
var _ component.Component = (*Component)(nil)
78
79
// New creates a new otelcol.exporter.prometheus component.
80
func New(o component.Options, c Arguments) (*Component, error) {
81
fanout := prometheus.NewFanout(nil, o.ID, o.Registerer)
82
83
converter := convert.New(o.Logger, fanout, convert.Options{
84
IncludeTargetInfo: true,
85
IncludeScopeInfo: true,
86
})
87
88
res := &Component{
89
log: o.Logger,
90
opts: o,
91
92
fanout: fanout,
93
converter: converter,
94
}
95
if err := res.Update(c); err != nil {
96
return nil, err
97
}
98
99
// Construct a consumer based on our converter and export it. This will
100
// remain the same throughout the component's lifetime, so we do this during
101
// component construction.
102
export := lazyconsumer.New(context.Background())
103
export.SetConsumers(nil, converter, nil)
104
o.OnStateChange(otelcol.ConsumerExports{Input: export})
105
106
return res, nil
107
}
108
109
// Run implements Component.
110
func (c *Component) Run(ctx context.Context) error {
111
for {
112
select {
113
case <-ctx.Done():
114
return nil
115
case <-time.After(c.nextGC()):
116
// TODO(rfratto): we may want to consider making this an option in the
117
// future, but hard-coding to 5 minutes is a reasonable default to start
118
// with.
119
c.converter.GC(5 * time.Minute)
120
}
121
}
122
}
123
124
// nextGC returns the currently configured GC frequency, read under the read
// lock so it does not race with Update.
func (c *Component) nextGC() time.Duration {
	c.mut.RLock()
	frequency := c.cfg.GCFrequency
	c.mut.RUnlock()

	return frequency
}
129
130
// Update implements Component.
131
func (c *Component) Update(newConfig component.Arguments) error {
132
c.mut.Lock()
133
defer c.mut.Unlock()
134
135
cfg := newConfig.(Arguments)
136
c.cfg = cfg
137
138
c.fanout.UpdateChildren(cfg.ForwardTo)
139
c.converter.UpdateOptions(convert.Options{
140
IncludeTargetInfo: cfg.IncludeTargetInfo,
141
IncludeScopeInfo: cfg.IncludeScopeInfo,
142
})
143
144
// If our forward_to argument changed, we need to flush the metadata cache to
145
// ensure the new children have all the metadata they need.
146
//
147
// For now, we always flush whenever we update, but we could do something
148
// more intelligent here in the future.
149
c.converter.FlushMetadata()
150
return nil
151
}
152
153