Path: blob/main/pkg/integrations/v2/app_agent_receiver/handler.go
5340 views
package app_agent_receiver12import (3"context"4"sync"56"crypto/subtle"7"encoding/json"8"net/http"910"github.com/go-kit/log"11"github.com/go-kit/log/level"12"github.com/prometheus/client_golang/prometheus"13"github.com/rs/cors"14"golang.org/x/time/rate"15)1617const apiKeyHeader = "x-api-key"1819type appAgentReceiverExporter interface {20Name() string21Export(ctx context.Context, payload Payload) error22}2324// AppAgentReceiverHandler struct controls the data ingestion http handler of the receiver25type AppAgentReceiverHandler struct {26exporters []appAgentReceiverExporter27config *Config28rateLimiter *rate.Limiter29exporterErrorsCollector *prometheus.CounterVec30}3132// NewAppAgentReceiverHandler creates a new AppReceiver instance based on the given configuration33func NewAppAgentReceiverHandler(conf *Config, exporters []appAgentReceiverExporter, reg prometheus.Registerer) AppAgentReceiverHandler {34var rateLimiter *rate.Limiter35if conf.Server.RateLimiting.Enabled {36var rps float6437if conf.Server.RateLimiting.RPS > 0 {38rps = conf.Server.RateLimiting.RPS39}4041var b int42if conf.Server.RateLimiting.Burstiness > 0 {43b = conf.Server.RateLimiting.Burstiness44}45rateLimiter = rate.NewLimiter(rate.Limit(rps), b)46}4748exporterErrorsCollector := prometheus.NewCounterVec(prometheus.CounterOpts{49Name: "app_agent_receiver_exporter_errors_total",50Help: "Total number of errors produced by a receiver exporter",51}, []string{"exporter"})5253reg.MustRegister(exporterErrorsCollector)5455return AppAgentReceiverHandler{56exporters: exporters,57config: conf,58rateLimiter: rateLimiter,59exporterErrorsCollector: exporterErrorsCollector,60}61}6263// HTTPHandler is the http.Handler for the receiver. It will do the following64// 0. Enable CORS for the configured hosts65// 1. Check if the request should be rate limited66// 2. Verify that the payload size is within limits67// 3. Start two go routines for exporters processing and exporting data respectively68// 4. Respond with 202 once all the work is done69func (ar *AppAgentReceiverHandler) HTTPHandler(logger log.Logger) http.Handler {70var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {71// Check rate limiting state72if ar.config.Server.RateLimiting.Enabled {73if ok := ar.rateLimiter.Allow(); !ok {74http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)75return76}77}7879// check API key if one is provided80if len(ar.config.Server.APIKey) > 0 && subtle.ConstantTimeCompare([]byte(r.Header.Get(apiKeyHeader)), []byte(ar.config.Server.APIKey)) == 0 {81http.Error(w, "api key not provided or incorrect", http.StatusUnauthorized)82return83}8485// Verify content length. We trust net/http to give us the correct number86if ar.config.Server.MaxAllowedPayloadSize > 0 && r.ContentLength > ar.config.Server.MaxAllowedPayloadSize {87http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)88return89}9091var p Payload92err := json.NewDecoder(r.Body).Decode(&p)93if err != nil {94http.Error(w, err.Error(), http.StatusBadRequest)95return96}9798var wg sync.WaitGroup99100for _, exporter := range ar.exporters {101wg.Add(1)102go func(exp appAgentReceiverExporter) {103defer wg.Done()104if err := exp.Export(r.Context(), p); err != nil {105level.Error(logger).Log("msg", "exporter error", "exporter", exp.Name(), "error", err)106ar.exporterErrorsCollector.WithLabelValues(exp.Name()).Inc()107}108}(exporter)109}110111wg.Wait()112w.WriteHeader(http.StatusAccepted)113_, _ = w.Write([]byte("ok"))114})115116if len(ar.config.Server.CORSAllowedOrigins) > 0 {117c := cors.New(cors.Options{118AllowedOrigins: ar.config.Server.CORSAllowedOrigins,119AllowedHeaders: []string{apiKeyHeader, "content-type"},120})121handler = c.Handler(handler)122}123124return handler125}126127128