Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/subsystem.go
5304 views
1
package integrations
2
3
import (
4
"context"
5
"encoding/json"
6
"fmt"
7
"net/http"
8
"sync"
9
"time"
10
11
"github.com/go-kit/log"
12
"github.com/gorilla/mux"
13
"github.com/grafana/agent/pkg/integrations/v2/autoscrape"
14
"github.com/grafana/agent/pkg/metrics"
15
"github.com/prometheus/common/model"
16
http_sd "github.com/prometheus/prometheus/discovery/http"
17
)
18
19
const (
20
// IntegrationsSDEndpoint is the API endpoint where the integration HTTP SD
21
// API is exposed. The API uses query parameters to customize what gets
22
// returned by discovery.
23
IntegrationsSDEndpoint = "/agent/api/v1/metrics/integrations/sd"
24
25
// IntegrationsAutoscrapeTargetsEndpoint is the API endpoint where autoscrape
26
// integrations targets are exposed.
27
IntegrationsAutoscrapeTargetsEndpoint = "/agent/api/v1/metrics/integrations/targets"
28
)
29
30
// DefaultSubsystemOptions holds the default settings for a Controller.
31
var (
32
DefaultSubsystemOptions = SubsystemOptions{
33
Metrics: DefaultMetricsSubsystemOptions,
34
}
35
36
DefaultMetricsSubsystemOptions = MetricsSubsystemOptions{
37
Autoscrape: autoscrape.DefaultGlobal,
38
}
39
)
40
41
// SubsystemOptions controls how the integrations subsystem behaves.
42
type SubsystemOptions struct {
43
Metrics MetricsSubsystemOptions `yaml:"metrics,omitempty"`
44
45
// Configs are configurations of integration to create. Unmarshaled through
46
// the custom UnmarshalYAML method of Controller.
47
Configs Configs `yaml:"-"`
48
}
49
50
// MetricsSubsystemOptions controls how metrics integrations behave.
51
type MetricsSubsystemOptions struct {
52
Autoscrape autoscrape.Global `yaml:"autoscrape,omitempty"`
53
}
54
55
// ApplyDefaults will apply defaults to o.
56
func (o *SubsystemOptions) ApplyDefaults(mcfg *metrics.Config) error {
57
if o.Metrics.Autoscrape.ScrapeInterval == 0 {
58
o.Metrics.Autoscrape.ScrapeInterval = mcfg.Global.Prometheus.ScrapeInterval
59
}
60
if o.Metrics.Autoscrape.ScrapeTimeout == 0 {
61
o.Metrics.Autoscrape.ScrapeTimeout = mcfg.Global.Prometheus.ScrapeTimeout
62
}
63
64
return nil
65
}
66
67
// MarshalYAML implements yaml.Marshaler for SubsystemOptions. Integrations
68
// will be marshaled inline.
69
func (o SubsystemOptions) MarshalYAML() (interface{}, error) {
70
return MarshalYAML(o)
71
}
72
73
// UnmarshalYAML implements yaml.Unmarshaler for SubsystemOptions. Inline
74
// integrations will be unmarshaled into o.Configs.
75
func (o *SubsystemOptions) UnmarshalYAML(unmarshal func(interface{}) error) error {
76
*o = DefaultSubsystemOptions
77
return UnmarshalYAML(o, unmarshal)
78
}
79
80
// Subsystem runs the integrations subsystem, managing a set of integrations.
81
type Subsystem struct {
82
logger log.Logger
83
84
mut sync.RWMutex
85
globals Globals
86
apiHandler http.Handler // generated from controller
87
autoscraper *autoscrape.Scraper
88
89
ctrl *controller
90
stopController context.CancelFunc
91
controllerExited chan struct{}
92
}
93
94
// NewSubsystem creates and starts a new integrations Subsystem. Every field in
95
// IntegrationOptions must be filled out.
96
func NewSubsystem(l log.Logger, globals Globals) (*Subsystem, error) {
97
autoscraper := autoscrape.NewScraper(l, globals.Metrics.InstanceManager(), globals.DialContextFunc)
98
99
l = log.With(l, "component", "integrations")
100
101
ctrl, err := newController(l, controllerConfig(globals.SubsystemOpts.Configs), globals)
102
if err != nil {
103
autoscraper.Stop()
104
return nil, err
105
}
106
107
ctx, cancel := context.WithCancel(context.Background())
108
109
ctrlExited := make(chan struct{})
110
go func() {
111
ctrl.run(ctx)
112
close(ctrlExited)
113
}()
114
115
s := &Subsystem{
116
logger: l,
117
118
globals: globals,
119
autoscraper: autoscraper,
120
121
ctrl: ctrl,
122
stopController: cancel,
123
controllerExited: ctrlExited,
124
}
125
if err := s.ApplyConfig(globals); err != nil {
126
cancel()
127
autoscraper.Stop()
128
return nil, err
129
}
130
return s, nil
131
}
132
133
// ApplyConfig updates the configuration of the integrations subsystem.
134
func (s *Subsystem) ApplyConfig(globals Globals) error {
135
const prefix = "/integrations/"
136
137
s.mut.Lock()
138
defer s.mut.Unlock()
139
140
if err := s.ctrl.UpdateController(controllerConfig(globals.SubsystemOpts.Configs), globals); err != nil {
141
return fmt.Errorf("error applying integrations: %w", err)
142
}
143
144
var firstErr error
145
saveFirstErr := func(err error) {
146
if firstErr == nil {
147
firstErr = err
148
}
149
}
150
151
// Set up HTTP wiring
152
{
153
handler, err := s.ctrl.Handler(prefix)
154
if err != nil {
155
saveFirstErr(fmt.Errorf("HTTP handler update failed: %w", err))
156
}
157
s.apiHandler = handler
158
}
159
160
// Set up self-scraping
161
{
162
httpSDConfig := http_sd.DefaultSDConfig
163
httpSDConfig.RefreshInterval = model.Duration(time.Second * 5) // TODO(rfratto): make configurable?
164
165
apiURL := globals.CloneAgentBaseURL()
166
apiURL.Path = IntegrationsSDEndpoint
167
httpSDConfig.URL = apiURL.String()
168
169
scrapeConfigs := s.ctrl.ScrapeConfigs(prefix, &httpSDConfig)
170
if err := s.autoscraper.ApplyConfig(scrapeConfigs); err != nil {
171
saveFirstErr(fmt.Errorf("configuring autoscraper failed: %w", err))
172
}
173
}
174
175
s.globals = globals
176
return firstErr
177
}
178
179
// WireAPI hooks up integration endpoints to r.
180
func (s *Subsystem) WireAPI(r *mux.Router) {
181
const prefix = "/integrations"
182
r.PathPrefix(prefix).HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
183
s.mut.RLock()
184
handler := s.apiHandler
185
s.mut.RUnlock()
186
187
if handler == nil {
188
rw.WriteHeader(http.StatusServiceUnavailable)
189
fmt.Fprintf(rw, "Integrations HTTP endpoints not yet available")
190
return
191
}
192
handler.ServeHTTP(rw, r)
193
})
194
195
r.HandleFunc(IntegrationsSDEndpoint, func(rw http.ResponseWriter, r *http.Request) {
196
targetOptions, err := TargetOptionsFromParams(r.URL.Query())
197
if err != nil {
198
http.Error(rw, fmt.Sprintf("invalid query parameters: %s", err), http.StatusBadRequest)
199
return
200
}
201
202
rw.Header().Set("Content-Type", "application/json")
203
rw.WriteHeader(http.StatusOK)
204
205
tgs := s.ctrl.Targets(Endpoint{
206
Host: r.Host,
207
Prefix: prefix,
208
}, targetOptions)
209
210
// Normalize targets. We may have targets in the group with non-address
211
// labels. These need to be retained, so we'll just split everything up
212
// into multiple groups.
213
//
214
// TODO(rfratto): optimize to remove redundant groups
215
finalTgs := []*targetGroup{}
216
for _, group := range tgs {
217
for _, target := range group.Targets {
218
// Create the final labels for the group. This will be everything from
219
// the group and the target (except for model.AddressLabel). Labels
220
// from target take precedence labels from over group.
221
groupLabels := group.Labels.Merge(target)
222
delete(groupLabels, model.AddressLabel)
223
224
finalTgs = append(finalTgs, &targetGroup{
225
Targets: []model.LabelSet{{model.AddressLabel: target[model.AddressLabel]}},
226
Labels: groupLabels,
227
})
228
}
229
}
230
231
enc := json.NewEncoder(rw)
232
_ = enc.Encode(finalTgs)
233
})
234
235
r.HandleFunc(IntegrationsAutoscrapeTargetsEndpoint, func(rw http.ResponseWriter, r *http.Request) {
236
allTargets := s.autoscraper.TargetsActive()
237
metrics.ListTargetsHandler(allTargets).ServeHTTP(rw, r)
238
})
239
}
240
241
// Stop stops the manager and all running integrations. Blocks until all
242
// running integrations exit.
243
func (s *Subsystem) Stop() {
244
s.autoscraper.Stop()
245
s.stopController()
246
<-s.controllerExited
247
}
248
249