Path: blob/main/pkg/flow/internal/controller/loader.go
4095 views
package controller

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/pkg/flow/internal/dag"
	"github.com/grafana/agent/pkg/flow/logging"
	"github.com/grafana/agent/pkg/river/ast"
	"github.com/grafana/agent/pkg/river/diag"
	"github.com/hashicorp/go-multierror"
	"github.com/rfratto/ckit"
	"github.com/rfratto/ckit/peer"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"

	_ "github.com/grafana/agent/pkg/flow/internal/testcomponents" // Include test components
)

// The Loader builds and evaluates ComponentNodes from River blocks.
type Loader struct {
	log     *logging.Logger
	tracer  trace.TracerProvider
	globals ComponentGlobals

	// mut guards the fields below.
	mut               sync.RWMutex
	graph             *dag.Graph // Transitively-reduced graph used for evaluation
	originalGraph     *dag.Graph // Graph as it was before reduction
	components        []*ComponentNode
	cache             *valueCache
	blocks            []*ast.BlockStmt // Most recently loaded blocks, used for writing
	cm                *controllerMetrics
	moduleExportIndex int // Last export change index reported through OnExportsChange
}

// NewLoader creates a new Loader. Components built by the Loader will be built
// with globals for their options.
//
// NewLoader also registers a controller collector with globals.Registerer
// (when it is non-nil) and subscribes to cluster state changes so that
// clustered components are re-evaluated whenever the set of peers changes.
func NewLoader(globals ComponentGlobals) *Loader {
	l := &Loader{
		log:     globals.Logger,
		tracer:  globals.TraceProvider,
		globals: globals,

		graph:         &dag.Graph{},
		originalGraph: &dag.Graph{},
		cache:         newValueCache(),
		cm:            newControllerMetrics(globals.Registerer),
	}
	cc := newControllerCollector(l)
	if globals.Registerer != nil {
		globals.Registerer.MustRegister(cc)
	}

	globals.Clusterer.Node.Observe(ckit.FuncObserver(func(peers []peer.Peer) (reregister bool) {
		tracer := l.tracer.Tracer("")
		spanCtx, span := tracer.Start(context.Background(), "ClusterStateChange", trace.WithSpanKind(trace.SpanKindInternal))
		defer span.End()

		// reevaluate runs in its own function so that the deferred End fires as
		// soon as this component is done. Deferring directly inside the loop
		// would keep every span open until the whole callback returns, which
		// inflates the recorded duration of each re-evaluation.
		reevaluate := func(cmp *ComponentNode) {
			_, cmpSpan := tracer.Start(spanCtx, "ClusteredComponentReevaluation", trace.WithSpanKind(trace.SpanKindInternal))
			cmpSpan.SetAttributes(attribute.String("node_id", cmp.NodeID()))
			defer cmpSpan.End()

			if err := cmp.Reevaluate(); err != nil {
				level.Error(globals.Logger).Log("msg", "failed to reevaluate component", "componentID", cmp.NodeID(), "err", err)
			}
		}

		for _, cmp := range l.Components() {
			clustered, ok := cmp.managed.(component.ClusteredComponent)
			if !ok || !clustered.ClusterUpdatesRegistration() {
				continue
			}
			reevaluate(cmp)
		}

		// Always stay registered for future cluster changes.
		return true
	}))

	return l
}

// Apply loads a new set of components into the Loader. Apply will drop any
// previously loaded component which is not described in the set of River
// blocks.
//
// Apply will reuse existing components if there is an existing component which
// matches the component ID specified by any of the provided River blocks.
// Reused components will be updated to point at the new River block.
//
// Apply will perform an evaluation of all loaded components before returning.
// The provided args are cached as module arguments before the graph is built.
//
// If building or validating the new graph produces errors, the returned
// diagnostics describe them and the currently loaded graph is left in place.
// Errors raised while evaluating individual nodes are collected into the
// returned diagnostics, but do not stop the remaining nodes from being
// evaluated.
func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) diag.Diagnostics {
	start := time.Now()
	l.mut.Lock()
	defer l.mut.Unlock()
	l.cm.controllerEvaluation.Set(1)
	defer l.cm.controllerEvaluation.Set(0)

	for key, value := range args {
		l.cache.CacheModuleArgument(key, value)
	}
	l.cache.SyncModuleArgs(args)

	newGraph, diags := l.loadNewGraph(args, componentBlocks, configBlocks)
	if diags.HasErrors() {
		return diags
	}

	var (
		components   = make([]*ComponentNode, 0, len(componentBlocks))
		componentIDs = make([]ComponentID, 0, len(componentBlocks))
	)

	tracer := l.tracer.Tracer("")
	spanCtx, span := tracer.Start(context.Background(), "GraphEvaluate", trace.WithSpanKind(trace.SpanKindInternal))
	defer span.End()

	logger := log.With(l.log, "trace_id", span.SpanContext().TraceID())
	level.Info(logger).Log("msg", "starting complete graph evaluation")
	defer func() {
		span.SetStatus(codes.Ok, "")
		level.Info(logger).Log("msg", "finished complete graph evaluation", "duration", time.Since(start))
	}()

	l.cache.ClearModuleExports()

	// Evaluate all the components. The walk callback never returns an error,
	// so the result of WalkTopological is intentionally discarded.
	_ = dag.WalkTopological(&newGraph, newGraph.Leaves(), func(n dag.Node) error {
		_, nodeSpan := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
		nodeSpan.SetAttributes(attribute.String("node_id", n.NodeID()))
		defer nodeSpan.End()

		nodeStart := time.Now()
		defer func() {
			level.Info(logger).Log("msg", "finished node evaluation", "node_id", n.NodeID(), "duration", time.Since(nodeStart))
		}()

		var err error

		switch c := n.(type) {
		case *ComponentNode:
			components = append(components, c)
			componentIDs = append(componentIDs, c.ID())

			if err = l.evaluate(logger, c); err != nil {
				var evalDiags diag.Diagnostics
				if errors.As(err, &evalDiags) {
					diags = append(diags, evalDiags...)
				} else {
					diags.Add(diag.Diagnostic{
						Severity: diag.SeverityLevelError,
						Message:  fmt.Sprintf("Failed to build component: %s", err),
						StartPos: ast.StartPos(c.Block()).Position(),
						EndPos:   ast.EndPos(c.Block()).Position(),
					})
				}
			}
		case BlockNode:
			if err = l.evaluate(logger, c); err != nil {
				diags.Add(diag.Diagnostic{
					Severity: diag.SeverityLevelError,
					Message:  fmt.Sprintf("Failed to evaluate node for config block: %s", err),
					StartPos: ast.StartPos(c.Block()).Position(),
					EndPos:   ast.EndPos(c.Block()).Position(),
				})
			}
			if exp, ok := n.(*ExportConfigNode); ok {
				l.cache.CacheModuleExportValue(exp.Label(), exp.Value())
			}
		}

		// We only use the error for updating the span status; we don't return the
		// error because we want to evaluate as many nodes as we can.
		if err != nil {
			nodeSpan.SetStatus(codes.Error, err.Error())
		} else {
			nodeSpan.SetStatus(codes.Ok, "")
		}
		return nil
	})

	l.components = components
	l.graph = &newGraph
	l.cache.SyncIDs(componentIDs)
	l.blocks = componentBlocks

	// Record the evaluation time exactly once per Apply. This is the only
	// observation; the deferred function above only logs and sets the span
	// status, so the histogram is not double-counted.
	l.cm.componentEvaluationTime.Observe(time.Since(start).Seconds())

	if l.globals.OnExportsChange != nil && l.cache.ExportChangeIndex() != l.moduleExportIndex {
		l.moduleExportIndex = l.cache.ExportChangeIndex()
		l.globals.OnExportsChange(l.cache.CreateModuleExports())
	}
	return diags
}

// loadNewGraph creates a new graph from the provided blocks and validates it.
//
// On a validation failure the unreduced graph is returned together with the
// diagnostics and l.originalGraph is left untouched. On success
// l.originalGraph is replaced with a clone of the new graph taken before the
// transitive reduction.
func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) (dag.Graph, diag.Diagnostics) {
	var g dag.Graph

	// Fill our graph with config blocks.
	diags := l.populateConfigBlockNodes(args, &g, configBlocks)

	// Fill our graph with components.
	componentNodeDiags := l.populateComponentNodes(&g, componentBlocks)
	diags = append(diags, componentNodeDiags...)

	// Wire up the edges of the graph.
	wireDiags := l.wireGraphEdges(&g)
	diags = append(diags, wireDiags...)

	// Validate graph to detect cycles.
	err := dag.Validate(&g)
	if err != nil {
		diags = append(diags, multierrToDiags(err)...)
		return g, diags
	}

	// Copy the original graph, this is so we can have access to the original graph for things like displaying a UI or
	// debug information.
	l.originalGraph = g.Clone()
	// Perform a transitive reduction of the graph to clean it up.
	dag.Reduce(&g)

	return g, diags
}

// populateConfigBlockNodes adds any config blocks to the graph.
//
// Blocks whose node ID is already present in g are reported as duplicates and
// skipped. When the Loader is not running as a module, default logging and
// tracing nodes are added for whichever of those blocks was not provided.
func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, configBlocks []*ast.BlockStmt) diag.Diagnostics {
	var (
		diags   diag.Diagnostics
		nodeMap = NewConfigNodeMap()
	)

	for _, block := range configBlocks {
		node, newConfigNodeDiags := NewConfigNode(block, l.globals)
		diags = append(diags, newConfigNodeDiags...)

		if g.GetByID(node.NodeID()) != nil {
			diags.Add(diag.Diagnostic{
				Severity: diag.SeverityLevelError,
				Message:  fmt.Sprintf("%q block already declared", node.NodeID()),
				StartPos: ast.StartPos(block).Position(),
				EndPos:   ast.EndPos(block).Position(),
			})

			continue
		}

		nodeMapDiags := nodeMap.Append(node)
		diags = append(diags, nodeMapDiags...)
		// Note that this checks the accumulated diagnostics, so once any error
		// has been recorded no further config nodes are added to the graph.
		if diags.HasErrors() {
			continue
		}

		g.Add(node)
	}

	validateDiags := nodeMap.Validate(l.isModule(), args)
	diags = append(diags, validateDiags...)

	// If a logging config block is not provided, we create an empty node which uses defaults.
	if nodeMap.logging == nil && !l.isModule() {
		c := NewDefaultLoggingConfigNode(l.globals)
		g.Add(c)
	}

	// If a tracing config block is not provided, we create an empty node which uses defaults.
	if nodeMap.tracing == nil && !l.isModule() {
		c := NewDefaulTracingConfigNode(l.globals)
		g.Add(c)
	}

	return diags
}

// populateComponentNodes adds any components to the graph.
//
// A block whose ID matches a node in the currently loaded graph reuses that
// node and points it at the new block. Every other block is validated against
// the component registry before a new ComponentNode is created for it. Blocks
// that fail validation are reported in the returned diagnostics and skipped.
func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.BlockStmt) diag.Diagnostics {
	var (
		diags    diag.Diagnostics
		blockMap = make(map[string]*ast.BlockStmt, len(componentBlocks))
	)
	for _, block := range componentBlocks {
		var c *ComponentNode
		id := BlockComponentID(block).String()

		if orig, redefined := blockMap[id]; redefined {
			diags.Add(diag.Diagnostic{
				Severity: diag.SeverityLevelError,
				Message:  fmt.Sprintf("Component %s already declared at %s", id, ast.StartPos(orig).Position()),
				StartPos: block.NamePos.Position(),
				EndPos:   block.NamePos.Add(len(id) - 1).Position(),
			})
			continue
		}
		blockMap[id] = block

		if exist := l.graph.GetByID(id); exist != nil {
			// Re-use the existing component and update its block.
			c = exist.(*ComponentNode)
			c.UpdateBlock(block)
		} else {
			componentName := block.GetBlockName()
			registration, exists := component.Get(componentName)
			if !exists {
				diags.Add(diag.Diagnostic{
					Severity: diag.SeverityLevelError,
					Message:  fmt.Sprintf("Unrecognized component name %q", componentName),
					StartPos: block.NamePos.Position(),
					EndPos:   block.NamePos.Add(len(componentName) - 1).Position(),
				})
				continue
			}

			if registration.Singleton && block.Label != "" {
				diags.Add(diag.Diagnostic{
					Severity: diag.SeverityLevelError,
					Message:  fmt.Sprintf("Component %q does not support labels", componentName),
					StartPos: block.LabelPos.Position(),
					EndPos:   block.LabelPos.Add(len(block.Label) + 1).Position(),
				})
				continue
			}

			if !registration.Singleton && block.Label == "" {
				diags.Add(diag.Diagnostic{
					Severity: diag.SeverityLevelError,
					Message:  fmt.Sprintf("Component %q must have a label", componentName),
					StartPos: block.NamePos.Position(),
					EndPos:   block.NamePos.Add(len(componentName) - 1).Position(),
				})
				continue
			}

			if registration.Singleton && l.isModule() {
				diags.Add(diag.Diagnostic{
					Severity: diag.SeverityLevelError,
					Message:  fmt.Sprintf("Component %q is a singleton and unsupported inside a module", componentName),
					StartPos: block.NamePos.Position(),
					EndPos:   block.NamePos.Add(len(componentName) - 1).Position(),
				})
				continue
			}

			// Create a new component.
			c = NewComponentNode(l.globals, block)
		}

		g.Add(c)
	}

	return diags
}

// wireGraphEdges wires up all the related nodes: for every node in g it adds
// an edge to each node it references.
func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {
	var diags diag.Diagnostics

	for _, n := range g.Nodes() {
		refs, nodeDiags := ComponentReferences(n, g)
		for _, ref := range refs {
			g.AddEdge(dag.Edge{From: n, To: ref.Target})
		}
		diags = append(diags, nodeDiags...)
	}

	return diags
}

// Variables returns the Variables the Loader exposes for other Flow components
// to reference.
func (l *Loader) Variables() map[string]any {
	return l.cache.BuildContext().Variables
}

// Components returns the current set of loaded components.
func (l *Loader) Components() []*ComponentNode {
	l.mut.RLock()
	defer l.mut.RUnlock()
	return l.components
}

// Graph returns a copy of the DAG managed by the Loader.
func (l *Loader) Graph() *dag.Graph {
	l.mut.RLock()
	defer l.mut.RUnlock()
	return l.graph.Clone()
}

// OriginalGraph returns a copy of the graph before Reduce was called. This can be used if you want to show a UI of the
// original graph before the reduce function was called.
func (l *Loader) OriginalGraph() *dag.Graph {
	l.mut.RLock()
	defer l.mut.RUnlock()
	return l.originalGraph.Clone()
}

// EvaluateDependencies re-evaluates components which depend directly or
// indirectly on c. EvaluateDependencies should be called whenever a component
// updates its exports.
//
// c itself is not re-evaluated; only its exports are refreshed in the cache
// before its dependants are walked.
func (l *Loader) EvaluateDependencies(c *ComponentNode) {
	tracer := l.tracer.Tracer("")

	l.mut.RLock()
	defer l.mut.RUnlock()

	l.cm.controllerEvaluation.Set(1)
	defer l.cm.controllerEvaluation.Set(0)
	start := time.Now()

	spanCtx, span := tracer.Start(context.Background(), "GraphEvaluatePartial", trace.WithSpanKind(trace.SpanKindInternal))
	span.SetAttributes(attribute.String("initiator", c.NodeID()))
	defer span.End()

	logger := log.With(l.log, "trace_id", span.SpanContext().TraceID())
	level.Info(logger).Log("msg", "starting partial graph evaluation")
	defer func() {
		span.SetStatus(codes.Ok, "")

		duration := time.Since(start)
		level.Info(logger).Log("msg", "finished partial graph evaluation", "duration", duration)
		l.cm.componentEvaluationTime.Observe(duration.Seconds())
	}()

	// Make sure we're in-sync with the current exports of c.
	l.cache.CacheExports(c.ID(), c.Exports())

	// The walk callback never returns an error, so the result of WalkReverse
	// is intentionally discarded.
	_ = dag.WalkReverse(l.graph, []dag.Node{c}, func(n dag.Node) error {
		if n == c {
			// Skip over the starting component; the starting component passed to
			// EvaluateDependencies had its exports changed and none of its input
			// arguments will need re-evaluation.
			return nil
		}

		_, nodeSpan := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
		nodeSpan.SetAttributes(attribute.String("node_id", n.NodeID()))
		defer nodeSpan.End()

		var err error

		switch n := n.(type) {
		case BlockNode:
			err = l.evaluate(logger, n)
			if exp, ok := n.(*ExportConfigNode); ok {
				l.cache.CacheModuleExportValue(exp.Label(), exp.Value())
			}
		}

		// We only use the error for updating the span status; we don't return the
		// error because we want to evaluate as many nodes as we can.
		if err != nil {
			nodeSpan.SetStatus(codes.Error, err.Error())
		} else {
			nodeSpan.SetStatus(codes.Ok, "")
		}
		return nil
	})

	// NOTE(review): moduleExportIndex is written here while only the read lock
	// is held; confirm callers never run EvaluateDependencies concurrently.
	if l.globals.OnExportsChange != nil && l.cache.ExportChangeIndex() != l.moduleExportIndex {
		l.globals.OnExportsChange(l.cache.CreateModuleExports())
		l.moduleExportIndex = l.cache.ExportChangeIndex()
	}
}

// evaluate constructs the final context for the BlockNode and
// evaluates it. mut must be held when calling evaluate.
//
// Any resulting error is logged and returned to the caller.
func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error {
	ectx := l.cache.BuildContext()
	err := bn.Evaluate(ectx)

	switch c := bn.(type) {
	case *ComponentNode:
		// Always update the cache both the arguments and exports, since both might
		// change when a component gets re-evaluated. We also want to cache the arguments and exports in case of an error
		l.cache.CacheArguments(c.ID(), c.Arguments())
		l.cache.CacheExports(c.ID(), c.Exports())
	case *ArgumentConfigNode:
		// An argument that was not supplied falls back to its default when it is
		// optional; a missing required argument is an evaluation error.
		if _, found := l.cache.moduleArguments[c.Label()]; !found {
			if c.Optional() {
				l.cache.CacheModuleArgument(c.Label(), c.Default())
			} else {
				err = fmt.Errorf("missing required argument %q to module", c.Label())
			}
		}
	}

	if err != nil {
		level.Error(logger).Log("msg", "failed to evaluate config", "node", bn.NodeID(), "err", err)
		return err
	}
	return nil
}

// multierrToDiags converts err into error-level diagnostics.
//
// When err is (or wraps) a *multierror.Error, one diagnostic is produced per
// contained error. Any other non-nil error becomes a single diagnostic instead
// of causing a panic. A nil error yields no diagnostics.
func multierrToDiags(err error) diag.Diagnostics {
	var diags diag.Diagnostics
	if err == nil {
		return diags
	}

	var merr *multierror.Error
	if !errors.As(err, &merr) {
		diags.Add(diag.Diagnostic{
			Severity: diag.SeverityLevelError,
			Message:  err.Error(),
		})
		return diags
	}

	for _, e := range merr.Errors {
		// TODO(rfratto): should this include position information?
		diags.Add(diag.Diagnostic{
			Severity: diag.SeverityLevelError,
			Message:  e.Error(),
		})
	}
	return diags
}

// isModule reports whether the Loader is running inside a module.
//
// If the definition of a module ever changes, update this.
func (l *Loader) isModule() bool {
	// Either 1 of these checks is technically sufficient but let's be extra careful.
	return l.globals.OnExportsChange != nil && l.globals.ControllerID != ""
}