Path: blob/main/component/loki/source/api/api_test.go
4096 views
package api

import (
	"context"
	"fmt"
	"net/http"
	"testing"
	"time"

	"github.com/phayes/freeport"

	"github.com/grafana/agent/component"
	"github.com/grafana/agent/component/common/loki"
	"github.com/grafana/agent/component/common/loki/client"
	"github.com/grafana/agent/component/common/loki/client/fake"
	"github.com/grafana/agent/component/common/net"
	"github.com/grafana/agent/component/common/relabel"
	"github.com/grafana/agent/pkg/util"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/regexp"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestLokiSourceAPI_Simple pushes a single entry through a loki client pointed
// at the component's push endpoint and checks that the entry reaches the
// configured receiver with its line, timestamp, and the component's labels
// ("foo" and "fizz" from the default test arguments) added to the entry's own.
func TestLokiSourceAPI_Simple(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	receiver := fake.NewClient(func() {})
	defer receiver.Stop()

	args := testArgsWith(t, func(a *Arguments) {
		a.Server.HTTP.ListenPort = 8532
		a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
		a.UseIncomingTimestamp = true
	})
	opts := defaultOptions(t)
	_, shutdown := startTestComponent(t, opts, args, ctx)
	defer shutdown()

	lokiClient := newTestLokiClient(t, args, opts)
	defer lokiClient.Stop()

	now := time.Now()
	select {
	case lokiClient.Chan() <- loki.Entry{
		Labels: map[model.LabelName]model.LabelValue{"source": "test"},
		Entry:  logproto.Entry{Timestamp: now, Line: "hello world!"},
	}:
	case <-ctx.Done():
		t.Fatalf("timed out while sending test entries via loki client")
	}

	require.Eventually(
		t,
		func() bool { return len(receiver.Received()) == 1 },
		5*time.Second,
		10*time.Millisecond,
		"did not receive the forwarded message within the timeout",
	)
	received := receiver.Received()[0]
	// testify's Equal takes (expected, actual); keeping that order makes
	// failure output label the two values correctly.
	assert.Equal(t, "hello world!", received.Line)
	assert.Equal(t, now.Unix(), received.Timestamp.Unix())
	assert.Equal(t, model.LabelSet{
		"source": "test",
		"foo":    "bar",
		"fizz":   "buzz",
	}, received.Labels)
}

// TestLokiSourceAPI_Update sends an entry, calls Update on the running
// component with a changed Labels map, sends a second entry, and checks that
// each entry carries the "test_label" value that was configured at the time.
func TestLokiSourceAPI_Update(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	receiver := fake.NewClient(func() {})
	defer receiver.Stop()

	args := testArgsWith(t, func(a *Arguments) {
		a.Server.HTTP.ListenPort = 8583
		a.ForwardTo = []loki.LogsReceiver{receiver.LogsReceiver()}
		a.UseIncomingTimestamp = true
		a.Labels = map[string]string{"test_label": "before"}
	})
	opts := defaultOptions(t)
	c, shutdown := startTestComponent(t, opts, args, ctx)
	defer shutdown()

	lokiClient := newTestLokiClient(t, args, opts)
	defer lokiClient.Stop()

	now := time.Now()
	select {
	case lokiClient.Chan() <- loki.Entry{
		Labels: map[model.LabelName]model.LabelValue{"source": "test"},
		Entry:  logproto.Entry{Timestamp: now, Line: "hello world!"},
	}:
	case <-ctx.Done():
		t.Fatalf("timed out while sending test entries via loki client")
	}

	require.Eventually(
		t,
		func() bool { return len(receiver.Received()) == 1 },
		5*time.Second,
		10*time.Millisecond,
		"did not receive the forwarded message within the timeout",
	)
	received := receiver.Received()[0]
	assert.Equal(t, "hello world!", received.Line)
	assert.Equal(t, now.Unix(), received.Timestamp.Unix())
	assert.Equal(t, model.LabelSet{
		"test_label": "before",
		"source":     "test",
	}, received.Labels)

	args.Labels = map[string]string{"test_label": "after"}
	err := c.Update(args)
	require.NoError(t, err)

	// Drop the first entry so the next Eventually waits for the second one.
	receiver.Clear()

	select {
	case lokiClient.Chan() <- loki.Entry{
		Labels: map[model.LabelName]model.LabelValue{"source": "test"},
		Entry:  logproto.Entry{Timestamp: now, Line: "hello brave new world!"},
	}:
	case <-ctx.Done():
		t.Fatalf("timed out while sending test entries via loki client")
	}
	require.Eventually(
		t,
		func() bool { return len(receiver.Received()) == 1 },
		5*time.Second,
		10*time.Millisecond,
		"did not receive the forwarded message within the timeout",
	)
	received = receiver.Received()[0]
	assert.Equal(t, "hello brave new world!", received.Line)
	assert.Equal(t, now.Unix(), received.Timestamp.Unix())
	assert.Equal(t, model.LabelSet{
		"test_label": "after",
		"source":     "test",
	}, received.Labels)
}

// TestLokiSourceAPI_FanOut configures the component with several receivers,
// pushes a batch of entries, and checks that every receiver ends up with the
// full batch.
func TestLokiSourceAPI_FanOut(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	const receiversCount = 10
	var receivers = make([]*fake.Client, receiversCount)
	for i := 0; i < receiversCount; i++ {
		receivers[i] = fake.NewClient(func() {})
	}

	args := testArgsWith(t, func(a *Arguments) {
		a.Server.HTTP.ListenPort = 8537
		a.ForwardTo = mapToChannels(receivers)
	})
	opts := defaultOptions(t)

	comp, err := New(opts, args)
	require.NoError(t, err)
	go func() {
		// require ends in t.FailNow, which must only be called from the
		// goroutine running the test; assert only records the failure.
		assert.NoError(t, comp.Run(ctx))
	}()

	lokiClient := newTestLokiClient(t, args, opts)
	defer lokiClient.Stop()

	const messagesCount = 100
	for i := 0; i < messagesCount; i++ {
		entry := loki.Entry{
			Labels: map[model.LabelName]model.LabelValue{"source": "test"},
			Entry:  logproto.Entry{Line: fmt.Sprintf("test message #%d", i)},
		}
		select {
		case lokiClient.Chan() <- entry:
		case <-ctx.Done():
			// Once the context is done no further send can succeed, so fail
			// now instead of spinning through the remaining iterations.
			t.Fatalf("timed out while sending test entries via loki client")
		}
	}

	require.Eventually(
		t,
		func() bool {
			for i := 0; i < receiversCount; i++ {
				if len(receivers[i].Received()) != messagesCount {
					return false
				}
			}
			return true
		},
		5*time.Second,
		10*time.Millisecond,
		"did not receive all the expected messages within the timeout",
	)
}

// TestComponent_detectsWhenUpdateRequiresARestart is a table test: for each
// pair of arguments it builds a component from args, calls Update with
// newArgs, and checks whether the component's server value was replaced.
func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) {
	httpPort := getFreePort(t)
	grpcPort := getFreePort(t)
	tests := []struct {
		name            string
		args            Arguments
		newArgs         Arguments
		restartRequired bool
	}{
		{
			name:            "identical args don't require server restart",
			args:            testArgsWithPorts(httpPort, grpcPort),
			newArgs:         testArgsWithPorts(httpPort, grpcPort),
			restartRequired: false,
		},
		{
			name: "change in address requires server restart",
			args: testArgsWithPorts(httpPort, grpcPort),
			newArgs: testArgsWith(t, func(args *Arguments) {
				args.Server.HTTP.ListenAddress = "localhost"
				args.Server.HTTP.ListenPort = httpPort
				args.Server.GRPC.ListenPort = grpcPort
			}),
			restartRequired: true,
		},
		{
			name:            "change in port requires server restart",
			args:            testArgsWithPorts(httpPort, grpcPort),
			newArgs:         testArgsWithPorts(getFreePort(t), grpcPort),
			restartRequired: true,
		},
		{
			name: "change in forwardTo does not require server restart",
			args: testArgsWithPorts(httpPort, grpcPort),
			newArgs: testArgsWith(t, func(args *Arguments) {
				args.ForwardTo = []loki.LogsReceiver{}
				args.Server.HTTP.ListenPort = httpPort
				args.Server.GRPC.ListenPort = grpcPort
			}),
			restartRequired: false,
		},
		{
			name: "change in labels does not require server restart",
			args: testArgsWithPorts(httpPort, grpcPort),
			newArgs: testArgsWith(t, func(args *Arguments) {
				args.Labels = map[string]string{"some": "label"}
				args.Server.HTTP.ListenPort = httpPort
				args.Server.GRPC.ListenPort = grpcPort
			}),
			restartRequired: false,
		},
		{
			name: "change in relabel rules does not require server restart",
			args: testArgsWithPorts(httpPort, grpcPort),
			newArgs: testArgsWith(t, func(args *Arguments) {
				args.RelabelRules = relabel.Rules{}
				args.Server.HTTP.ListenPort = httpPort
				args.Server.GRPC.ListenPort = grpcPort
			}),
			restartRequired: false,
		},
		{
			name: "change in use incoming timestamp does not require server restart",
			args: testArgsWithPorts(httpPort, grpcPort),
			newArgs: testArgsWith(t, func(args *Arguments) {
				args.UseIncomingTimestamp = !args.UseIncomingTimestamp
				args.Server.HTTP.ListenPort = httpPort
				args.Server.GRPC.ListenPort = grpcPort
			}),
			restartRequired: false,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			comp, err := New(
				defaultOptions(t),
				tc.args,
			)
			require.NoError(t, err)

			c, ok := comp.(*Component)
			require.True(t, ok)

			// in order to cleanly update, we want to make sure the server is running first.
			waitForServerToBeReady(t, c)

			serverBefore := c.server
			err = c.Update(tc.newArgs)
			require.NoError(t, err)

			// A restart is detected by the server field holding a different value.
			restarted := serverBefore != c.server
			assert.Equal(t, tc.restartRequired, restarted)

			// in order to cleanly shutdown, we want to make sure the server is running first.
			waitForServerToBeReady(t, c)
			c.stop()
		})
	}
}

// TestDefaultServerConfig builds a component with a nil Server block and
// checks that an HTTP server answers on localhost at net.DefaultHTTPPort.
func TestDefaultServerConfig(t *testing.T) {
	args := testArgs(t)
	args.Server = nil // user did not define server options

	comp, err := New(
		defaultOptions(t),
		args,
	)
	require.NoError(t, err)

	c, ok := comp.(*Component)
	require.True(t, ok)

	require.Eventuallyf(t, func() bool {
		return respondsWithNotFound(fmt.Sprintf(
			"http://%v:%d/wrong/url",
			"localhost",
			net.DefaultHTTPPort,
		))
	}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")

	c.stop()
}

// startTestComponent creates a component from opts and args and runs it in a
// background goroutine under ctx. It returns the component together with a
// shutdown function that waits for the HTTP server to respond and then stops
// the component.
func startTestComponent(
	t *testing.T,
	opts component.Options,
	args Arguments,
	ctx context.Context,
) (component.Component, func()) {

	comp, err := New(opts, args)
	require.NoError(t, err)
	go func() {
		// require ends in t.FailNow, which must only be called from the
		// goroutine running the test; assert only records the failure.
		assert.NoError(t, comp.Run(ctx))
	}()

	c, ok := comp.(*Component)
	require.True(t, ok)

	return comp, func() {
		// in order to cleanly shutdown, we want to make sure the server is running first.
		waitForServerToBeReady(t, c)
		c.stop()
	}
}

// waitForServerToBeReady polls the component's configured HTTP address with a
// request to an unknown path until it answers 404, failing the test if that
// does not happen within five seconds.
func waitForServerToBeReady(t *testing.T, comp *Component) {
	require.Eventuallyf(t, func() bool {
		return respondsWithNotFound(fmt.Sprintf(
			"http://%v:%d/wrong/url",
			comp.server.ServerConfig().HTTP.ListenAddress,
			comp.server.ServerConfig().HTTP.ListenPort,
		))
	}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
}

// respondsWithNotFound issues a GET to url and reports whether the request
// succeeded with a 404 status. The response body is always closed so that
// repeated polling does not leak connections.
func respondsWithNotFound(url string) bool {
	resp, err := http.Get(url)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusNotFound
}

// mapToChannels returns the LogsReceiver of every client, in the same order.
func mapToChannels(clients []*fake.Client) []loki.LogsReceiver {
	channels := make([]loki.LogsReceiver, len(clients))
	for i := 0; i < len(clients); i++ {
		channels[i] = clients[i].LogsReceiver()
	}
	return channels
}

// newTestLokiClient builds a loki client whose push URL is the /api/v1/push
// path on the HTTP address and port taken from args.Server.
func newTestLokiClient(t *testing.T, args Arguments, opts component.Options) client.Client {
	url := flagext.URLValue{}
	err := url.Set(fmt.Sprintf(
		"http://%s:%d/api/v1/push",
		args.Server.HTTP.ListenAddress,
		args.Server.HTTP.ListenPort,
	))
	require.NoError(t, err)

	lokiClient, err := client.New(
		client.NewMetrics(nil, nil),
		client.Config{
			URL:     url,
			Timeout: 5 * time.Second,
		},
		[]string{},
		0,
		opts.Logger,
	)
	require.NoError(t, err)
	return lokiClient
}

// defaultOptions returns component options with a fixed test ID, a test
// logger bound to t, and a fresh Prometheus registry.
func defaultOptions(t *testing.T) component.Options {
	return component.Options{
		ID:         "loki.source.api.test",
		Logger:     util.TestFlowLogger(t),
		Registerer: prometheus.NewRegistry(),
	}
}

// testArgsWith returns the default test arguments after applying mutator.
func testArgsWith(t *testing.T, mutator func(arguments *Arguments)) Arguments {
	a := testArgs(t)
	mutator(&a)
	return a
}

// testArgs returns the default test arguments with freshly allocated free
// ports for the HTTP and gRPC listeners.
func testArgs(t *testing.T) Arguments {
	return testArgsWithPorts(getFreePort(t), getFreePort(t))
}

// testArgsWithPorts returns the default test arguments: HTTP and gRPC
// listeners on 127.0.0.1 at the given ports, two unbuffered forwarding
// channels, two static labels, a single relabel rule with the Drop action,
// and UseIncomingTimestamp disabled.
func testArgsWithPorts(httpPort int, grpcPort int) Arguments {
	return Arguments{
		Server: &net.ServerConfig{
			HTTP: &net.HTTPConfig{
				ListenAddress: "127.0.0.1",
				ListenPort:    httpPort,
			},
			GRPC: &net.GRPCConfig{
				ListenAddress: "127.0.0.1",
				ListenPort:    grpcPort,
			},
		},
		ForwardTo: []loki.LogsReceiver{make(chan loki.Entry), make(chan loki.Entry)},
		Labels:    map[string]string{"foo": "bar", "fizz": "buzz"},
		RelabelRules: relabel.Rules{
			{
				SourceLabels: []string{"tag"},
				Regex:        relabel.Regexp{Regexp: regexp.MustCompile("ignore")},
				Action:       relabel.Drop,
			},
		},
		UseIncomingTimestamp: false,
	}
}

// getFreePort returns a port reported as free by freeport, failing the test
// if none can be obtained.
func getFreePort(t *testing.T) int {
	port, err := freeport.GetFreePort()
	require.NoError(t, err)
	return port
}