Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/api/internal/lokipush/push_api_server.go
4096 views
1
package lokipush
2
3
import (
4
"bufio"
5
"io"
6
"net/http"
7
"sort"
8
"strings"
9
"sync"
10
"time"
11
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/gorilla/mux"
15
"github.com/grafana/agent/component/common/loki"
16
fnet "github.com/grafana/agent/component/common/net"
17
frelabel "github.com/grafana/agent/component/common/relabel"
18
"github.com/grafana/dskit/tenant"
19
"github.com/grafana/loki/pkg/loghttp/push"
20
"github.com/grafana/loki/pkg/logproto"
21
util_log "github.com/grafana/loki/pkg/util/log"
22
"github.com/prometheus/client_golang/prometheus"
23
"github.com/prometheus/common/model"
24
"github.com/prometheus/prometheus/model/labels"
25
"github.com/prometheus/prometheus/model/relabel"
26
promql_parser "github.com/prometheus/prometheus/promql/parser"
27
)
28
29
type PushAPIServer struct {
30
logger log.Logger
31
serverConfig *fnet.ServerConfig
32
server *fnet.TargetServer
33
handler loki.EntryHandler
34
35
rwMutex sync.RWMutex
36
labels model.LabelSet
37
relabelRules []*relabel.Config
38
keepTimestamp bool
39
}
40
41
func NewPushAPIServer(logger log.Logger,
42
serverConfig *fnet.ServerConfig,
43
handler loki.EntryHandler,
44
registerer prometheus.Registerer,
45
) (*PushAPIServer, error) {
46
47
s := &PushAPIServer{
48
logger: logger,
49
serverConfig: serverConfig,
50
handler: handler,
51
}
52
53
srv, err := fnet.NewTargetServer(logger, "loki_source_api", registerer, serverConfig)
54
if err != nil {
55
return nil, err
56
}
57
58
s.server = srv
59
return s, nil
60
}
61
62
// Run logs a startup message, registers the push API routes on the underlying
// target server and runs it. The error reported by MountAndRun is returned
// unchanged.
//
// Routes:
//   - POST /api/v1/push -> handleLoki
//   - POST /api/v1/raw  -> handlePlaintext
//   - GET  /ready       -> ready
func (s *PushAPIServer) Run() error {
	level.Info(s.logger).Log("msg", "starting push API server")

	mountRoutes := func(router *mux.Router) {
		router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki))
		router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext))
		router.Path("/ready").Methods("GET").Handler(http.HandlerFunc(s.ready))
	}
	return s.server.MountAndRun(mountRoutes)
}
72
73
// ServerConfig returns a copy (by value) of the configuration this server was
// created with.
func (s *PushAPIServer) ServerConfig() fnet.ServerConfig {
	cfg := *s.serverConfig
	return cfg
}
76
77
// Shutdown logs a shutdown message and then stops the underlying target
// server.
func (s *PushAPIServer) Shutdown() {
	infoLog := level.Info(s.logger)
	infoLog.Log("msg", "stopping push API server")

	s.server.StopAndShutdown()
}
81
82
// SetLabels replaces the label set that is added to incoming streams and
// plaintext lines. The given map is stored as-is (not copied); the write is
// done under the write lock.
func (s *PushAPIServer) SetLabels(labels model.LabelSet) {
	s.rwMutex.Lock()
	s.labels = labels
	s.rwMutex.Unlock()
}
87
88
// getLabels returns a clone of the currently configured label set, taken
// under the read lock.
func (s *PushAPIServer) getLabels() model.LabelSet {
	s.rwMutex.RLock()
	defer s.rwMutex.RUnlock()

	snapshot := s.labels.Clone()
	return snapshot
}
93
94
// SetKeepTimestamp updates, under the write lock, whether handleLoki keeps the
// timestamp carried by each pushed entry (true) or stamps entries with the
// time of receipt (false).
func (s *PushAPIServer) SetKeepTimestamp(keepTimestamp bool) {
	s.rwMutex.Lock()
	s.keepTimestamp = keepTimestamp
	s.rwMutex.Unlock()
}
99
100
// getKeepTimestamp reads the keep-timestamp setting under the read lock.
func (s *PushAPIServer) getKeepTimestamp() bool {
	s.rwMutex.RLock()
	keep := s.keepTimestamp
	s.rwMutex.RUnlock()
	return keep
}
105
106
// SetRelabelRules converts the component-level rules with
// frelabel.ComponentToPromRelabelConfigs and stores the result as the rules
// applied by handleLoki.
func (s *PushAPIServer) SetRelabelRules(rules frelabel.Rules) {
	// The conversion only depends on the argument, so it is done before the
	// write lock is taken.
	converted := frelabel.ComponentToPromRelabelConfigs(rules)

	s.rwMutex.Lock()
	defer s.rwMutex.Unlock()
	s.relabelRules = converted
}
111
112
// getRelabelRules returns a snapshot of the configured relabel rules, taken
// under the read lock. Each rule struct is copied by value (a shallow copy),
// so the returned pointers are distinct from the stored ones. The result is a
// non-nil slice even when no rules are configured.
func (s *PushAPIServer) getRelabelRules() []*relabel.Config {
	s.rwMutex.RLock()
	defer s.rwMutex.RUnlock()

	snapshot := make([]*relabel.Config, 0, len(s.relabelRules))
	for _, rule := range s.relabelRules {
		dup := *rule
		snapshot = append(snapshot, &dup)
	}
	return snapshot
}
122
123
// NOTE: This code is copied from Promtail (3478e180211c17bfe2f3f3305f668d5520f40481) with changes kept to the minimum.
124
// Only the HTTP handler functions are copied to allow for flow-specific server configuration and lifecycle management.
125
func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
126
logger := util_log.WithContext(r.Context(), util_log.Logger)
127
userID, _ := tenant.TenantID(r.Context())
128
req, err := push.ParseRequest(logger, userID, r, nil)
129
if err != nil {
130
level.Warn(s.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
131
http.Error(w, err.Error(), http.StatusBadRequest)
132
return
133
}
134
135
// Take snapshot of current configs and apply consistently for the entire request.
136
addLabels := s.getLabels()
137
relabelRules := s.getRelabelRules()
138
keepTimestamp := s.getKeepTimestamp()
139
140
var lastErr error
141
for _, stream := range req.Streams {
142
ls, err := promql_parser.ParseMetric(stream.Labels)
143
if err != nil {
144
lastErr = err
145
continue
146
}
147
sort.Sort(ls)
148
149
lb := labels.NewBuilder(ls)
150
151
// Add configured labels
152
for k, v := range addLabels {
153
lb.Set(string(k), string(v))
154
}
155
156
// Apply relabeling
157
processed, keep := relabel.Process(lb.Labels(nil), relabelRules...)
158
if !keep || len(processed) == 0 {
159
w.WriteHeader(http.StatusNoContent)
160
return
161
}
162
163
// Convert to model.LabelSet
164
filtered := model.LabelSet{}
165
for i := range processed {
166
if strings.HasPrefix(processed[i].Name, "__") {
167
continue
168
}
169
filtered[model.LabelName(processed[i].Name)] = model.LabelValue(processed[i].Value)
170
}
171
172
for _, entry := range stream.Entries {
173
e := loki.Entry{
174
Labels: filtered.Clone(),
175
Entry: logproto.Entry{
176
Line: entry.Line,
177
},
178
}
179
if keepTimestamp {
180
e.Timestamp = entry.Timestamp
181
} else {
182
e.Timestamp = time.Now()
183
}
184
s.handler.Chan() <- e
185
}
186
}
187
188
if lastErr != nil {
189
level.Warn(s.logger).Log("msg", "at least one entry in the push request failed to process", "err", lastErr.Error())
190
http.Error(w, lastErr.Error(), http.StatusBadRequest)
191
return
192
}
193
194
w.WriteHeader(http.StatusNoContent)
195
}
196
197
// NOTE: This code is copied from Promtail (3478e180211c17bfe2f3f3305f668d5520f40481) with changes kept to the minimum.
198
// Only the HTTP handler functions are copied to allow for flow-specific server configuration and lifecycle management.
199
func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) {
200
entries := s.handler.Chan()
201
defer r.Body.Close()
202
body := bufio.NewReader(r.Body)
203
addLabels := s.getLabels()
204
for {
205
line, err := body.ReadString('\n')
206
if err != nil && err != io.EOF {
207
level.Warn(s.logger).Log("msg", "failed to read incoming push request", "err", err.Error())
208
http.Error(w, err.Error(), http.StatusBadRequest)
209
return
210
}
211
line = strings.TrimRight(line, "\r\n")
212
if line == "" {
213
if err == io.EOF {
214
break
215
}
216
continue
217
}
218
entries <- loki.Entry{
219
Labels: addLabels,
220
Entry: logproto.Entry{
221
Timestamp: time.Now(),
222
Line: line,
223
},
224
}
225
if err == io.EOF {
226
break
227
}
228
}
229
230
w.WriteHeader(http.StatusNoContent)
231
}
232
233
// NOTE: This code is copied from Promtail (3478e180211c17bfe2f3f3305f668d5520f40481) with changes kept to the minimum.
234
// Only the HTTP handler functions are copied to allow for flow-specific server configuration and lifecycle management.
235
func (s *PushAPIServer) ready(w http.ResponseWriter, r *http.Request) {
236
resp := "ready"
237
if _, err := w.Write([]byte(resp)); err != nil {
238
level.Error(s.logger).Log("msg", "failed to respond to ready endoint", "err", err)
239
}
240
}
241
242