Path: blob/main/component/loki/source/file/file_test.go
4096 views
//go:build !race12package file34import (5"context"6"errors"7"log"8"os"9"path/filepath"10"testing"11"time"1213"github.com/grafana/agent/component"14"github.com/grafana/agent/component/common/loki"15"github.com/grafana/agent/component/discovery"16"github.com/grafana/agent/pkg/flow/componenttest"17"github.com/grafana/agent/pkg/util"18"github.com/prometheus/client_golang/prometheus"19"github.com/prometheus/common/model"20"github.com/stretchr/testify/require"21"go.uber.org/goleak"22)2324func Test(t *testing.T) {25defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))2627ctx, cancel := context.WithCancel(componenttest.TestContext(t))28defer cancel()2930// Create file to log to.31f, err := os.CreateTemp(t.TempDir(), "example")32require.NoError(t, err)33defer f.Close()3435ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")36require.NoError(t, err)3738ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry)3940go func() {41err := ctrl.Run(ctx, Arguments{42Targets: []discovery.Target{{43"__path__": f.Name(),44"foo": "bar",45}},46ForwardTo: []loki.LogsReceiver{ch1, ch2},47})48require.NoError(t, err)49}()5051ctrl.WaitRunning(time.Minute)5253_, err = f.Write([]byte("writing some text\n"))54require.NoError(t, err)5556wantLabelSet := model.LabelSet{57"filename": model.LabelValue(f.Name()),58"foo": "bar",59}6061for i := 0; i < 2; i++ {62select {63case logEntry := <-ch1:64require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)65require.Equal(t, "writing some text", logEntry.Line)66require.Equal(t, wantLabelSet, logEntry.Labels)67case logEntry := <-ch2:68require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)69require.Equal(t, "writing some text", logEntry.Line)70require.Equal(t, wantLabelSet, logEntry.Labels)71case <-time.After(5 * time.Second):72require.FailNow(t, "failed waiting for log line")73}74}75}7677// Test that updating the component does not leak goroutines.78func TestUpdate_NoLeak(t *testing.T) {79defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))8081ctx, cancel := context.WithCancel(componenttest.TestContext(t))82defer cancel()8384// Create file to tail.85f, err := os.CreateTemp(t.TempDir(), "example")86require.NoError(t, err)87defer f.Close()8889ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")90require.NoError(t, err)9192args := Arguments{93Targets: []discovery.Target{{94"__path__": f.Name(),95"foo": "bar",96}},97ForwardTo: []loki.LogsReceiver{},98}99100go func() {101err := ctrl.Run(ctx, args)102require.NoError(t, err)103}()104105ctrl.WaitRunning(time.Minute)106107// Update a bunch of times to ensure that no goroutines get leaked between108// updates.109for i := 0; i < 10; i++ {110err := ctrl.Update(args)111require.NoError(t, err)112}113}114115func TestTwoTargets(t *testing.T) {116// Create opts for component117opts := component.Options{118Logger: util.TestFlowLogger(t),119Registerer: prometheus.NewRegistry(),120OnStateChange: func(e component.Exports) {},121DataPath: t.TempDir(),122}123124f, err := os.CreateTemp(opts.DataPath, "example")125if err != nil {126log.Fatal(err)127}128f2, err := os.CreateTemp(opts.DataPath, "example2")129if err != nil {130log.Fatal(err)131}132defer f.Close()133defer f2.Close()134135ch1 := make(chan loki.Entry)136args := Arguments{}137args.Targets = []discovery.Target{138{"__path__": f.Name(), "foo": "bar"},139{"__path__": f2.Name(), "foo": "bar2"},140}141args.ForwardTo = []loki.LogsReceiver{ch1}142143c, err := New(opts, args)144require.NoError(t, err)145146ctx, cancel := context.WithCancel(context.Background())147go c.Run(ctx)148time.Sleep(100 * time.Millisecond)149150_, err = f.Write([]byte("text\n"))151require.NoError(t, err)152153_, err = f2.Write([]byte("text2\n"))154require.NoError(t, err)155156foundF1, foundF2 := false, false157for i := 0; i < 2; i++ {158select {159case logEntry := <-ch1:160require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)161if logEntry.Line == "text" {162foundF1 = true163} else if logEntry.Line == "text2" {164foundF2 = true165}166167case <-time.After(5 * time.Second):168require.FailNow(t, "failed waiting for log line")169}170}171require.True(t, foundF1)172require.True(t, foundF2)173cancel()174// Verify that positions.yml is written. NOTE: if we didn't wait for it, there would be a race condition between175// temporary directory being cleaned up and this file being created.176require.Eventually(177t,178func() bool {179if _, err := os.Stat(filepath.Join(opts.DataPath, "positions.yml")); errors.Is(err, os.ErrNotExist) {180return false181}182return true183},1845*time.Second,18510*time.Millisecond,186"expected positions.yml file to be written eventually",187)188}189190191