Path: blob/main/component/loki/source/api/internal/lokipush/push_api_server.go
4096 views
package lokipush

import (
	"bufio"
	"io"
	"net/http"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gorilla/mux"
	"github.com/grafana/agent/component/common/loki"
	fnet "github.com/grafana/agent/component/common/net"
	frelabel "github.com/grafana/agent/component/common/relabel"
	"github.com/grafana/dskit/tenant"
	"github.com/grafana/loki/pkg/loghttp/push"
	"github.com/grafana/loki/pkg/logproto"
	util_log "github.com/grafana/loki/pkg/util/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	promql_parser "github.com/prometheus/prometheus/promql/parser"
)

// PushAPIServer is an HTTP server that accepts log pushes (Loki push API and
// newline-delimited plaintext) and forwards every received line to a
// loki.EntryHandler.
//
// labels, relabelRules and keepTimestamp may be replaced while the server is
// running; all access to them goes through rwMutex.
type PushAPIServer struct {
	logger       log.Logger
	serverConfig *fnet.ServerConfig
	server       *fnet.TargetServer
	handler      loki.EntryHandler

	rwMutex       sync.RWMutex
	labels        model.LabelSet
	relabelRules  []*relabel.Config
	keepTimestamp bool
}

// NewPushAPIServer builds a PushAPIServer around a new fnet.TargetServer
// created from serverConfig and registered under the name "loki_source_api".
// It returns the error from fnet.NewTargetServer unchanged if that fails.
func NewPushAPIServer(logger log.Logger,
	serverConfig *fnet.ServerConfig,
	handler loki.EntryHandler,
	registerer prometheus.Registerer,
) (*PushAPIServer, error) {

	s := &PushAPIServer{
		logger:       logger,
		serverConfig: serverConfig,
		handler:      handler,
	}

	srv, err := fnet.NewTargetServer(logger, "loki_source_api", registerer, serverConfig)
	if err != nil {
		return nil, err
	}

	s.server = srv
	return s, nil
}

// Run mounts the push, raw and readiness routes on the underlying server and
// runs it, returning whatever error MountAndRun reports.
func (s *PushAPIServer) Run() error {
	level.Info(s.logger).Log("msg", "starting push API server")

	err := s.server.MountAndRun(func(router *mux.Router) {
		router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki))
		router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext))
		router.Path("/ready").Methods("GET").Handler(http.HandlerFunc(s.ready))
	})
	return err
}

// ServerConfig returns a copy of the configuration the server was created with.
func (s *PushAPIServer) ServerConfig() fnet.ServerConfig {
	return *s.serverConfig
}

// Shutdown logs the stop and calls StopAndShutdown on the underlying server.
func (s *PushAPIServer) Shutdown() {
	level.Info(s.logger).Log("msg", "stopping push API server")
	s.server.StopAndShutdown()
}

// SetLabels replaces the set of labels added to every received entry.
func (s *PushAPIServer) SetLabels(labels model.LabelSet) {
	s.rwMutex.Lock()
	defer s.rwMutex.Unlock()
	s.labels = labels
}

// getLabels returns a copy of the configured labels, so callers can use the
// result without holding the lock.
func (s *PushAPIServer) getLabels() model.LabelSet {
	s.rwMutex.RLock()
	defer s.rwMutex.RUnlock()
	return s.labels.Clone()
}

// SetKeepTimestamp sets whether handleLoki keeps the timestamp supplied with
// each pushed entry (true) or stamps entries with the receive time (false).
func (s *PushAPIServer) SetKeepTimestamp(keepTimestamp bool) {
	s.rwMutex.Lock()
	defer s.rwMutex.Unlock()
	s.keepTimestamp = keepTimestamp
}

// getKeepTimestamp reports the current keep-timestamp setting.
func (s *PushAPIServer) getKeepTimestamp() bool {
	s.rwMutex.RLock()
	defer s.rwMutex.RUnlock()
	return s.keepTimestamp
}

// SetRelabelRules replaces the relabel rules applied by handleLoki, converting
// them to Prometheus relabel configs first.
func (s *PushAPIServer) SetRelabelRules(rules frelabel.Rules) {
	s.rwMutex.Lock()
	defer s.rwMutex.Unlock()
	s.relabelRules = frelabel.ComponentToPromRelabelConfigs(rules)
}

// getRelabelRules returns a new slice holding a shallow copy of every
// configured relabel rule.
func (s *PushAPIServer) getRelabelRules() []*relabel.Config {
	s.rwMutex.RLock()
	defer s.rwMutex.RUnlock()
	newRules := make([]*relabel.Config, len(s.relabelRules))
	for i, r := range s.relabelRules {
		var rCopy = *r
		newRules[i] = &rCopy
	}
	return newRules
}

// handleLoki serves POST /api/v1/push. It parses a Loki push request and, for
// each stream, adds the configured labels, applies the relabel rules, strips
// labels whose name starts with "__" and forwards every entry to the handler.
//
// It responds 400 if the request cannot be parsed or if any stream's label
// string fails to parse (streams that did parse are still forwarded), and 204
// otherwise.
//
// NOTE: This code is copied from Promtail (3478e180211c17bfe2f3f3305f668d5520f40481) with changes kept to the minimum.
// Only the HTTP handler functions are copied to allow for flow-specific server configuration and lifecycle management.
func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
	logger := util_log.WithContext(r.Context(), util_log.Logger)
	// The tenant lookup error is deliberately ignored: the request is parsed
	// with whatever userID comes back, empty or not.
	userID, _ := tenant.TenantID(r.Context())
	req, err := push.ParseRequest(logger, userID, r, nil)
	if err != nil {
		level.Warn(s.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Take snapshot of current configs and apply consistently for the entire request.
	addLabels := s.getLabels()
	relabelRules := s.getRelabelRules()
	keepTimestamp := s.getKeepTimestamp()

	var lastErr error
	for _, stream := range req.Streams {
		ls, err := promql_parser.ParseMetric(stream.Labels)
		if err != nil {
			lastErr = err
			continue
		}
		sort.Sort(ls)

		lb := labels.NewBuilder(ls)

		// Add configured labels
		for k, v := range addLabels {
			lb.Set(string(k), string(v))
		}

		// Apply relabeling. A dropped stream must only skip that stream:
		// returning here would silently discard every remaining stream in the
		// request and hide a parse error recorded for an earlier one.
		processed, keep := relabel.Process(lb.Labels(nil), relabelRules...)
		if !keep || len(processed) == 0 {
			continue
		}

		// Convert to model.LabelSet
		filtered := model.LabelSet{}
		for i := range processed {
			if strings.HasPrefix(processed[i].Name, "__") {
				continue
			}
			filtered[model.LabelName(processed[i].Name)] = model.LabelValue(processed[i].Value)
		}

		for _, entry := range stream.Entries {
			e := loki.Entry{
				Labels: filtered.Clone(),
				Entry: logproto.Entry{
					Line: entry.Line,
				},
			}
			if keepTimestamp {
				e.Timestamp = entry.Timestamp
			} else {
				e.Timestamp = time.Now()
			}
			s.handler.Chan() <- e
		}
	}

	if lastErr != nil {
		level.Warn(s.logger).Log("msg", "at least one entry in the push request failed to process", "err", lastErr.Error())
		http.Error(w, lastErr.Error(), http.StatusBadRequest)
		return
	}

	w.WriteHeader(http.StatusNoContent)
}

// handlePlaintext serves POST /api/v1/raw. The body is read line by line;
// every non-empty line (after trimming trailing "\r\n") is forwarded to the
// handler with the configured labels and the current time as its timestamp.
// Relabel rules are not applied on this path.
//
// It responds 400 on a read error other than io.EOF (lines already forwarded
// are not recalled) and 204 otherwise.
//
// NOTE: This code is copied from Promtail (3478e180211c17bfe2f3f3305f668d5520f40481) with changes kept to the minimum.
// Only the HTTP handler functions are copied to allow for flow-specific server configuration and lifecycle management.
func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) {
	entries := s.handler.Chan()
	defer r.Body.Close()
	body := bufio.NewReader(r.Body)
	addLabels := s.getLabels()
	for {
		line, err := body.ReadString('\n')
		if err != nil && err != io.EOF {
			level.Warn(s.logger).Log("msg", "failed to read incoming push request", "err", err.Error())
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		line = strings.TrimRight(line, "\r\n")
		if line != "" {
			entries <- loki.Entry{
				// Each entry gets its own label set, as in handleLoki, so a
				// consumer mutating one entry's labels cannot affect another.
				Labels: addLabels.Clone(),
				Entry: logproto.Entry{
					Timestamp: time.Now(),
					Line:      line,
				},
			}
		}
		// io.EOF may arrive together with a final unterminated line, so it is
		// only acted on after that line has been forwarded.
		if err == io.EOF {
			break
		}
	}

	w.WriteHeader(http.StatusNoContent)
}

// ready serves GET /ready by writing the body "ready"; a failed write is
// logged and otherwise ignored.
//
// NOTE: This code is copied from Promtail (3478e180211c17bfe2f3f3305f668d5520f40481) with changes kept to the minimum.
// Only the HTTP handler functions are copied to allow for flow-specific server configuration and lifecycle management.
func (s *PushAPIServer) ready(w http.ResponseWriter, r *http.Request) {
	resp := "ready"
	if _, err := w.Write([]byte(resp)); err != nil {
		level.Error(s.logger).Log("msg", "failed to respond to ready endoint", "err", err)
	}
}