Path: blob/main/pkg/metrics/instance/configstore/api.go
5333 views
package configstore

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gorilla/mux"
	"github.com/grafana/agent/pkg/metrics/cluster/configapi"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/prometheus/client_golang/prometheus"
)

// API is an HTTP API to interact with a configstore.
//
// API also implements prometheus.Collector (see Describe and Collect) and
// exposes counters for created, updated and deleted configs.
type API struct {
	log log.Logger

	// storeMut is held by every handler for the full duration of the
	// request, so accesses to store are serialized.
	storeMut  sync.Mutex
	store     Store
	validator Validator

	totalCreatedConfigs prometheus.Counter
	totalUpdatedConfigs prometheus.Counter
	totalDeletedConfigs prometheus.Counter

	// enableGet controls whether WireAPI registers GetConfiguration or a
	// fixed 404 handler for the single-config GET route.
	enableGet bool
}

// Validator validates a config before putting it into the store.
// Validator is allowed to mutate the config and will only be given a copy.
type Validator = func(c *instance.Config) error

// NewAPI creates a new API.
//
// store may be nil; while it is nil every handler responds with 404
// ("no config store running"). v may be nil, in which case PutConfiguration
// skips validation. enableGet selects whether the single-config GET endpoint
// is served.
//
// NOTE(review): the original comment said the store "can be applied later
// with SetStore"; no SetStore is defined in this file — confirm it exists
// elsewhere in the package.
func NewAPI(l log.Logger, store Store, v Validator, enableGet bool) *API {
	return &API{
		log:       l,
		store:     store,
		validator: v,

		totalCreatedConfigs: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "agent_metrics_ha_configs_created_total",
			Help: "Total number of created scraping service configs",
		}),
		totalUpdatedConfigs: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "agent_metrics_ha_configs_updated_total",
			Help: "Total number of updated scraping service configs",
		}),
		totalDeletedConfigs: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "agent_metrics_ha_configs_deleted_total",
			Help: "Total number of deleted scraping service configs",
		}),
		enableGet: enableGet,
	}
}

// WireAPI injects routes into the provided mux router for the config
// store API.
//
// Note that listing and fetching use the plural ".../configs" prefix while
// creating, updating and deleting use the singular ".../config" prefix.
func (api *API) WireAPI(r *mux.Router) {
	// Support URL-encoded config names. The handlers will need to decode the
	// name when reading the path variable.
	r = r.UseEncodedPath()

	r.HandleFunc("/agent/api/v1/configs", api.ListConfigurations).Methods("GET")

	// The single-config GET route is always registered; when the endpoint is
	// disabled it is served by a fixed 404 message instead of the real handler.
	getConfigHandler := messageHandlerFunc(http.StatusNotFound, "404 - config endpoint is disabled")
	if api.enableGet {
		getConfigHandler = api.GetConfiguration
	}
	r.HandleFunc("/agent/api/v1/configs/{name}", getConfigHandler).Methods("GET")

	r.HandleFunc("/agent/api/v1/config/{name}", api.PutConfiguration).Methods("PUT", "POST")
	r.HandleFunc("/agent/api/v1/config/{name}", api.DeleteConfiguration).Methods("DELETE")
}

// Describe implements prometheus.Collector.
func (api *API) Describe(ch chan<- *prometheus.Desc) {
	ch <- api.totalCreatedConfigs.Desc()
	ch <- api.totalUpdatedConfigs.Desc()
	ch <- api.totalDeletedConfigs.Desc()
}

// Collect implements prometheus.Collector.
func (api *API) Collect(mm chan<- prometheus.Metric) {
	mm <- api.totalCreatedConfigs
	mm <- api.totalUpdatedConfigs
	mm <- api.totalDeletedConfigs
}

// ListConfigurations returns a list of configurations.
//
// It responds with 404 when no store is set or the store reports
// ErrNotConnected, 500 on any other listing error, and 200 with the list of
// keys otherwise.
func (api *API) ListConfigurations(rw http.ResponseWriter, r *http.Request) {
	api.storeMut.Lock()
	defer api.storeMut.Unlock()
	if api.store == nil {
		api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
		return
	}

	keys, err := api.store.List(r.Context())
	switch {
	case errors.Is(err, ErrNotConnected):
		api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
	case err != nil:
		// This is a list operation; the message previously (incorrectly)
		// described it as a write.
		api.writeError(rw, http.StatusInternalServerError, fmt.Errorf("failed to list configs: %w", err))
	default:
		api.writeResponse(rw, http.StatusOK, configapi.ListConfigurationsResponse{Configs: keys})
	}
}

// GetConfiguration gets an individual configuration.
//
// It responds with 404 when no store is set, the store reports
// ErrNotConnected, or the config does not exist; 400 when the config name
// cannot be decoded; 500 on any other error; and 200 with the marshaled
// config otherwise.
func (api *API) GetConfiguration(rw http.ResponseWriter, r *http.Request) {
	api.storeMut.Lock()
	defer api.storeMut.Unlock()
	if api.store == nil {
		api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
		return
	}

	configKey, err := getConfigName(r)
	if err != nil {
		api.writeError(rw, http.StatusBadRequest, err)
		return
	}

	cfg, err := api.store.Get(r.Context(), configKey)
	switch {
	case errors.Is(err, ErrNotConnected):
		api.writeError(rw, http.StatusNotFound, err)
	case errors.As(err, &NotExistError{}):
		api.writeError(rw, http.StatusNotFound, err)
	case err != nil:
		api.writeError(rw, http.StatusInternalServerError, err)
	default:
		bb, err := instance.MarshalConfig(&cfg, true)
		if err != nil {
			api.writeError(rw, http.StatusInternalServerError, fmt.Errorf("could not marshal config for response: %w", err))
			return
		}
		api.writeResponse(rw, http.StatusOK, &configapi.GetConfigurationResponse{
			Value: string(bb),
		})
	}
}

// PutConfiguration creates or updates a configuration.
//
// The request body is parsed as an instance config and its Name is
// overwritten with the name taken from the URL. If a validator is set, it is
// run against a separately parsed copy so that any mutation it performs does
// not reach the stored config.
//
// It responds with 201 when the config was created and 200 when it was
// updated; 400 for an undecodable name, an unparsable body, a validation
// failure or a NotUniqueError; 404 when no store is set or the store reports
// ErrNotConnected; and 500 for a body read failure or any other store error.
func (api *API) PutConfiguration(rw http.ResponseWriter, r *http.Request) {
	api.storeMut.Lock()
	defer api.storeMut.Unlock()
	if api.store == nil {
		api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
		return
	}

	configName, err := getConfigName(r)
	if err != nil {
		api.writeError(rw, http.StatusBadRequest, err)
		return
	}

	// Buffer the whole body: it may need to be parsed twice (once for the
	// config that is stored, once for the copy handed to the validator).
	var config strings.Builder
	if _, err := io.Copy(&config, r.Body); err != nil {
		api.writeError(rw, http.StatusInternalServerError, err)
		return
	}
	body := config.String()

	cfg, err := instance.UnmarshalConfig(strings.NewReader(body))
	if err != nil {
		api.writeError(rw, http.StatusBadRequest, fmt.Errorf("could not unmarshal config: %w", err))
		return
	}
	cfg.Name = configName

	if api.validator != nil {
		// Parse a second, independent copy for the validator, which is
		// allowed to mutate what it is given.
		validateCfg, err := instance.UnmarshalConfig(strings.NewReader(body))
		if err != nil {
			api.writeError(rw, http.StatusBadRequest, fmt.Errorf("could not unmarshal config: %w", err))
			return
		}
		validateCfg.Name = configName

		if err := api.validator(validateCfg); err != nil {
			api.writeError(rw, http.StatusBadRequest, fmt.Errorf("failed to validate config: %w", err))
			return
		}
	}

	created, err := api.store.Put(r.Context(), *cfg)
	switch {
	case errors.Is(err, ErrNotConnected):
		api.writeError(rw, http.StatusNotFound, err)
	case errors.As(err, &NotUniqueError{}):
		api.writeError(rw, http.StatusBadRequest, err)
	case err != nil:
		api.writeError(rw, http.StatusInternalServerError, err)
	default:
		if created {
			api.totalCreatedConfigs.Inc()
			api.writeResponse(rw, http.StatusCreated, nil)
		} else {
			api.totalUpdatedConfigs.Inc()
			api.writeResponse(rw, http.StatusOK, nil)
		}
	}
}

// DeleteConfiguration deletes a configuration.
//
// It responds with 404 when no store is set, the store reports
// ErrNotConnected, or the config does not exist; 400 when the config name
// cannot be decoded; 500 on any other error; and 200 on success.
func (api *API) DeleteConfiguration(rw http.ResponseWriter, r *http.Request) {
	api.storeMut.Lock()
	defer api.storeMut.Unlock()
	if api.store == nil {
		api.writeError(rw, http.StatusNotFound, fmt.Errorf("no config store running"))
		return
	}

	configKey, err := getConfigName(r)
	if err != nil {
		api.writeError(rw, http.StatusBadRequest, err)
		return
	}

	err = api.store.Delete(r.Context(), configKey)
	switch {
	case errors.Is(err, ErrNotConnected):
		api.writeError(rw, http.StatusNotFound, err)
	case errors.As(err, &NotExistError{}):
		api.writeError(rw, http.StatusNotFound, err)
	case err != nil:
		api.writeError(rw, http.StatusInternalServerError, err)
	default:
		api.totalDeletedConfigs.Inc()
		api.writeResponse(rw, http.StatusOK, nil)
	}
}

// writeError sends writeErr to the client with the given status code via
// configapi.WriteError. A failure to write the response is logged rather
// than returned.
func (api *API) writeError(rw http.ResponseWriter, statusCode int, writeErr error) {
	err := configapi.WriteError(rw, statusCode, writeErr)
	if err != nil {
		level.Error(api.log).Log("msg", "failed to write response", "err", err)
	}
}

// writeResponse sends v to the client with the given status code via
// configapi.WriteResponse. A failure to write the response is logged rather
// than returned.
func (api *API) writeResponse(rw http.ResponseWriter, statusCode int, v interface{}) {
	err := configapi.WriteResponse(rw, statusCode, v)
	if err != nil {
		level.Error(api.log).Log("msg", "failed to write response", "err", err)
	}
}

// getConfigName uses gorilla/mux's route variables to extract the
// "name" variable and URL-path-unescapes it.
//
// An error is returned only when the value cannot be unescaped. A request
// without a "name" route variable yields an empty name and a nil error.
func getConfigName(r *http.Request) (string, error) {
	vars := mux.Vars(r)
	name := vars["name"]
	name, err := url.PathUnescape(name)
	if err != nil {
		return "", fmt.Errorf("could not decode config name: %w", err)
	}
	return name, nil
}

// messageHandlerFunc returns a handler that always responds with statusCode
// and msg as the body.
func messageHandlerFunc(statusCode int, msg string) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		rw.WriteHeader(statusCode)
		// Best effort: nothing useful can be done if the write fails.
		_, _ = rw.Write([]byte(msg))
	}
}