Path: blob/main/component/common/loki/client/client.go

package client

// This code is copied from Promtail. The client package is used to configure
// and run the clients that can send log entries to a Loki instance.

import (
	"bufio"
	"bytes"
	"context"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/pkg/build"
	"github.com/grafana/dskit/backoff"
	lokiutil "github.com/grafana/loki/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/promql/parser"
)

const (
	contentType  = "application/x-protobuf"
	maxErrMsgLen = 1024

	// Label reserved to override the tenant ID while processing
	// pipeline stages
	ReservedLabelTenantID = "__tenant_id__"

	LatencyLabel = "filename"
	HostLabel    = "host"
	ClientLabel  = "client"
)

var UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)

type Metrics struct {
	encodedBytes     *prometheus.CounterVec
	sentBytes        *prometheus.CounterVec
	droppedBytes     *prometheus.CounterVec
	sentEntries      *prometheus.CounterVec
	droppedEntries   *prometheus.CounterVec
	requestDuration  *prometheus.HistogramVec
	batchRetries     *prometheus.CounterVec
	countersWithHost []*prometheus.CounterVec
	streamLag        *prometheus.GaugeVec
}

func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
	var m Metrics

	m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_encoded_bytes_total",
		Help: "Number of bytes encoded and ready to send.",
	}, []string{HostLabel})
	m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_sent_bytes_total",
		Help: "Number of bytes sent.",
	}, []string{HostLabel})
	m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_dropped_bytes_total",
		Help: "Number of bytes dropped because they failed to be sent to the ingester after all retries.",
	}, []string{HostLabel})
	m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_sent_entries_total",
		Help: "Number of log entries sent to the ingester.",
	}, []string{HostLabel})
	m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_dropped_entries_total",
		Help: "Number of log entries dropped because they failed to be sent to the ingester after all retries.",
	}, []string{HostLabel})
	m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "loki_write_request_duration_seconds",
		Help: "Duration of send requests.",
	}, []string{"status_code", HostLabel})
	m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_batch_retries_total",
		Help: "Number of times batches have had to be retried.",
	}, []string{HostLabel})

	m.countersWithHost = []*prometheus.CounterVec{
		m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries,
	}

	streamLagLabelsMerged := []string{HostLabel, ClientLabel}
	streamLagLabelsMerged = append(streamLagLabelsMerged, streamLagLabels...)
	m.streamLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "loki_write_stream_lag_seconds",
		Help: "Difference between current time and last batch timestamp for successful sends",
	}, streamLagLabelsMerged)

	if reg != nil {
		m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
		m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
		m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
		m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
		m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
		m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
		m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
		m.streamLag = mustRegisterOrGet(reg, m.streamLag).(*prometheus.GaugeVec)
	}

	return &m
}

// mustRegisterOrGet registers the collector with the registerer, returning the
// already-registered collector if one exists, and panics on any other error.
func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
	if err := reg.Register(c); err != nil {
		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
			return are.ExistingCollector
		}
		panic(err)
	}
	return c
}

// Client pushes entries to Loki and can be stopped
type Client interface {
	loki.EntryHandler
	// Stop goroutine sending batch of entries without retries.
	StopNow()
	Name() string
}

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
	name            string
	metrics         *Metrics
	streamLagLabels []string
	logger          log.Logger
	cfg             Config
	client          *http.Client
	entries         chan loki.Entry

	once sync.Once
	wg   sync.WaitGroup

	externalLabels model.LabelSet

	// ctx is used in any upstream calls from the `client`.
	ctx        context.Context
	cancel     context.CancelFunc
	maxStreams int
}

// Tripperware can wrap a roundtripper.
type Tripperware func(http.RoundTripper) http.RoundTripper
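
// As an illustration, a Tripperware that adds a static header to every push
// request might look like the following hypothetical sketch (roundTripperFunc
// and withHeader are not part of this package):
//
//	type roundTripperFunc func(*http.Request) (*http.Response, error)
//
//	func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }
//
//	func withHeader(key, value string) Tripperware {
//		return func(next http.RoundTripper) http.RoundTripper {
//			return roundTripperFunc(func(req *http.Request) (*http.Response, error) {
//				// Clone before mutating, since a RoundTripper must not modify the request.
//				clone := req.Clone(req.Context())
//				clone.Header.Set(key, value)
//				return next.RoundTrip(clone)
//			})
//		}
//	}
//
// Such a Tripperware can be passed to NewWithTripperware below to wrap the
// client's transport.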

// New makes a new Client.
func New(metrics *Metrics, cfg Config, streamLagLabels []string, maxStreams int, logger log.Logger) (Client, error) {
	if cfg.StreamLagLabels.String() != "" {
		return nil, fmt.Errorf("client config stream_lag_labels is deprecated in favour of the config file options block field, and will be ignored: %+v", cfg.StreamLagLabels.String())
	}
	return newClient(metrics, cfg, streamLagLabels, maxStreams, logger)
}

func newClient(metrics *Metrics, cfg Config, streamLagLabels []string, maxStreams int, logger log.Logger) (*client, error) {
	if cfg.URL.URL == nil {
		return nil, errors.New("client needs target URL")
	}

	ctx, cancel := context.WithCancel(context.Background())

	c := &client{
		logger:          log.With(logger, "component", "client", "host", cfg.URL.Host),
		cfg:             cfg,
		entries:         make(chan loki.Entry),
		metrics:         metrics,
		streamLagLabels: streamLagLabels,
		name:            asSha256(cfg),

		externalLabels: cfg.ExternalLabels.LabelSet,
		ctx:            ctx,
		cancel:         cancel,
		maxStreams:     maxStreams,
	}
	if cfg.Name != "" {
		c.name = cfg.Name
	}

	err := cfg.Client.Validate()
	if err != nil {
		return nil, err
	}

	c.client, err = config.NewClientFromConfig(cfg.Client, "GrafanaAgent", config.WithHTTP2Disabled())
	if err != nil {
		return nil, err
	}

	c.client.Timeout = cfg.Timeout

	// Initialize counters to 0 so the metrics are exported before the first
	// occurrence of incrementing to avoid missing metrics.
	for _, counter := range c.metrics.countersWithHost {
		counter.WithLabelValues(c.cfg.URL.Host).Add(0)
	}

	c.wg.Add(1)
	go c.run()
	return c, nil
}

// NewWithTripperware creates a new Loki client with a custom tripperware.
func NewWithTripperware(metrics *Metrics, cfg Config, streamLagLabels []string, maxStreams int, logger log.Logger, tp Tripperware) (Client, error) {
	c, err := newClient(metrics, cfg, streamLagLabels, maxStreams, logger)
	if err != nil {
		return nil, err
	}

	if tp != nil {
		c.client.Transport = tp(c.client.Transport)
	}

	return c, nil
}

// run reads entries from the channel, groups them into per-tenant batches, and
// flushes a batch when it reaches the max size or the max wait time.
func (c *client) run() {
	batches := map[string]*batch{}

	// Given the client handles multiple batches (one per tenant) and each batch
	// can be created at a different point in time, we check for batches whose
	// max wait time has been reached ten times per BatchWait, so that the
	// maximum extra delay before a batch is sent is 10% of the max waiting time.
	// The check interval is floored at 10ms to avoid overly frequent checks
	// when BatchWait is very low.
	minWaitCheckFrequency := 10 * time.Millisecond
	maxWaitCheckFrequency := c.cfg.BatchWait / 10
	if maxWaitCheckFrequency < minWaitCheckFrequency {
		maxWaitCheckFrequency = minWaitCheckFrequency
	}

	maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

	defer func() {
		maxWaitCheck.Stop()
		// Send all pending batches
		for tenantID, batch := range batches {
			c.sendBatch(tenantID, batch)
		}

		c.wg.Done()
	}()

	for {
		select {
		case e, ok := <-c.entries:
			if !ok {
				return
			}
			e, tenantID := c.processEntry(e)
			batch, ok := batches[tenantID]

			// If the batch doesn't exist yet, we create a new one with the entry
			if !ok {
				batches[tenantID] = newBatch(c.maxStreams, e)
				break
			}

			// If adding the entry to the batch will increase the size over the max
			// size allowed, we send the current batch and then create a new one
			if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
				c.sendBatch(tenantID, batch)

				batches[tenantID] = newBatch(c.maxStreams, e)
				break
			}

			// The max size of the batch isn't reached, so we can add the entry
			err := batch.add(e)
			if err != nil {
				level.Error(c.logger).Log("msg", "batch add err", "error", err)
				c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Inc()
				return
			}
		case <-maxWaitCheck.C:
			// Send all batches whose max wait time has been reached
			for tenantID, batch := range batches {
				if batch.age() < c.cfg.BatchWait {
					continue
				}

				c.sendBatch(tenantID, batch)
				delete(batches, tenantID)
			}
		}
	}
}

func (c *client) Chan() chan<- loki.Entry {
	return c.entries
}

// asSha256 returns the first 6 hex characters of the SHA-256 hash of the
// value's default Go formatting; it is used as the default client name.
func asSha256(o interface{}) string {
	h := sha256.New()
	h.Write([]byte(fmt.Sprintf("%v", o)))

	temp := fmt.Sprintf("%x", h.Sum(nil))
	return temp[:6]
}

// sendBatch encodes the batch and pushes it to Loki, retrying 429s, 5xx
// responses and connection-level errors with backoff.
func (c *client) sendBatch(tenantID string, batch *batch) {
	buf, entriesCount, err := batch.encode()
	if err != nil {
		level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
		return
	}
	bufBytes := float64(len(buf))
	c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

	backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
	var status int
	for {
		start := time.Now()
		// send uses `timeout` internally, so `context.Background` is good enough.
		status, err = c.send(context.Background(), tenantID, buf)

		c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

		if err == nil {
			c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
			c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
			for _, s := range batch.streams {
				lbls, err := parser.ParseMetric(s.Labels)
				if err != nil {
					// is this possible?
					level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
					return
				}

				//nolint:staticcheck
				lblSet := make(prometheus.Labels)
				for _, lbl := range c.streamLagLabels {
					// A label from streamLagLabels may not be found, but we still need an
					// empty value so that the prometheus client library doesn't panic on
					// inconsistent label cardinality.
					value := ""
					for i := range lbls {
						if lbls[i].Name == lbl {
							value = lbls[i].Value
						}
					}
					lblSet[lbl] = value
				}

				//nolint:staticcheck
				if lblSet != nil {
					// always set host
					lblSet[HostLabel] = c.cfg.URL.Host
					// also set the client name: if multiple loki_write clients are
					// configured, we would otherwise run into a "duplicate metric
					// collected with same labels" error when scraping the /metrics
					// endpoint
					lblSet[ClientLabel] = c.name
					c.metrics.streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
				}
			}
			return
		}

		// Only retry 429s, 500s and connection-level errors.
		if status > 0 && status != 429 && status/100 != 5 {
			break
		}

		level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
		c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
		backoff.Wait()

		// Make sure it sends at least once before checking for retry.
		if !backoff.Ongoing() {
			break
		}
	}

	if err != nil {
		level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
		c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
		c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
	}
}

// send performs a single push request and returns the HTTP status code, or -1
// for connection-level errors.
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
	ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
	defer cancel()
	req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
	if err != nil {
		return -1, err
	}
	req = req.WithContext(ctx)
	req.Header.Set("Content-Type", contentType)
	req.Header.Set("User-Agent", UserAgent)

	// If the tenant ID is not empty, the component is running in multi-tenant
	// mode, so we should send it to Loki
	if tenantID != "" {
		req.Header.Set("X-Scope-OrgID", tenantID)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return -1, err
	}
	defer lokiutil.LogError("closing response body", resp.Body.Close)

	if resp.StatusCode/100 != 2 {
		scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
		line := ""
		if scanner.Scan() {
			line = scanner.Text()
		}
		err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
	}
	return resp.StatusCode, err
}

func (c *client) getTenantID(labels model.LabelSet) string {
	// Check if it has been overridden while processing the pipeline stages
	if value, ok := labels[ReservedLabelTenantID]; ok {
		return string(value)
	}

	// Check if it has been specified in the config
	if c.cfg.TenantID != "" {
		return c.cfg.TenantID
	}

	// Defaults to an empty string, which means the X-Scope-OrgID header
	// will not be sent
	return ""
}

// Stop the client.
func (c *client) Stop() {
	c.once.Do(func() { close(c.entries) })
	c.wg.Wait()
}

// StopNow stops the client without retries.
func (c *client) StopNow() {
	// cancel will stop retrying http requests.
	c.cancel()
	c.Stop()
}

// processEntry merges the configured external labels into the entry's labels
// and resolves the tenant ID the entry should be sent with.
func (c *client) processEntry(e loki.Entry) (loki.Entry, string) {
	if len(c.externalLabels) > 0 {
		e.Labels = c.externalLabels.Merge(e.Labels)
	}
	tenantID := c.getTenantID(e.Labels)
	return e, tenantID
}

func (c *client) UnregisterLatencyMetric(labels prometheus.Labels) {
	labels[HostLabel] = c.cfg.URL.Host
	c.metrics.streamLag.Delete(labels)
}

func (c *client) Name() string {
	return c.name
}
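
// A minimal usage sketch, assuming the caller already has a populated Config
// value named cfg and a logger, that a maxStreams of 0 means "no limit", and
// that loki.Entry mirrors Promtail's api.Entry (a model.LabelSet plus an
// embedded logproto.Entry); error handling is elided:
//
//	metrics := NewMetrics(prometheus.DefaultRegisterer, nil)
//	c, err := New(metrics, cfg, nil, 0, log.NewNopLogger())
//	if err != nil {
//		// handle the error
//	}
//	defer c.Stop()
//	c.Chan() <- loki.Entry{
//		Labels: model.LabelSet{"job": "example"},
//		Entry:  logproto.Entry{Timestamp: time.Now(), Line: "hello world"},
//	}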