Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/metrics/instance/configstore/api.go
5333 views
1
package configstore
2
3
import (
4
"errors"
5
"fmt"
6
"io"
7
"net/http"
8
"net/url"
9
"strings"
10
"sync"
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/gorilla/mux"
15
"github.com/grafana/agent/pkg/metrics/cluster/configapi"
16
"github.com/grafana/agent/pkg/metrics/instance"
17
"github.com/prometheus/client_golang/prometheus"
18
)
19
20
// API is an HTTP API to interact with a configstore.
21
type API struct {
22
log log.Logger
23
storeMut sync.Mutex
24
store Store
25
validator Validator
26
27
totalCreatedConfigs prometheus.Counter
28
totalUpdatedConfigs prometheus.Counter
29
totalDeletedConfigs prometheus.Counter
30
31
enableGet bool
32
}
33
34
// Validator valides a config before putting it into the store.
35
// Validator is allowed to mutate the config and will only be given a copy.
36
type Validator = func(c *instance.Config) error
37
38
// NewAPI creates a new API. Store can be applied later with SetStore.
39
func NewAPI(l log.Logger, store Store, v Validator, enableGet bool) *API {
40
return &API{
41
log: l,
42
store: store,
43
validator: v,
44
45
totalCreatedConfigs: prometheus.NewCounter(prometheus.CounterOpts{
46
Name: "agent_metrics_ha_configs_created_total",
47
Help: "Total number of created scraping service configs",
48
}),
49
totalUpdatedConfigs: prometheus.NewCounter(prometheus.CounterOpts{
50
Name: "agent_metrics_ha_configs_updated_total",
51
Help: "Total number of updated scraping service configs",
52
}),
53
totalDeletedConfigs: prometheus.NewCounter(prometheus.CounterOpts{
54
Name: "agent_metrics_ha_configs_deleted_total",
55
Help: "Total number of deleted scraping service configs",
56
}),
57
enableGet: enableGet,
58
}
59
}
60
61
// WireAPI injects routes into the provided mux router for the config
62
// store API.
63
func (api *API) WireAPI(r *mux.Router) {
64
// Support URL-encoded config names. The handlers will need to decode the
65
// name when reading the path variable.
66
r = r.UseEncodedPath()
67
68
r.HandleFunc("/agent/api/v1/configs", api.ListConfigurations).Methods("GET")
69
getConfigHandler := messageHandlerFunc(http.StatusNotFound, "404 - config endpoint is disabled")
70
if api.enableGet {
71
getConfigHandler = api.GetConfiguration
72
}
73
r.HandleFunc("/agent/api/v1/configs/{name}", getConfigHandler).Methods("GET")
74
r.HandleFunc("/agent/api/v1/config/{name}", api.PutConfiguration).Methods("PUT", "POST")
75
r.HandleFunc("/agent/api/v1/config/{name}", api.DeleteConfiguration).Methods("DELETE")
76
}
77
78
// Describe implements prometheus.Collector.
79
func (api *API) Describe(ch chan<- *prometheus.Desc) {
80
ch <- api.totalCreatedConfigs.Desc()
81
ch <- api.totalUpdatedConfigs.Desc()
82
ch <- api.totalDeletedConfigs.Desc()
83
}
84
85
// Collect implements prometheus.Collector.
86
func (api *API) Collect(mm chan<- prometheus.Metric) {
87
mm <- api.totalCreatedConfigs
88
mm <- api.totalUpdatedConfigs
89
mm <- api.totalDeletedConfigs
90
}
91
92
// ListConfigurations returns a list of configurations.
93
func (api *API) ListConfigurations(rw http.ResponseWriter, r *http.Request) {
94
api.storeMut.Lock()
95
defer api.storeMut.Unlock()
96
if api.store == nil {
97
api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
98
return
99
}
100
101
keys, err := api.store.List(r.Context())
102
if errors.Is(err, ErrNotConnected) {
103
api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
104
return
105
} else if err != nil {
106
api.writeError(rw, http.StatusInternalServerError, fmt.Errorf("failed to write config: %w", err))
107
return
108
}
109
api.writeResponse(rw, http.StatusOK, configapi.ListConfigurationsResponse{Configs: keys})
110
}
111
112
// GetConfiguration gets an individual configuration.
113
func (api *API) GetConfiguration(rw http.ResponseWriter, r *http.Request) {
114
api.storeMut.Lock()
115
defer api.storeMut.Unlock()
116
if api.store == nil {
117
api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
118
return
119
}
120
121
configKey, err := getConfigName(r)
122
if err != nil {
123
api.writeError(rw, http.StatusBadRequest, err)
124
return
125
}
126
127
cfg, err := api.store.Get(r.Context(), configKey)
128
switch {
129
case errors.Is(err, ErrNotConnected):
130
api.writeError(rw, http.StatusNotFound, err)
131
case errors.As(err, &NotExistError{}):
132
api.writeError(rw, http.StatusNotFound, err)
133
case err != nil:
134
api.writeError(rw, http.StatusInternalServerError, err)
135
case err == nil:
136
bb, err := instance.MarshalConfig(&cfg, true)
137
if err != nil {
138
api.writeError(rw, http.StatusInternalServerError, fmt.Errorf("could not marshal config for response: %w", err))
139
return
140
}
141
api.writeResponse(rw, http.StatusOK, &configapi.GetConfigurationResponse{
142
Value: string(bb),
143
})
144
}
145
}
146
147
// PutConfiguration creates or updates a configuration.
148
func (api *API) PutConfiguration(rw http.ResponseWriter, r *http.Request) {
149
api.storeMut.Lock()
150
defer api.storeMut.Unlock()
151
if api.store == nil {
152
api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
153
return
154
}
155
156
configName, err := getConfigName(r)
157
if err != nil {
158
api.writeError(rw, http.StatusBadRequest, err)
159
return
160
}
161
162
var config strings.Builder
163
if _, err := io.Copy(&config, r.Body); err != nil {
164
api.writeError(rw, http.StatusInternalServerError, err)
165
return
166
}
167
168
cfg, err := instance.UnmarshalConfig(strings.NewReader(config.String()))
169
if err != nil {
170
api.writeError(rw, http.StatusBadRequest, fmt.Errorf("could not unmarshal config: %w", err))
171
return
172
}
173
cfg.Name = configName
174
175
if api.validator != nil {
176
validateCfg, err := instance.UnmarshalConfig(strings.NewReader(config.String()))
177
if err != nil {
178
api.writeError(rw, http.StatusBadRequest, fmt.Errorf("could not unmarshal config: %w", err))
179
return
180
}
181
validateCfg.Name = configName
182
183
if err := api.validator(validateCfg); err != nil {
184
api.writeError(rw, http.StatusBadRequest, fmt.Errorf("failed to validate config: %w", err))
185
return
186
}
187
}
188
189
created, err := api.store.Put(r.Context(), *cfg)
190
switch {
191
case errors.Is(err, ErrNotConnected):
192
api.writeError(rw, http.StatusNotFound, err)
193
case errors.As(err, &NotUniqueError{}):
194
api.writeError(rw, http.StatusBadRequest, err)
195
case err != nil:
196
api.writeError(rw, http.StatusInternalServerError, err)
197
default:
198
if created {
199
api.totalCreatedConfigs.Inc()
200
api.writeResponse(rw, http.StatusCreated, nil)
201
} else {
202
api.totalUpdatedConfigs.Inc()
203
api.writeResponse(rw, http.StatusOK, nil)
204
}
205
}
206
}
207
208
// DeleteConfiguration deletes a configuration.
209
func (api *API) DeleteConfiguration(rw http.ResponseWriter, r *http.Request) {
210
api.storeMut.Lock()
211
defer api.storeMut.Unlock()
212
if api.store == nil {
213
api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
214
return
215
}
216
217
configKey, err := getConfigName(r)
218
if err != nil {
219
api.writeError(rw, http.StatusBadRequest, err)
220
return
221
}
222
223
err = api.store.Delete(r.Context(), configKey)
224
switch {
225
case errors.Is(err, ErrNotConnected):
226
api.writeError(rw, http.StatusNotFound, err)
227
case errors.As(err, &NotExistError{}):
228
api.writeError(rw, http.StatusNotFound, err)
229
case err != nil:
230
api.writeError(rw, http.StatusInternalServerError, err)
231
default:
232
api.totalDeletedConfigs.Inc()
233
api.writeResponse(rw, http.StatusOK, nil)
234
}
235
}
236
237
// writeError sends writeErr to the client with the given status code via
// configapi.WriteError. If sending itself fails, the failure is logged at
// error level and otherwise ignored.
func (api *API) writeError(rw http.ResponseWriter, statusCode int, writeErr error) {
	if err := configapi.WriteError(rw, statusCode, writeErr); err != nil {
		level.Error(api.log).Log("msg", "failed to write response", "err", err)
	}
}
243
244
// writeResponse sends v to the client with the given status code via
// configapi.WriteResponse. If sending itself fails, the failure is logged at
// error level and otherwise ignored.
func (api *API) writeResponse(rw http.ResponseWriter, statusCode int, v interface{}) {
	if err := configapi.WriteResponse(rw, statusCode, v); err != nil {
		level.Error(api.log).Log("msg", "failed to write response", "err", err)
	}
}
250
251
// getConfigName uses gorilla/mux's route variables to extract the
252
// "name" variable. If not found, getConfigName will return an error.
253
func getConfigName(r *http.Request) (string, error) {
254
vars := mux.Vars(r)
255
name := vars["name"]
256
name, err := url.PathUnescape(name)
257
if err != nil {
258
return "", fmt.Errorf("could not decode config name: %w", err)
259
}
260
return name, nil
261
}
262
263
// messageHandlerFunc returns a handler that answers every request with the
// given status code and msg as the raw response body, regardless of the
// request's contents.
func messageHandlerFunc(statusCode int, msg string) http.HandlerFunc {
	body := []byte(msg)
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(statusCode)
		// The write result is discarded: this handler has no logger to
		// report a failed write to.
		_, _ = w.Write(body)
	})
}
269
270