Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/app_agent_receiver/handler.go
5340 views
1
package app_agent_receiver
2
3
import (
4
"context"
5
"sync"
6
7
"crypto/subtle"
8
"encoding/json"
9
"net/http"
10
11
"github.com/go-kit/log"
12
"github.com/go-kit/log/level"
13
"github.com/prometheus/client_golang/prometheus"
14
"github.com/rs/cors"
15
"golang.org/x/time/rate"
16
)
17
18
const apiKeyHeader = "x-api-key"
19
20
type appAgentReceiverExporter interface {
21
Name() string
22
Export(ctx context.Context, payload Payload) error
23
}
24
25
// AppAgentReceiverHandler struct controls the data ingestion http handler of the receiver
26
type AppAgentReceiverHandler struct {
27
exporters []appAgentReceiverExporter
28
config *Config
29
rateLimiter *rate.Limiter
30
exporterErrorsCollector *prometheus.CounterVec
31
}
32
33
// NewAppAgentReceiverHandler creates a new AppReceiver instance based on the given configuration
34
func NewAppAgentReceiverHandler(conf *Config, exporters []appAgentReceiverExporter, reg prometheus.Registerer) AppAgentReceiverHandler {
35
var rateLimiter *rate.Limiter
36
if conf.Server.RateLimiting.Enabled {
37
var rps float64
38
if conf.Server.RateLimiting.RPS > 0 {
39
rps = conf.Server.RateLimiting.RPS
40
}
41
42
var b int
43
if conf.Server.RateLimiting.Burstiness > 0 {
44
b = conf.Server.RateLimiting.Burstiness
45
}
46
rateLimiter = rate.NewLimiter(rate.Limit(rps), b)
47
}
48
49
exporterErrorsCollector := prometheus.NewCounterVec(prometheus.CounterOpts{
50
Name: "app_agent_receiver_exporter_errors_total",
51
Help: "Total number of errors produced by a receiver exporter",
52
}, []string{"exporter"})
53
54
reg.MustRegister(exporterErrorsCollector)
55
56
return AppAgentReceiverHandler{
57
exporters: exporters,
58
config: conf,
59
rateLimiter: rateLimiter,
60
exporterErrorsCollector: exporterErrorsCollector,
61
}
62
}
63
64
// HTTPHandler is the http.Handler for the receiver. It will do the following
65
// 0. Enable CORS for the configured hosts
66
// 1. Check if the request should be rate limited
67
// 2. Verify that the payload size is within limits
68
// 3. Start two go routines for exporters processing and exporting data respectively
69
// 4. Respond with 202 once all the work is done
70
func (ar *AppAgentReceiverHandler) HTTPHandler(logger log.Logger) http.Handler {
71
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
72
// Check rate limiting state
73
if ar.config.Server.RateLimiting.Enabled {
74
if ok := ar.rateLimiter.Allow(); !ok {
75
http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
76
return
77
}
78
}
79
80
// check API key if one is provided
81
if len(ar.config.Server.APIKey) > 0 && subtle.ConstantTimeCompare([]byte(r.Header.Get(apiKeyHeader)), []byte(ar.config.Server.APIKey)) == 0 {
82
http.Error(w, "api key not provided or incorrect", http.StatusUnauthorized)
83
return
84
}
85
86
// Verify content length. We trust net/http to give us the correct number
87
if ar.config.Server.MaxAllowedPayloadSize > 0 && r.ContentLength > ar.config.Server.MaxAllowedPayloadSize {
88
http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge)
89
return
90
}
91
92
var p Payload
93
err := json.NewDecoder(r.Body).Decode(&p)
94
if err != nil {
95
http.Error(w, err.Error(), http.StatusBadRequest)
96
return
97
}
98
99
var wg sync.WaitGroup
100
101
for _, exporter := range ar.exporters {
102
wg.Add(1)
103
go func(exp appAgentReceiverExporter) {
104
defer wg.Done()
105
if err := exp.Export(r.Context(), p); err != nil {
106
level.Error(logger).Log("msg", "exporter error", "exporter", exp.Name(), "error", err)
107
ar.exporterErrorsCollector.WithLabelValues(exp.Name()).Inc()
108
}
109
}(exporter)
110
}
111
112
wg.Wait()
113
w.WriteHeader(http.StatusAccepted)
114
_, _ = w.Write([]byte("ok"))
115
})
116
117
if len(ar.config.Server.CORSAllowedOrigins) > 0 {
118
c := cors.New(cors.Options{
119
AllowedOrigins: ar.config.Server.CORSAllowedOrigins,
120
AllowedHeaders: []string{apiKeyHeader, "content-type"},
121
})
122
handler = c.Handler(handler)
123
}
124
125
return handler
126
}
127
128