Path: blob/main/pkg/integrations/v2/app_agent_receiver/app_agent_receiver.go
5389 views
package app_agent_receiver //nolint:golint12import (3"context"4"fmt"5"net/http"67"github.com/go-kit/log"8"github.com/go-kit/log/level"9"github.com/gorilla/mux"10"github.com/grafana/agent/pkg/integrations/v2"11"github.com/grafana/agent/pkg/integrations/v2/metricsutils"12"github.com/grafana/agent/pkg/traces/pushreceiver"13"github.com/prometheus/client_golang/prometheus"14"github.com/prometheus/client_golang/prometheus/promhttp"15"github.com/weaveworks/common/instrument"16"github.com/weaveworks/common/middleware"17"go.opentelemetry.io/collector/component"18"go.opentelemetry.io/collector/consumer"19)2021type appAgentReceiverIntegration struct {22integrations.MetricsIntegration23appAgentReceiverHandler AppAgentReceiverHandler24logger log.Logger25conf *Config26reg prometheus.Registerer2728requestDurationCollector *prometheus.HistogramVec29receivedMessageSizeCollector *prometheus.HistogramVec30sentMessageSizeCollector *prometheus.HistogramVec31inflightRequestsCollector *prometheus.GaugeVec32}3334// Static typecheck tests35var (36_ integrations.Integration = (*appAgentReceiverIntegration)(nil)37_ integrations.HTTPIntegration = (*appAgentReceiverIntegration)(nil)38_ integrations.MetricsIntegration = (*appAgentReceiverIntegration)(nil)39)4041// NewIntegration converts this config into an instance of an integration42func (c *Config) NewIntegration(l log.Logger, globals integrations.Globals) (integrations.Integration, error) {43reg := prometheus.NewRegistry()44sourcemapLogger := log.With(l, "subcomponent", "sourcemaps")45sourcemapStore := NewSourceMapStore(sourcemapLogger, c.SourceMaps, reg, nil, nil)4647receiverMetricsExporter := NewReceiverMetricsExporter(reg)4849var exp = []appAgentReceiverExporter{50receiverMetricsExporter,51}5253if len(c.LogsInstance) > 0 {54getLogsInstance := func() (logsInstance, error) {55instance := globals.Logs.Instance(c.LogsInstance)56if instance == nil {57return nil, fmt.Errorf("logs instance \"%s\" not found", c.LogsInstance)58}59return instance, nil60}6162if _, err := getLogsInstance(); err != nil {63return nil, err64}6566lokiExporter := NewLogsExporter(67l,68LogsExporterConfig{69GetLogsInstance: getLogsInstance,70Labels: c.LogsLabels,71SendEntryTimeout: c.LogsSendTimeout,72},73sourcemapStore,74)75exp = append(exp, lokiExporter)76}7778if len(c.TracesInstance) > 0 {79getTracesConsumer := func() (consumer.Traces, error) {80tracesInstance := globals.Tracing.Instance(c.TracesInstance)81if tracesInstance == nil {82return nil, fmt.Errorf("traces instance \"%s\" not found", c.TracesInstance)83}84factory := tracesInstance.GetFactory(component.KindReceiver, pushreceiver.TypeStr)85if factory == nil {86return nil, fmt.Errorf("push receiver factory not found for traces instance \"%s\"", c.TracesInstance)87}88consumer := factory.(*pushreceiver.Factory).Consumer89if consumer == nil {90return nil, fmt.Errorf("consumer not set for push receiver factory on traces instance \"%s\"", c.TracesInstance)91}92return consumer, nil93}94if _, err := getTracesConsumer(); err != nil {95return nil, err96}97tracesExporter := NewTracesExporter(getTracesConsumer)98exp = append(exp, tracesExporter)99}100101handler := NewAppAgentReceiverHandler(c, exp, reg)102103metricsIntegration, err := metricsutils.NewMetricsHandlerIntegration(l, c, c.Common, globals, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))104if err != nil {105return nil, err106}107108requestDurationCollector := prometheus.NewHistogramVec(prometheus.HistogramOpts{109Name: "app_agent_receiver_request_duration_seconds",110Help: "Time (in seconds) spent serving HTTP requests.",111Buckets: instrument.DefBuckets,112}, []string{"method", "route", "status_code", "ws"})113reg.MustRegister(requestDurationCollector)114115receivedMessageSizeCollector := prometheus.NewHistogramVec(prometheus.HistogramOpts{116Name: "app_agent_receiver_request_message_bytes",117Help: "Size (in bytes) of messages received in the request.",118Buckets: middleware.BodySizeBuckets,119}, []string{"method", "route"})120reg.MustRegister(receivedMessageSizeCollector)121122sentMessageSizeCollector := prometheus.NewHistogramVec(prometheus.HistogramOpts{123Name: "app_agent_receiver_response_message_bytes",124Help: "Size (in bytes) of messages sent in response.",125Buckets: middleware.BodySizeBuckets,126}, []string{"method", "route"})127reg.MustRegister(sentMessageSizeCollector)128129inflightRequestsCollector := prometheus.NewGaugeVec(prometheus.GaugeOpts{130Name: "app_agent_receiver_inflight_requests",131Help: "Current number of inflight requests.",132}, []string{"method", "route"})133reg.MustRegister(inflightRequestsCollector)134135return &appAgentReceiverIntegration{136MetricsIntegration: metricsIntegration,137appAgentReceiverHandler: handler,138logger: l,139conf: c,140reg: reg,141142requestDurationCollector: requestDurationCollector,143receivedMessageSizeCollector: receivedMessageSizeCollector,144sentMessageSizeCollector: sentMessageSizeCollector,145inflightRequestsCollector: inflightRequestsCollector,146}, nil147}148149// RunIntegration implements Integration150func (i *appAgentReceiverIntegration) RunIntegration(ctx context.Context) error {151r := mux.NewRouter()152r.Handle("/collect", i.appAgentReceiverHandler.HTTPHandler(i.logger)).Methods("POST", "OPTIONS")153154mw := middleware.Instrument{155RouteMatcher: r,156Duration: i.requestDurationCollector,157RequestBodySize: i.receivedMessageSizeCollector,158ResponseBodySize: i.sentMessageSizeCollector,159InflightRequests: i.inflightRequestsCollector,160}161162srv := &http.Server{163Addr: fmt.Sprintf("%s:%d", i.conf.Server.Host, i.conf.Server.Port),164Handler: mw.Wrap(r),165}166errChan := make(chan error, 1)167168go func() {169level.Info(i.logger).Log("msg", "starting app agent receiver", "host", i.conf.Server.Host, "port", i.conf.Server.Port)170if err := srv.ListenAndServe(); err != http.ErrServerClosed {171errChan <- err172}173}()174175select {176case <-ctx.Done():177if err := srv.Shutdown(ctx); err != nil {178return err179}180case err := <-errChan:181close(errChan)182return err183}184185return nil186}187188func init() {189integrations.Register(&Config{}, integrations.TypeMultiplex)190}191192193