Path: blob/main/component/loki/source/heroku/internal/herokutarget/herokutarget.go
4096 views
package herokutarget12// This code is copied from Promtail. The herokutarget package is used to3// configure and run the targets that can read heroku entries and forward them4// to other loki components.56import (7"fmt"8"net/http"9"strings"10"time"1112"github.com/go-kit/log"13"github.com/go-kit/log/level"14"github.com/gorilla/mux"15"github.com/grafana/agent/component/common/loki"16fnet "github.com/grafana/agent/component/common/net"17"github.com/grafana/loki/pkg/logproto"18herokuEncoding "github.com/heroku/x/logplex/encoding"19"github.com/prometheus/client_golang/prometheus"20"github.com/prometheus/common/model"21"github.com/prometheus/prometheus/model/labels"22"github.com/prometheus/prometheus/model/relabel"23)2425const ReservedLabelTenantID = "__tenant_id__"2627// HerokuDrainTargetConfig describes a scrape config to listen and consume heroku logs, in the HTTPS drain manner.28type HerokuDrainTargetConfig struct {29Server *fnet.ServerConfig3031// Labels optionally holds labels to associate with each record received on the push api.32Labels model.LabelSet3334// UseIncomingTimestamp sets the timestamp to the incoming heroku log entry timestamp. If false,35// promtail will assign the current timestamp to the log entry when it was processed.36UseIncomingTimestamp bool37}3839type HerokuTarget struct {40logger log.Logger41handler loki.EntryHandler42config *HerokuDrainTargetConfig43metrics *Metrics44relabelConfigs []*relabel.Config45server *fnet.TargetServer46}4748// NewHerokuTarget creates a brand new Heroku Drain target, capable of receiving logs from a Heroku application through an HTTP drain.49func NewHerokuTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, relabel []*relabel.Config, config *HerokuDrainTargetConfig, reg prometheus.Registerer) (*HerokuTarget, error) {50wrappedLogger := log.With(logger, "component", "heroku_drain")5152srv, err := fnet.NewTargetServer(wrappedLogger, "loki_source_heroku_drain_target", reg, config.Server)53if err != nil {54return nil, fmt.Errorf("failed to create loki server: %w", err)55}5657ht := &HerokuTarget{58server: srv,59metrics: metrics,60logger: wrappedLogger,61handler: handler,62config: config,63relabelConfigs: relabel,64}6566err = ht.server.MountAndRun(func(router *mux.Router) {67router.Path(ht.DrainEndpoint()).Methods("POST").Handler(http.HandlerFunc(ht.drain))68router.Path(ht.HealthyEndpoint()).Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }))69})70if err != nil {71return nil, err72}7374return ht, nil75}7677func (h *HerokuTarget) drain(w http.ResponseWriter, r *http.Request) {78entries := h.handler.Chan()79defer r.Body.Close()80herokuScanner := herokuEncoding.NewDrainScanner(r.Body)81for herokuScanner.Scan() {82ts := time.Now()83message := herokuScanner.Message()84lb := labels.NewBuilder(nil)85lb.Set("__heroku_drain_host", message.Hostname)86lb.Set("__heroku_drain_app", message.Application)87lb.Set("__heroku_drain_proc", message.Process)88lb.Set("__heroku_drain_log_id", message.ID)8990if h.config.UseIncomingTimestamp {91ts = message.Timestamp92}9394// Create __heroku_drain_param_<name> labels from query parameters95params := r.URL.Query()96for k, v := range params {97lb.Set(fmt.Sprintf("__heroku_drain_param_%s", k), strings.Join(v, ","))98}99100tenantIDHeaderValue := r.Header.Get("X-Scope-OrgID")101if tenantIDHeaderValue != "" {102// If present, first inject the tenant ID in, so it can be relabeled if necessary103lb.Set(ReservedLabelTenantID, tenantIDHeaderValue)104}105106processed, _ := relabel.Process(lb.Labels(nil), h.relabelConfigs...)107108// Start with the set of labels fixed in the configuration109filtered := h.Labels().Clone()110for _, lbl := range processed {111if strings.HasPrefix(lbl.Name, "__") {112continue113}114filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)115}116117// Then, inject it as the reserved label, so it's used by the remote write client118if tenantIDHeaderValue != "" {119filtered[ReservedLabelTenantID] = model.LabelValue(tenantIDHeaderValue)120}121122entries <- loki.Entry{123Labels: filtered,124Entry: logproto.Entry{125Timestamp: ts,126Line: message.Message,127},128}129h.metrics.herokuEntries.Inc()130}131err := herokuScanner.Err()132if err != nil {133h.metrics.herokuErrors.Inc()134level.Warn(h.logger).Log("msg", "failed to read incoming heroku request", "err", err.Error())135http.Error(w, err.Error(), http.StatusBadRequest)136return137}138w.WriteHeader(http.StatusNoContent)139}140141func (h *HerokuTarget) Labels() model.LabelSet {142return h.config.Labels143}144145func (h *HerokuTarget) HTTPListenAddress() string {146return h.server.HTTPListenAddr()147}148149func (h *HerokuTarget) DrainEndpoint() string {150return "/heroku/api/v1/drain"151}152153func (h *HerokuTarget) HealthyEndpoint() string {154return "/healthy"155}156157func (h *HerokuTarget) Ready() bool {158req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", h.HTTPListenAddress(), h.HealthyEndpoint()), nil)159if err != nil {160return false161}162163res, err := http.DefaultClient.Do(req)164if err != nil || res.StatusCode != http.StatusOK {165return false166}167168return true169}170171func (h *HerokuTarget) Details() interface{} {172return map[string]string{}173}174175func (h *HerokuTarget) Stop() error {176level.Info(h.logger).Log("msg", "stopping heroku drain target")177h.server.StopAndShutdown()178h.handler.Stop()179return nil180}181182183