Path: blob/main/component/common/loki/client/client_test.go
4096 views
package client12// This code is copied from Promtail. The client package is used to configure3// and run the clients that can send log entries to a Loki instance.45import (6"io"7"math"8"net/http"9"net/http/httptest"10"net/url"11"strings"12"testing"13"time"1415"github.com/go-kit/log"16"github.com/grafana/agent/component/common/loki"17"github.com/grafana/dskit/backoff"18"github.com/grafana/dskit/flagext"19"github.com/grafana/loki/pkg/logproto"20"github.com/grafana/loki/pkg/util"21lokiflag "github.com/grafana/loki/pkg/util/flagext"22"github.com/prometheus/client_golang/prometheus"23"github.com/prometheus/client_golang/prometheus/testutil"24"github.com/prometheus/common/config"25"github.com/prometheus/common/model"26"github.com/stretchr/testify/assert"27"github.com/stretchr/testify/require"28)2930var logEntries = []loki.Entry{31{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},32{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},33{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},34{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},35{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},36{Labels: model.LabelSet{"__tenant_id__": "tenant-2"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},37}3839type receivedReq struct {40tenantID string41pushReq logproto.PushRequest42}4344func TestClient_Handle(t *testing.T) {45tests := map[string]struct {46clientBatchSize int47clientBatchWait time.Duration48clientMaxRetries int49clientTenantID string50serverResponseStatus int51inputEntries []loki.Entry52inputDelay time.Duration53expectedReqs []receivedReq54expectedMetrics string55}{56"batch log entries together until the batch size is reached": {57clientBatchSize: 10,58clientBatchWait: 100 * time.Millisecond,59clientMaxRetries: 3,60serverResponseStatus: 200,61inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[2]},62expectedReqs: []receivedReq{63{64tenantID: "",65pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},66},67{68tenantID: "",69pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},70},71},72expectedMetrics: `73# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.74# TYPE loki_write_sent_entries_total counter75loki_write_sent_entries_total{host="__HOST__"} 3.076# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.77# TYPE loki_write_dropped_entries_total counter78loki_write_dropped_entries_total{host="__HOST__"} 079`,80},81"batch log entries together until the batch wait time is reached": {82clientBatchSize: 10,83clientBatchWait: 100 * time.Millisecond,84clientMaxRetries: 3,85serverResponseStatus: 200,86inputEntries: []loki.Entry{logEntries[0], logEntries[1]},87inputDelay: 110 * time.Millisecond,88expectedReqs: []receivedReq{89{90tenantID: "",91pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},92},93{94tenantID: "",95pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}}}},96},97},98expectedMetrics: `99# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.100# TYPE loki_write_sent_entries_total counter101loki_write_sent_entries_total{host="__HOST__"} 2.0102# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.103# TYPE loki_write_dropped_entries_total counter104loki_write_dropped_entries_total{host="__HOST__"} 0105`,106},107"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {108clientBatchSize: 10,109clientBatchWait: 10 * time.Millisecond,110clientMaxRetries: 3,111serverResponseStatus: 500,112inputEntries: []loki.Entry{logEntries[0]},113expectedReqs: []receivedReq{114{115tenantID: "",116pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},117},118{119tenantID: "",120pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},121},122{123tenantID: "",124pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},125},126},127expectedMetrics: `128# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.129# TYPE loki_write_dropped_entries_total counter130loki_write_dropped_entries_total{host="__HOST__"} 1.0131# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.132# TYPE loki_write_sent_entries_total counter133loki_write_sent_entries_total{host="__HOST__"} 0134`,135},136"do not retry send a batch in case the server responds with a 4xx": {137clientBatchSize: 10,138clientBatchWait: 10 * time.Millisecond,139clientMaxRetries: 3,140serverResponseStatus: 400,141inputEntries: []loki.Entry{logEntries[0]},142expectedReqs: []receivedReq{143{144tenantID: "",145pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},146},147},148expectedMetrics: `149# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.150# TYPE loki_write_dropped_entries_total counter151loki_write_dropped_entries_total{host="__HOST__"} 1.0152# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.153# TYPE loki_write_sent_entries_total counter154loki_write_sent_entries_total{host="__HOST__"} 0155`,156},157"do retry sending a batch in case the server responds with a 429": {158clientBatchSize: 10,159clientBatchWait: 10 * time.Millisecond,160clientMaxRetries: 3,161serverResponseStatus: 429,162inputEntries: []loki.Entry{logEntries[0]},163expectedReqs: []receivedReq{164{165tenantID: "",166pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},167},168{169tenantID: "",170pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},171},172{173tenantID: "",174pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},175},176},177expectedMetrics: `178# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.179# TYPE loki_write_dropped_entries_total counter180loki_write_dropped_entries_total{host="__HOST__"} 1.0181# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.182# TYPE loki_write_sent_entries_total counter183loki_write_sent_entries_total{host="__HOST__"} 0184`,185},186"batch log entries together honoring the client tenant ID": {187clientBatchSize: 100,188clientBatchWait: 100 * time.Millisecond,189clientMaxRetries: 3,190clientTenantID: "tenant-default",191serverResponseStatus: 200,192inputEntries: []loki.Entry{logEntries[0], logEntries[1]},193expectedReqs: []receivedReq{194{195tenantID: "tenant-default",196pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},197},198},199expectedMetrics: `200# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.201# TYPE loki_write_sent_entries_total counter202loki_write_sent_entries_total{host="__HOST__"} 2.0203# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.204# TYPE loki_write_dropped_entries_total counter205loki_write_dropped_entries_total{host="__HOST__"} 0206`,207},208"batch log entries together honoring the tenant ID overridden while processing the pipeline stages": {209clientBatchSize: 100,210clientBatchWait: 100 * time.Millisecond,211clientMaxRetries: 3,212clientTenantID: "tenant-default",213serverResponseStatus: 200,214inputEntries: []loki.Entry{logEntries[0], logEntries[3], logEntries[4], logEntries[5]},215expectedReqs: []receivedReq{216{217tenantID: "tenant-default",218pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},219},220{221tenantID: "tenant-1",222pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[3].Entry, logEntries[4].Entry}}}},223},224{225tenantID: "tenant-2",226pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[5].Entry}}}},227},228},229expectedMetrics: `230# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.231# TYPE loki_write_sent_entries_total counter232loki_write_sent_entries_total{host="__HOST__"} 4.0233# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.234# TYPE loki_write_dropped_entries_total counter235loki_write_dropped_entries_total{host="__HOST__"} 0236`,237},238}239240for testName, testData := range tests {241t.Run(testName, func(t *testing.T) {242reg := prometheus.NewRegistry()243244// Create a buffer channel where we do enqueue received requests245receivedReqsChan := make(chan receivedReq, 10)246247// Start a local HTTP server248server := httptest.NewServer(createServerHandler(receivedReqsChan, testData.serverResponseStatus))249require.NotNil(t, server)250defer server.Close()251252// Get the URL at which the local test server is listening to253serverURL := flagext.URLValue{}254err := serverURL.Set(server.URL)255require.NoError(t, err)256257// Instance the client258cfg := Config{259URL: serverURL,260BatchWait: testData.clientBatchWait,261BatchSize: testData.clientBatchSize,262Client: config.HTTPClientConfig{},263BackoffConfig: backoff.Config{MinBackoff: 1 * time.Millisecond, MaxBackoff: 2 * time.Millisecond, MaxRetries: testData.clientMaxRetries},264ExternalLabels: lokiflag.LabelSet{},265Timeout: 1 * time.Second,266TenantID: testData.clientTenantID,267}268269m := NewMetrics(reg, nil)270c, err := New(m, cfg, nil, 0, log.NewNopLogger())271require.NoError(t, err)272273// Send all the input log entries274for i, logEntry := range testData.inputEntries {275c.Chan() <- logEntry276277if testData.inputDelay > 0 && i < len(testData.inputEntries)-1 {278time.Sleep(testData.inputDelay)279}280}281282// Wait until the expected push requests are received (with a timeout)283deadline := time.Now().Add(1 * time.Second)284for len(receivedReqsChan) < len(testData.expectedReqs) && time.Now().Before(deadline) {285time.Sleep(5 * time.Millisecond)286}287288// Stop the client: it waits until the current batch is sent289c.Stop()290close(receivedReqsChan)291292// Get all push requests received on the server side293receivedReqs := make([]receivedReq, 0)294for req := range receivedReqsChan {295receivedReqs = append(receivedReqs, req)296}297298// Due to implementation details (maps iteration ordering is random) we just check299// that the expected requests are equal to the received requests, without checking300// the exact order which is not guaranteed in case of multi-tenant301require.ElementsMatch(t, testData.expectedReqs, receivedReqs)302303expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)304err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "loki_write_sent_entries_total", "loki_write_dropped_entries_total")305assert.NoError(t, err)306})307}308}309310func TestClient_StopNow(t *testing.T) {311cases := []struct {312name string313clientBatchSize int314clientBatchWait time.Duration315clientMaxRetries int316clientTenantID string317serverResponseStatus int318inputEntries []loki.Entry319inputDelay time.Duration320expectedReqs []receivedReq321expectedMetrics string322}{323{324name: "send requests shouldn't be cancelled after StopNow()",325clientBatchSize: 10,326clientBatchWait: 100 * time.Millisecond,327clientMaxRetries: 3,328serverResponseStatus: 200,329inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[2]},330expectedReqs: []receivedReq{331{332tenantID: "",333pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},334},335{336tenantID: "",337pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},338},339},340expectedMetrics: `341# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.342# TYPE loki_write_sent_entries_total counter343loki_write_sent_entries_total{host="__HOST__"} 3.0344# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.345# TYPE loki_write_dropped_entries_total counter346loki_write_dropped_entries_total{host="__HOST__"} 0347`,348},349{350name: "shouldn't retry after StopNow()",351clientBatchSize: 10,352clientBatchWait: 10 * time.Millisecond,353clientMaxRetries: 3,354serverResponseStatus: 429,355inputEntries: []loki.Entry{logEntries[0]},356expectedReqs: []receivedReq{357{358tenantID: "",359pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},360},361},362expectedMetrics: `363# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.364# TYPE loki_write_dropped_entries_total counter365loki_write_dropped_entries_total{host="__HOST__"} 1.0366# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.367# TYPE loki_write_sent_entries_total counter368loki_write_sent_entries_total{host="__HOST__"} 0369`,370},371}372373for _, c := range cases {374t.Run(c.name, func(t *testing.T) {375reg := prometheus.NewRegistry()376377// Create a buffer channel where we do enqueue received requests378receivedReqsChan := make(chan receivedReq, 10)379380// Start a local HTTP server381server := httptest.NewServer(createServerHandler(receivedReqsChan, c.serverResponseStatus))382require.NotNil(t, server)383defer server.Close()384385// Get the URL at which the local test server is listening to386serverURL := flagext.URLValue{}387err := serverURL.Set(server.URL)388require.NoError(t, err)389390// Instance the client391cfg := Config{392URL: serverURL,393BatchWait: c.clientBatchWait,394BatchSize: c.clientBatchSize,395Client: config.HTTPClientConfig{},396BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: c.clientMaxRetries},397ExternalLabels: lokiflag.LabelSet{},398Timeout: 1 * time.Second,399TenantID: c.clientTenantID,400}401m := NewMetrics(reg, nil)402cl, err := New(m, cfg, nil, 0, log.NewNopLogger())403require.NoError(t, err)404405// Send all the input log entries406for i, logEntry := range c.inputEntries {407cl.Chan() <- logEntry408409if c.inputDelay > 0 && i < len(c.inputEntries)-1 {410time.Sleep(c.inputDelay)411}412}413414// Wait until the expected push requests are received (with a timeout)415deadline := time.Now().Add(1 * time.Second)416for len(receivedReqsChan) < len(c.expectedReqs) && time.Now().Before(deadline) {417time.Sleep(5 * time.Millisecond)418}419420// StopNow should have cancelled client's ctx421cc := cl.(*client)422require.NoError(t, cc.ctx.Err())423424// Stop the client: it waits until the current batch is sent425cl.StopNow()426close(receivedReqsChan)427428require.Error(t, cc.ctx.Err()) // non-nil error if its cancelled.429430// Get all push requests received on the server side431receivedReqs := make([]receivedReq, 0)432for req := range receivedReqsChan {433receivedReqs = append(receivedReqs, req)434}435436// Due to implementation details (maps iteration ordering is random) we just check437// that the expected requests are equal to the received requests, without checking438// the exact order which is not guaranteed in case of multi-tenant439require.ElementsMatch(t, c.expectedReqs, receivedReqs)440441expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1)442err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "loki_write_sent_entries_total", "loki_write_dropped_entries_total")443assert.NoError(t, err)444})445}446}447448func createServerHandler(receivedReqsChan chan receivedReq, status int) http.HandlerFunc {449return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {450// Parse the request451var pushReq logproto.PushRequest452if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil {453rw.WriteHeader(500)454return455}456457receivedReqsChan <- receivedReq{458tenantID: req.Header.Get("X-Scope-OrgID"),459pushReq: pushReq,460}461462rw.WriteHeader(status)463})464}465466type RoundTripperFunc func(*http.Request) (*http.Response, error)467468func (r RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {469return r(req)470}471472func Test_Tripperware(t *testing.T) {473url, err := url.Parse("http://foo.com")474require.NoError(t, err)475var called bool476c, err := NewWithTripperware(metrics, Config{477URL: flagext.URLValue{URL: url},478}, nil, 0, log.NewNopLogger(), func(rt http.RoundTripper) http.RoundTripper {479return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {480require.Equal(t, r.URL.String(), "http://foo.com")481called = true482return &http.Response{483StatusCode: http.StatusOK,484Body: io.NopCloser(strings.NewReader("ok")),485}, nil486})487})488require.NoError(t, err)489490c.Chan() <- loki.Entry{491Labels: model.LabelSet{"foo": "bar"},492Entry: logproto.Entry{Timestamp: time.Now(), Line: "foo"},493}494c.Stop()495require.True(t, called)496}497498499