Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/exporter/exporter.go
4095 views
1
package exporter
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
"path"
8
"sync"
9
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/component"
12
"github.com/grafana/agent/component/discovery"
13
"github.com/grafana/agent/pkg/integrations"
14
"github.com/prometheus/common/model"
15
)
16
17
// Creator is a function provided by an implementation to create a concrete exporter instance.
18
type Creator func(component.Options, component.Arguments) (integrations.Integration, error)
19
20
// Exports are simply a list of targets for a scraper to consume.
// It is the value this component hands to OnStateChange on every Update.
type Exports struct {
	// Targets holds the scrape targets; it is exposed as the River attribute "targets".
	Targets []discovery.Target `river:"targets,attr"`
}
24
25
type Component struct {
26
opts component.Options
27
28
mut sync.Mutex
29
30
reload chan struct{}
31
32
creator Creator
33
multiTargetFunc func(discovery.Target, component.Arguments) []discovery.Target
34
baseTarget discovery.Target
35
36
exporter integrations.Integration
37
metricsHandler http.Handler
38
}
39
40
// New creates a new exporter component.
41
func New(creator Creator, name string) func(component.Options, component.Arguments) (component.Component, error) {
42
return newExporter(creator, name, nil)
43
}
44
45
// NewMultiTarget creates a new exporter component that supports multiple targets.
46
func NewMultiTarget(creator Creator, name string, multiTargetFunc func(discovery.Target, component.Arguments) []discovery.Target) func(component.Options, component.Arguments) (component.Component, error) {
47
return newExporter(creator, name, multiTargetFunc)
48
}
49
50
func newExporter(creator Creator, name string, multiTargetFunc func(discovery.Target, component.Arguments) []discovery.Target) func(component.Options, component.Arguments) (component.Component, error) {
51
return func(opts component.Options, args component.Arguments) (component.Component, error) {
52
c := &Component{
53
opts: opts,
54
reload: make(chan struct{}, 1),
55
creator: creator,
56
multiTargetFunc: multiTargetFunc,
57
}
58
jobName := fmt.Sprintf("integrations/%s", name)
59
c.baseTarget = discovery.Target{
60
model.AddressLabel: opts.HTTPListenAddr,
61
model.SchemeLabel: "http",
62
model.MetricsPathLabel: path.Join(opts.HTTPPath, "metrics"),
63
"instance": opts.ID,
64
"job": jobName,
65
"__meta_agent_integration_name": jobName,
66
"__meta_agent_integration_instance": opts.ID,
67
}
68
69
// Call to Update() to set the output once at the start.
70
if err := c.Update(args); err != nil {
71
return nil, err
72
}
73
74
return c, nil
75
}
76
}
77
78
// Run implements component.Component.
79
func (c *Component) Run(ctx context.Context) error {
80
var cancel context.CancelFunc
81
for {
82
select {
83
case <-ctx.Done():
84
return nil
85
case <-c.reload:
86
// cancel any previously running exporter
87
if cancel != nil {
88
cancel()
89
}
90
// create new context so we can cancel it if we get any future updates
91
// since it is derived from the main run context, it only needs to be
92
// canceled directly if we receive new updates
93
newCtx, cancelFunc := context.WithCancel(ctx)
94
cancel = cancelFunc
95
96
// finally create and run new exporter
97
c.mut.Lock()
98
exporter := c.exporter
99
c.metricsHandler = c.getHttpHandler(exporter)
100
c.mut.Unlock()
101
go func() {
102
if err := exporter.Run(newCtx); err != nil {
103
level.Error(c.opts.Logger).Log("msg", "error running exporter", "err", err)
104
}
105
}()
106
}
107
}
108
}
109
110
// Update implements component.Component.
111
func (c *Component) Update(args component.Arguments) error {
112
exporter, err := c.creator(c.opts, args)
113
if err != nil {
114
return err
115
}
116
c.mut.Lock()
117
c.exporter = exporter
118
119
var targets []discovery.Target
120
if c.multiTargetFunc == nil {
121
targets = []discovery.Target{c.baseTarget}
122
} else {
123
targets = c.multiTargetFunc(c.baseTarget, args)
124
}
125
126
c.opts.OnStateChange(Exports{
127
Targets: targets,
128
})
129
c.mut.Unlock()
130
select {
131
case c.reload <- struct{}{}:
132
default:
133
}
134
return err
135
}
136
137
// get the http handler once and save it, so we don't create extra garbage
138
func (c *Component) getHttpHandler(integration integrations.Integration) http.Handler {
139
h, err := integration.MetricsHandler()
140
if err != nil {
141
level.Error(c.opts.Logger).Log("msg", "failed to creating metrics handler", "err", err)
142
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
143
w.WriteHeader(http.StatusInternalServerError)
144
})
145
}
146
return h
147
}
148
149
// Handler serves metrics endpoint from the integration implementation.
150
func (c *Component) Handler() http.Handler {
151
c.mut.Lock()
152
defer c.mut.Unlock()
153
return c.metricsHandler
154
}
155
156