Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/heroku/internal/herokutarget/herokutarget.go
4096 views
1
package herokutarget
2
3
// This code is copied from Promtail. The herokutarget package is used to
4
// configure and run the targets that can read heroku entries and forward them
5
// to other loki components.
6
7
import (
8
"fmt"
9
"net/http"
10
"strings"
11
"time"
12
13
"github.com/go-kit/log"
14
"github.com/go-kit/log/level"
15
"github.com/gorilla/mux"
16
"github.com/grafana/agent/component/common/loki"
17
fnet "github.com/grafana/agent/component/common/net"
18
"github.com/grafana/loki/pkg/logproto"
19
herokuEncoding "github.com/heroku/x/logplex/encoding"
20
"github.com/prometheus/client_golang/prometheus"
21
"github.com/prometheus/common/model"
22
"github.com/prometheus/prometheus/model/labels"
23
"github.com/prometheus/prometheus/model/relabel"
24
)
25
26
// ReservedLabelTenantID is the name of the reserved label under which the
// drain handler stores the value of the X-Scope-OrgID request header on each
// entry it forwards.
const ReservedLabelTenantID = "__tenant_id__"
27
28
// HerokuDrainTargetConfig describes a scrape config to listen and consume heroku logs, in the HTTPS drain manner.
29
type HerokuDrainTargetConfig struct {
30
Server *fnet.ServerConfig
31
32
// Labels optionally holds labels to associate with each record received on the push api.
33
Labels model.LabelSet
34
35
// UseIncomingTimestamp sets the timestamp to the incoming heroku log entry timestamp. If false,
36
// promtail will assign the current timestamp to the log entry when it was processed.
37
UseIncomingTimestamp bool
38
}
39
40
type HerokuTarget struct {
41
logger log.Logger
42
handler loki.EntryHandler
43
config *HerokuDrainTargetConfig
44
metrics *Metrics
45
relabelConfigs []*relabel.Config
46
server *fnet.TargetServer
47
}
48
49
// NewHerokuTarget creates a brand new Heroku Drain target, capable of receiving logs from a Heroku application through an HTTP drain.
50
func NewHerokuTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, relabel []*relabel.Config, config *HerokuDrainTargetConfig, reg prometheus.Registerer) (*HerokuTarget, error) {
51
wrappedLogger := log.With(logger, "component", "heroku_drain")
52
53
srv, err := fnet.NewTargetServer(wrappedLogger, "loki_source_heroku_drain_target", reg, config.Server)
54
if err != nil {
55
return nil, fmt.Errorf("failed to create loki server: %w", err)
56
}
57
58
ht := &HerokuTarget{
59
server: srv,
60
metrics: metrics,
61
logger: wrappedLogger,
62
handler: handler,
63
config: config,
64
relabelConfigs: relabel,
65
}
66
67
err = ht.server.MountAndRun(func(router *mux.Router) {
68
router.Path(ht.DrainEndpoint()).Methods("POST").Handler(http.HandlerFunc(ht.drain))
69
router.Path(ht.HealthyEndpoint()).Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }))
70
})
71
if err != nil {
72
return nil, err
73
}
74
75
return ht, nil
76
}
77
78
func (h *HerokuTarget) drain(w http.ResponseWriter, r *http.Request) {
79
entries := h.handler.Chan()
80
defer r.Body.Close()
81
herokuScanner := herokuEncoding.NewDrainScanner(r.Body)
82
for herokuScanner.Scan() {
83
ts := time.Now()
84
message := herokuScanner.Message()
85
lb := labels.NewBuilder(nil)
86
lb.Set("__heroku_drain_host", message.Hostname)
87
lb.Set("__heroku_drain_app", message.Application)
88
lb.Set("__heroku_drain_proc", message.Process)
89
lb.Set("__heroku_drain_log_id", message.ID)
90
91
if h.config.UseIncomingTimestamp {
92
ts = message.Timestamp
93
}
94
95
// Create __heroku_drain_param_<name> labels from query parameters
96
params := r.URL.Query()
97
for k, v := range params {
98
lb.Set(fmt.Sprintf("__heroku_drain_param_%s", k), strings.Join(v, ","))
99
}
100
101
tenantIDHeaderValue := r.Header.Get("X-Scope-OrgID")
102
if tenantIDHeaderValue != "" {
103
// If present, first inject the tenant ID in, so it can be relabeled if necessary
104
lb.Set(ReservedLabelTenantID, tenantIDHeaderValue)
105
}
106
107
processed, _ := relabel.Process(lb.Labels(nil), h.relabelConfigs...)
108
109
// Start with the set of labels fixed in the configuration
110
filtered := h.Labels().Clone()
111
for _, lbl := range processed {
112
if strings.HasPrefix(lbl.Name, "__") {
113
continue
114
}
115
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
116
}
117
118
// Then, inject it as the reserved label, so it's used by the remote write client
119
if tenantIDHeaderValue != "" {
120
filtered[ReservedLabelTenantID] = model.LabelValue(tenantIDHeaderValue)
121
}
122
123
entries <- loki.Entry{
124
Labels: filtered,
125
Entry: logproto.Entry{
126
Timestamp: ts,
127
Line: message.Message,
128
},
129
}
130
h.metrics.herokuEntries.Inc()
131
}
132
err := herokuScanner.Err()
133
if err != nil {
134
h.metrics.herokuErrors.Inc()
135
level.Warn(h.logger).Log("msg", "failed to read incoming heroku request", "err", err.Error())
136
http.Error(w, err.Error(), http.StatusBadRequest)
137
return
138
}
139
w.WriteHeader(http.StatusNoContent)
140
}
141
142
// Labels returns the label set stored in the target's configuration. The set
// is returned as is, not copied.
func (h *HerokuTarget) Labels() model.LabelSet {
	cfg := h.config
	return cfg.Labels
}
145
146
// HTTPListenAddress returns the HTTP listen address reported by the target's
// server.
func (h *HerokuTarget) HTTPListenAddress() string {
	addr := h.server.HTTPListenAddr()
	return addr
}
149
150
// DrainEndpoint returns the URL path on which the target accepts drain
// requests.
func (h *HerokuTarget) DrainEndpoint() string {
	const drainPath = "/heroku/api/v1/drain"
	return drainPath
}
153
154
// HealthyEndpoint returns the URL path of the target's health check endpoint.
func (h *HerokuTarget) HealthyEndpoint() string {
	const healthyPath = "/healthy"
	return healthyPath
}
157
158
// Ready reports whether the target answers its own health check: it sends a
// GET request to the health endpoint and returns true only when the request
// succeeds with a 200 OK status. Any failure to build or send the request
// yields false.
//
// NOTE(review): the URL is the plain concatenation of HTTPListenAddress() and
// HealthyEndpoint(); this presumes the listen address already carries a
// scheme — confirm against the server implementation.
func (h *HerokuTarget) Ready() bool {
	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", h.HTTPListenAddress(), h.HealthyEndpoint()), nil)
	if err != nil {
		return false
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return false
	}
	// The body must be closed even though it is not read, otherwise every
	// readiness probe leaks the underlying connection.
	defer res.Body.Close()

	return res.StatusCode == http.StatusOK
}
171
172
// Details returns the target's details, which is always a new, empty
// map[string]string.
func (h *HerokuTarget) Details() interface{} {
	details := make(map[string]string)
	return details
}
175
176
// Stop logs that the target is stopping, stops and shuts down the server and
// then stops the entry handler. It always returns nil.
func (h *HerokuTarget) Stop() error {
	infoLogger := level.Info(h.logger)
	infoLogger.Log("msg", "stopping heroku drain target")

	// The server goes down before the handler is stopped.
	h.server.StopAndShutdown()
	h.handler.Stop()

	return nil
}
182
183