Path: blob/main/component/loki/source/gcplog/internal/gcplogtarget/push_target.go
4096 views
package gcplogtarget

// This code is copied from Promtail. The gcplogtarget package is used to
// configure and run the targets that can read log entries from cloud resource
// logs like bucket logs, load balancer logs, and Kubernetes cluster logs
// from GCP.

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"

	"github.com/grafana/agent/component/common/loki"
	fnet "github.com/grafana/agent/component/common/net"
)

// PushTarget is an HTTP server that receives messages delivered by a GCP
// PubSub push subscription and forwards them as loki entries.
type PushTarget struct {
	// logger is the caller-supplied logger tagged with component=gcp_push.
	logger log.Logger
	// jobName names the target; it is used to derive the server name and is
	// logged when the target stops.
	jobName string
	// metrics holds the push error and push entry counters.
	metrics *Metrics
	// config carries the server settings, static labels, push timeout and
	// timestamp behaviour.
	config *PushConfig
	// entries is the send side of the handler's channel (handler.Chan()).
	entries chan<- loki.Entry
	// handler is retained so that Stop can stop it.
	handler loki.EntryHandler
	// relabelConfigs are handed to translate for every incoming message.
	relabelConfigs []*relabel.Config
	// server is the underlying HTTP target server.
	server *fnet.TargetServer
}

// NewPushTarget constructs a PushTarget.
//
// It creates the target server (named jobName+"_push_target"), mounts the
// POST /gcp/api/v1/push route and starts the server. An error is returned
// when the server cannot be created or when mounting/running it fails.
func NewPushTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *PushConfig, relabel []*relabel.Config, reg prometheus.Registerer) (*PushTarget, error) {
	gcpLogger := log.With(logger, "component", "gcp_push")

	targetSrv, err := fnet.NewTargetServer(gcpLogger, jobName+"_push_target", reg, config.Server)
	if err != nil {
		return nil, fmt.Errorf("failed to create loki http server: %w", err)
	}

	target := &PushTarget{
		logger:         gcpLogger,
		jobName:        jobName,
		metrics:        metrics,
		config:         config,
		entries:        handler.Chan(),
		handler:        handler,
		relabelConfigs: relabel,
		server:         targetSrv,
	}

	// Registers the single push endpoint on the router supplied by the server.
	mountRoutes := func(router *mux.Router) {
		route := router.Path("/gcp/api/v1/push").Methods("POST")
		route.Handler(http.HandlerFunc(target.push))
	}
	if err := target.server.MountAndRun(mountRoutes); err != nil {
		return nil, err
	}

	return target, nil
}

// push is the HTTP handler for the push endpoint. It reads and decodes the
// request body into a PushMessage, validates it, translates it into a loki
// entry and sends that entry on the entries channel.
//
// Read, decode, validation and translation failures are answered with 400; a
// failed send is answered with 503; success is answered with 204.
func (p *PushTarget) push(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	// Start from the request context with a no-op cancel so that the
	// timeout and no-timeout cases share the same code path below.
	ctx, cancel := r.Context(), context.CancelFunc(func() {})
	if timeout := p.config.PushTimeout; timeout != 0 {
		ctx, cancel = context.WithTimeout(ctx, timeout)
	}
	defer cancel()

	// reject records a push error under the given reason, logs it and
	// answers the request with 400 Bad Request.
	reject := func(reason, logMsg string, cause error) {
		p.metrics.gcpPushErrors.WithLabelValues(reason).Inc()
		level.Warn(p.logger).Log("msg", logMsg, "err", cause.Error())
		http.Error(w, cause.Error(), http.StatusBadRequest)
	}

	message := PushMessage{}

	payload, err := io.ReadAll(r.Body)
	if err != nil {
		reject("read_error", "failed to read incoming gcp push request", err)
		return
	}

	if err := json.Unmarshal(payload, &message); err != nil {
		reject("format", "failed to unmarshall gcp push request", err)
		return
	}

	if err := message.Validate(); err != nil {
		reject("invalid_message", "invalid gcp push request", err)
		return
	}

	targetLabels := p.Labels()
	tenantID := r.Header.Get("X-Scope-OrgID")
	entry, err := translate(message, targetLabels, p.config.UseIncomingTimestamp, p.relabelConfigs, tenantID)
	if err != nil {
		reject("translation", "failed to translate gcp push request", err)
		return
	}

	level.Debug(p.logger).Log("msg", fmt.Sprintf("Received line: %s", entry.Line))

	sendErr := p.doSendEntry(ctx, entry)
	if sendErr != nil {
		// NOTE: timeout errors can be tracked from the metrics exposed by
		// the spun weaveworks server:
		// loki.source.gcplog.componentid_push_target_request_duration_seconds_count{status_code="503"}
		level.Warn(p.logger).Log("msg", "error sending log entry", "err", sendErr.Error())
		http.Error(w, sendErr.Error(), http.StatusServiceUnavailable)
		return
	}

	p.metrics.gcpPushEntries.WithLabelValues().Inc()
	w.WriteHeader(http.StatusNoContent)
}

// doSendEntry sends entry on the entries channel, giving up once ctx is done.
// The channel send is the only blocking operation in the handler, which is
// why it is the one guarded by the context. It returns nil when the entry was
// sent and an error wrapping ctx.Err() otherwise.
func (p *PushTarget) doSendEntry(ctx context.Context, entry loki.Entry) error {
	select {
	case p.entries <- entry:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("timeout exceeded: %w", ctx.Err())
	}
}

// Labels return the model.LabelSet that the target applies to log entries.
// The set is rebuilt from the configured labels on every call.
func (p *PushTarget) Labels() model.LabelSet {
	configured := p.config.Labels
	set := make(model.LabelSet, len(configured))
	for name, value := range configured {
		set[model.LabelName(name)] = model.LabelValue(value)
	}
	return set
}

// Details returns some debug information about the target: its strategy
// ("push"), its labels rendered as a string and the server's HTTP listen
// address.
func (p *PushTarget) Details() map[string]string {
	details := map[string]string{
		"strategy":       "push",
		"labels":         p.Labels().String(),
		"server_address": p.server.HTTPListenAddr(),
	}
	return details
}

// Stop shuts down the push target: the server is stopped first and the entry
// handler afterwards. It always returns nil.
func (p *PushTarget) Stop() error {
	level.Info(p.logger).Log("msg", "stopping gcp push target", "job", p.jobName)

	p.server.StopAndShutdown()
	p.handler.Stop()

	return nil
}