// Path: blob/main/pkg/flow/internal/controller/loader_test.go
// (4095 views)
package controller_test12import (3"errors"4"io"5"strings"6"testing"78"github.com/grafana/agent/pkg/cluster"9"github.com/grafana/agent/pkg/flow/internal/controller"10"github.com/grafana/agent/pkg/flow/internal/dag"11"github.com/grafana/agent/pkg/flow/logging"12"github.com/grafana/agent/pkg/river/ast"13"github.com/grafana/agent/pkg/river/diag"14"github.com/grafana/agent/pkg/river/parser"15"github.com/prometheus/client_golang/prometheus"16"github.com/stretchr/testify/require"17"go.opentelemetry.io/otel/trace"18)1920func TestLoader(t *testing.T) {21testFile := `22testcomponents.tick "ticker" {23frequency = "1s"24}2526testcomponents.passthrough "static" {27input = "hello, world!"28}2930testcomponents.passthrough "ticker" {31input = testcomponents.tick.ticker.tick_time32}3334testcomponents.passthrough "forwarded" {35input = testcomponents.passthrough.ticker.output36}37`3839testConfig := `40logging {41level = "debug"42format = "logfmt"43}4445tracing {46sampling_fraction = 147}48`4950// corresponds to testFile51testGraphDefinition := graphDefinition{52Nodes: []string{53"testcomponents.tick.ticker",54"testcomponents.passthrough.static",55"testcomponents.passthrough.ticker",56"testcomponents.passthrough.forwarded",57"logging",58"tracing",59},60OutEdges: []edge{61{From: "testcomponents.passthrough.ticker", To: "testcomponents.tick.ticker"},62{From: "testcomponents.passthrough.forwarded", To: "testcomponents.passthrough.ticker"},63},64}6566newGlobals := func() controller.ComponentGlobals {67return controller.ComponentGlobals{68LogSink: noOpSink(),69Logger: logging.New(nil),70TraceProvider: trace.NewNoopTracerProvider(),71Clusterer: noOpClusterer(),72DataPath: t.TempDir(),73OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },74Registerer: prometheus.NewRegistry(),75}76}7778t.Run("New Graph", func(t *testing.T) {79l := controller.NewLoader(newGlobals())80diags := applyFromContent(t, l, []byte(testFile), []byte(testConfig))81require.NoError(t, 
diags.ErrorOrNil())82requireGraph(t, l.Graph(), testGraphDefinition)83})8485t.Run("New Graph No Config", func(t *testing.T) {86l := controller.NewLoader(newGlobals())87diags := applyFromContent(t, l, []byte(testFile), nil)88require.NoError(t, diags.ErrorOrNil())89requireGraph(t, l.Graph(), testGraphDefinition)90})9192t.Run("Copy existing components and delete stale ones", func(t *testing.T) {93startFile := `94// Component that should be copied over to the new graph95testcomponents.tick "ticker" {96frequency = "1s"97}9899// Component that will not exist in the new graph100testcomponents.tick "remove_me" {101frequency = "1m"102}103`104l := controller.NewLoader(newGlobals())105diags := applyFromContent(t, l, []byte(startFile), []byte(testConfig))106origGraph := l.Graph()107require.NoError(t, diags.ErrorOrNil())108109diags = applyFromContent(t, l, []byte(testFile), []byte(testConfig))110require.NoError(t, diags.ErrorOrNil())111newGraph := l.Graph()112113// Ensure that nodes were copied over and not recreated114require.Equal(t, origGraph.GetByID("testcomponents.tick.ticker"), newGraph.GetByID("testcomponents.tick.ticker"))115require.Nil(t, newGraph.GetByID("testcomponents.tick.remove_me")) // The new graph shouldn't have the old node116})117118t.Run("Load with invalid components", func(t *testing.T) {119invalidFile := `120doesnotexist "bad_component" {121}122`123l := controller.NewLoader(newGlobals())124diags := applyFromContent(t, l, []byte(invalidFile), nil)125require.ErrorContains(t, diags.ErrorOrNil(), `Unrecognized component name "doesnotexist`)126})127128t.Run("Partial load with invalid reference", func(t *testing.T) {129invalidFile := `130testcomponents.tick "ticker" {131frequency = "1s"132}133134testcomponents.passthrough "valid" {135input = testcomponents.tick.ticker.tick_time136}137138testcomponents.passthrough "invalid" {139input = testcomponents.tick.doesnotexist.tick_time140}141`142l := controller.NewLoader(newGlobals())143diags := applyFromContent(t, l, 
[]byte(invalidFile), nil)144require.Error(t, diags.ErrorOrNil())145146requireGraph(t, l.Graph(), graphDefinition{147Nodes: nil,148OutEdges: nil,149})150})151152t.Run("File has cycles", func(t *testing.T) {153invalidFile := `154testcomponents.tick "ticker" {155frequency = "1s"156}157158testcomponents.passthrough "static" {159input = testcomponents.passthrough.forwarded.output160}161162testcomponents.passthrough "ticker" {163input = testcomponents.passthrough.static.output164}165166testcomponents.passthrough "forwarded" {167input = testcomponents.passthrough.ticker.output168}169`170l := controller.NewLoader(newGlobals())171diags := applyFromContent(t, l, []byte(invalidFile), nil)172require.Error(t, diags.ErrorOrNil())173})174175t.Run("Handling of singleton component labels", func(t *testing.T) {176invalidFile := `177testcomponents.tick {178}179testcomponents.singleton "first" {180}181`182l := controller.NewLoader(newGlobals())183diags := applyFromContent(t, l, []byte(invalidFile), nil)184require.ErrorContains(t, diags[0], `Component "testcomponents.tick" must have a label`)185require.ErrorContains(t, diags[1], `Component "testcomponents.singleton" does not support labels`)186})187}188189// TestScopeWithFailingComponent is used to ensure that the scope is filled out, even if the component190// fails to properly start.191func TestScopeWithFailingComponent(t *testing.T) {192testFile := `193testcomponents.tick "ticker" {194frequenc = "1s"195}196197testcomponents.passthrough "static" {198input = "hello, world!"199}200201testcomponents.passthrough "ticker" {202input = testcomponents.tick.ticker.tick_time203}204205testcomponents.passthrough "forwarded" {206input = testcomponents.passthrough.ticker.output207}208`209newGlobals := func() controller.ComponentGlobals {210return controller.ComponentGlobals{211LogSink: noOpSink(),212Logger: logging.New(nil),213TraceProvider: trace.NewNoopTracerProvider(),214DataPath: t.TempDir(),215OnComponentUpdate: func(cn 
*controller.ComponentNode) { /* no-op */ },216Registerer: prometheus.NewRegistry(),217Clusterer: noOpClusterer(),218}219}220221l := controller.NewLoader(newGlobals())222diags := applyFromContent(t, l, []byte(testFile), nil)223require.Error(t, diags.ErrorOrNil())224require.Len(t, diags, 1)225require.True(t, strings.Contains(diags.Error(), `unrecognized attribute name "frequenc"`))226}227228func noOpSink() *logging.Sink {229s, _ := logging.WriterSink(io.Discard, logging.DefaultSinkOptions)230return s231}232233func noOpClusterer() *cluster.Clusterer {234return &cluster.Clusterer{Node: cluster.NewLocalNode("")}235}236237func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, configBytes []byte) diag.Diagnostics {238t.Helper()239240var (241diags diag.Diagnostics242componentBlocks []*ast.BlockStmt243configBlocks []*ast.BlockStmt = nil244)245246componentBlocks, diags = fileToBlock(t, componentBytes)247if diags.HasErrors() {248return diags249}250251if string(configBytes) != "" {252configBlocks, diags = fileToBlock(t, configBytes)253if diags.HasErrors() {254return diags255}256}257258applyDiags := l.Apply(nil, componentBlocks, configBlocks)259diags = append(diags, applyDiags...)260261return diags262}263264func fileToBlock(t *testing.T, bytes []byte) ([]*ast.BlockStmt, diag.Diagnostics) {265var diags diag.Diagnostics266file, err := parser.ParseFile(t.Name(), bytes)267268var parseDiags diag.Diagnostics269if errors.As(err, &parseDiags); parseDiags.HasErrors() {270return nil, parseDiags271}272273var blocks []*ast.BlockStmt274for _, stmt := range file.Body {275switch stmt := stmt.(type) {276case *ast.BlockStmt:277blocks = append(blocks, stmt)278default:279diags = append(diags, diag.Diagnostic{280Severity: diag.SeverityLevelError,281Message: "unexpected statement",282StartPos: ast.StartPos(stmt).Position(),283EndPos: ast.EndPos(stmt).Position(),284})285}286}287288return blocks, diags289}290291type graphDefinition struct {292Nodes []string293OutEdges 
[]edge294}295296type edge struct{ From, To string }297298func requireGraph(t *testing.T, g *dag.Graph, expect graphDefinition) {299t.Helper()300301var (302actualNodes []string303actualEdges []edge304)305306for _, n := range g.Nodes() {307actualNodes = append(actualNodes, n.NodeID())308}309require.ElementsMatch(t, expect.Nodes, actualNodes, "List of nodes do not match")310311for _, e := range g.Edges() {312actualEdges = append(actualEdges, edge{313From: e.From.NodeID(),314To: e.To.NodeID(),315})316}317require.ElementsMatch(t, expect.OutEdges, actualEdges, "List of edges do not match")318}319320321