Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/integrations/v2/app_agent_receiver/app_agent_receiver.go
5389 views
1
package app_agent_receiver //nolint:golint
2
3
import (
4
"context"
5
"fmt"
6
"net/http"
7
8
"github.com/go-kit/log"
9
"github.com/go-kit/log/level"
10
"github.com/gorilla/mux"
11
"github.com/grafana/agent/pkg/integrations/v2"
12
"github.com/grafana/agent/pkg/integrations/v2/metricsutils"
13
"github.com/grafana/agent/pkg/traces/pushreceiver"
14
"github.com/prometheus/client_golang/prometheus"
15
"github.com/prometheus/client_golang/prometheus/promhttp"
16
"github.com/weaveworks/common/instrument"
17
"github.com/weaveworks/common/middleware"
18
"go.opentelemetry.io/collector/component"
19
"go.opentelemetry.io/collector/consumer"
20
)
21
22
type appAgentReceiverIntegration struct {
23
integrations.MetricsIntegration
24
appAgentReceiverHandler AppAgentReceiverHandler
25
logger log.Logger
26
conf *Config
27
reg prometheus.Registerer
28
29
requestDurationCollector *prometheus.HistogramVec
30
receivedMessageSizeCollector *prometheus.HistogramVec
31
sentMessageSizeCollector *prometheus.HistogramVec
32
inflightRequestsCollector *prometheus.GaugeVec
33
}
34
35
// Static typecheck tests
36
var (
37
_ integrations.Integration = (*appAgentReceiverIntegration)(nil)
38
_ integrations.HTTPIntegration = (*appAgentReceiverIntegration)(nil)
39
_ integrations.MetricsIntegration = (*appAgentReceiverIntegration)(nil)
40
)
41
42
// NewIntegration converts this config into an instance of an integration
43
func (c *Config) NewIntegration(l log.Logger, globals integrations.Globals) (integrations.Integration, error) {
44
reg := prometheus.NewRegistry()
45
sourcemapLogger := log.With(l, "subcomponent", "sourcemaps")
46
sourcemapStore := NewSourceMapStore(sourcemapLogger, c.SourceMaps, reg, nil, nil)
47
48
receiverMetricsExporter := NewReceiverMetricsExporter(reg)
49
50
var exp = []appAgentReceiverExporter{
51
receiverMetricsExporter,
52
}
53
54
if len(c.LogsInstance) > 0 {
55
getLogsInstance := func() (logsInstance, error) {
56
instance := globals.Logs.Instance(c.LogsInstance)
57
if instance == nil {
58
return nil, fmt.Errorf("logs instance \"%s\" not found", c.LogsInstance)
59
}
60
return instance, nil
61
}
62
63
if _, err := getLogsInstance(); err != nil {
64
return nil, err
65
}
66
67
lokiExporter := NewLogsExporter(
68
l,
69
LogsExporterConfig{
70
GetLogsInstance: getLogsInstance,
71
Labels: c.LogsLabels,
72
SendEntryTimeout: c.LogsSendTimeout,
73
},
74
sourcemapStore,
75
)
76
exp = append(exp, lokiExporter)
77
}
78
79
if len(c.TracesInstance) > 0 {
80
getTracesConsumer := func() (consumer.Traces, error) {
81
tracesInstance := globals.Tracing.Instance(c.TracesInstance)
82
if tracesInstance == nil {
83
return nil, fmt.Errorf("traces instance \"%s\" not found", c.TracesInstance)
84
}
85
factory := tracesInstance.GetFactory(component.KindReceiver, pushreceiver.TypeStr)
86
if factory == nil {
87
return nil, fmt.Errorf("push receiver factory not found for traces instance \"%s\"", c.TracesInstance)
88
}
89
consumer := factory.(*pushreceiver.Factory).Consumer
90
if consumer == nil {
91
return nil, fmt.Errorf("consumer not set for push receiver factory on traces instance \"%s\"", c.TracesInstance)
92
}
93
return consumer, nil
94
}
95
if _, err := getTracesConsumer(); err != nil {
96
return nil, err
97
}
98
tracesExporter := NewTracesExporter(getTracesConsumer)
99
exp = append(exp, tracesExporter)
100
}
101
102
handler := NewAppAgentReceiverHandler(c, exp, reg)
103
104
metricsIntegration, err := metricsutils.NewMetricsHandlerIntegration(l, c, c.Common, globals, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
105
if err != nil {
106
return nil, err
107
}
108
109
requestDurationCollector := prometheus.NewHistogramVec(prometheus.HistogramOpts{
110
Name: "app_agent_receiver_request_duration_seconds",
111
Help: "Time (in seconds) spent serving HTTP requests.",
112
Buckets: instrument.DefBuckets,
113
}, []string{"method", "route", "status_code", "ws"})
114
reg.MustRegister(requestDurationCollector)
115
116
receivedMessageSizeCollector := prometheus.NewHistogramVec(prometheus.HistogramOpts{
117
Name: "app_agent_receiver_request_message_bytes",
118
Help: "Size (in bytes) of messages received in the request.",
119
Buckets: middleware.BodySizeBuckets,
120
}, []string{"method", "route"})
121
reg.MustRegister(receivedMessageSizeCollector)
122
123
sentMessageSizeCollector := prometheus.NewHistogramVec(prometheus.HistogramOpts{
124
Name: "app_agent_receiver_response_message_bytes",
125
Help: "Size (in bytes) of messages sent in response.",
126
Buckets: middleware.BodySizeBuckets,
127
}, []string{"method", "route"})
128
reg.MustRegister(sentMessageSizeCollector)
129
130
inflightRequestsCollector := prometheus.NewGaugeVec(prometheus.GaugeOpts{
131
Name: "app_agent_receiver_inflight_requests",
132
Help: "Current number of inflight requests.",
133
}, []string{"method", "route"})
134
reg.MustRegister(inflightRequestsCollector)
135
136
return &appAgentReceiverIntegration{
137
MetricsIntegration: metricsIntegration,
138
appAgentReceiverHandler: handler,
139
logger: l,
140
conf: c,
141
reg: reg,
142
143
requestDurationCollector: requestDurationCollector,
144
receivedMessageSizeCollector: receivedMessageSizeCollector,
145
sentMessageSizeCollector: sentMessageSizeCollector,
146
inflightRequestsCollector: inflightRequestsCollector,
147
}, nil
148
}
149
150
// RunIntegration implements Integration
151
func (i *appAgentReceiverIntegration) RunIntegration(ctx context.Context) error {
152
r := mux.NewRouter()
153
r.Handle("/collect", i.appAgentReceiverHandler.HTTPHandler(i.logger)).Methods("POST", "OPTIONS")
154
155
mw := middleware.Instrument{
156
RouteMatcher: r,
157
Duration: i.requestDurationCollector,
158
RequestBodySize: i.receivedMessageSizeCollector,
159
ResponseBodySize: i.sentMessageSizeCollector,
160
InflightRequests: i.inflightRequestsCollector,
161
}
162
163
srv := &http.Server{
164
Addr: fmt.Sprintf("%s:%d", i.conf.Server.Host, i.conf.Server.Port),
165
Handler: mw.Wrap(r),
166
}
167
errChan := make(chan error, 1)
168
169
go func() {
170
level.Info(i.logger).Log("msg", "starting app agent receiver", "host", i.conf.Server.Host, "port", i.conf.Server.Port)
171
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
172
errChan <- err
173
}
174
}()
175
176
select {
177
case <-ctx.Done():
178
if err := srv.Shutdown(ctx); err != nil {
179
return err
180
}
181
case err := <-errChan:
182
close(errChan)
183
return err
184
}
185
186
return nil
187
}
188
189
func init() {
190
integrations.Register(&Config{}, integrations.TypeMultiplex)
191
}
192
193