Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/push_target.go
4096 views
1
package gcplogtarget
2
3
// This code is copied from Promtail. The gcplogtarget package is used to
4
// configure and run the targets that can read log entries from cloud resource
5
// logs like bucket logs, load balancer logs, and Kubernetes cluster logs
6
// from GCP.
7
8
import (
9
"context"
10
"encoding/json"
11
"fmt"
12
"io"
13
"net/http"
14
15
"github.com/go-kit/log"
16
"github.com/go-kit/log/level"
17
"github.com/gorilla/mux"
18
"github.com/prometheus/client_golang/prometheus"
19
"github.com/prometheus/common/model"
20
"github.com/prometheus/prometheus/model/relabel"
21
22
"github.com/grafana/agent/component/common/loki"
23
fnet "github.com/grafana/agent/component/common/net"
24
)
25
26
// PushTarget defines a server for receiving messages from a GCP PubSub push
27
// subscription.
28
type PushTarget struct {
29
logger log.Logger
30
jobName string
31
metrics *Metrics
32
config *PushConfig
33
entries chan<- loki.Entry
34
handler loki.EntryHandler
35
relabelConfigs []*relabel.Config
36
server *fnet.TargetServer
37
}
38
39
// NewPushTarget constructs a PushTarget.
40
func NewPushTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *PushConfig, relabel []*relabel.Config, reg prometheus.Registerer) (*PushTarget, error) {
41
wrappedLogger := log.With(logger, "component", "gcp_push")
42
srv, err := fnet.NewTargetServer(wrappedLogger, jobName+"_push_target", reg, config.Server)
43
if err != nil {
44
return nil, fmt.Errorf("failed to create loki http server: %w", err)
45
}
46
pt := &PushTarget{
47
server: srv,
48
logger: wrappedLogger,
49
jobName: jobName,
50
metrics: metrics,
51
config: config,
52
entries: handler.Chan(),
53
handler: handler,
54
relabelConfigs: relabel,
55
}
56
57
err = pt.server.MountAndRun(func(router *mux.Router) {
58
router.Path("/gcp/api/v1/push").Methods("POST").Handler(http.HandlerFunc(pt.push))
59
})
60
if err != nil {
61
return nil, err
62
}
63
64
return pt, nil
65
}
66
67
// push is the HTTP handler mounted on the push route. It reads the request
// body, decodes it as a PushMessage, validates and translates it into a loki
// entry, and hands that entry to the entries channel.
//
// Responses:
//   - 400 when the body cannot be read, decoded, validated or translated; the
//     matching gcpPushErrors label is incremented.
//   - 503 when the entry cannot be sent before the request context (optionally
//     bounded by config.PushTimeout) is done.
//   - 204 when the entry was sent; gcpPushEntries is incremented.
func (p *PushTarget) push(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	// Only bound the send when a push timeout is configured. Otherwise keep
	// the request context and pair it with a cancel func that does nothing,
	// so the deferred call below needs no branching.
	sendCtx, stop := r.Context(), context.CancelFunc(func() {})
	if p.config.PushTimeout != 0 {
		sendCtx, stop = context.WithTimeout(r.Context(), p.config.PushTimeout)
	}
	defer stop()

	// reject counts the failure under the given reason, logs it at warn level
	// and answers the request with 400 and the error text.
	reject := func(reason, logMsg string, cause error) {
		p.metrics.gcpPushErrors.WithLabelValues(reason).Inc()
		level.Warn(p.logger).Log("msg", logMsg, "err", cause.Error())
		http.Error(w, cause.Error(), http.StatusBadRequest)
	}

	payload := PushMessage{}

	raw, err := io.ReadAll(r.Body)
	if err != nil {
		reject("read_error", "failed to read incoming gcp push request", err)
		return
	}
	if err := json.Unmarshal(raw, &payload); err != nil {
		reject("format", "failed to unmarshall gcp push request", err)
		return
	}
	if err := payload.Validate(); err != nil {
		reject("invalid_message", "invalid gcp push request", err)
		return
	}

	tenantID := r.Header.Get("X-Scope-OrgID")
	entry, err := translate(payload, p.Labels(), p.config.UseIncomingTimestamp, p.relabelConfigs, tenantID)
	if err != nil {
		reject("translation", "failed to translate gcp push request", err)
		return
	}

	level.Debug(p.logger).Log("msg", fmt.Sprintf("Received line: %s", entry.Line))

	sendErr := p.doSendEntry(sendCtx, entry)
	if sendErr != nil {
		// No dedicated counter is bumped here. Per the original note, timeouts
		// can be tracked through the request-duration metrics exposed by the
		// server itself, e.g.
		// loki.source.gcplog.componentid_push_target_request_duration_seconds_count{status_code="503"}
		level.Warn(p.logger).Log("msg", "error sending log entry", "err", sendErr.Error())
		http.Error(w, sendErr.Error(), http.StatusServiceUnavailable)
		return
	}

	p.metrics.gcpPushEntries.WithLabelValues().Inc()
	w.WriteHeader(http.StatusNoContent)
}
122
123
func (p *PushTarget) doSendEntry(ctx context.Context, entry loki.Entry) error {
124
select {
125
// Timeout the loki.Entry channel send operation, which is the only blocking operation in the handler
126
case <-ctx.Done():
127
return fmt.Errorf("timeout exceeded: %w", ctx.Err())
128
case p.entries <- entry:
129
return nil
130
}
131
}
132
133
// Labels return the model.LabelSet that the target applies to log entries.
134
func (p *PushTarget) Labels() model.LabelSet {
135
lbls := make(model.LabelSet, len(p.config.Labels))
136
for k, v := range p.config.Labels {
137
lbls[model.LabelName(k)] = model.LabelValue(v)
138
}
139
return lbls
140
}
141
142
// Details returns some debug information about the target.
143
func (p *PushTarget) Details() map[string]string {
144
return map[string]string{
145
"strategy": "push",
146
"labels": p.Labels().String(),
147
"server_address": p.server.HTTPListenAddr(),
148
}
149
}
150
151
// Stop shuts down the push target.
152
func (p *PushTarget) Stop() error {
153
level.Info(p.logger).Log("msg", "stopping gcp push target", "job", p.jobName)
154
p.server.StopAndShutdown()
155
p.handler.Stop()
156
return nil
157
}
158
159