// Package flow implements the Flow component graph system. Flow configuration
// files are parsed from River, which contain a listing of components to run.
//
// # Components
//
// Each component has a set of arguments (River attributes and blocks) and
// optionally a set of exported fields. Components can reference the exports of
// other components using River expressions.
//
// See the top-level component package for more information on components, and
// subpackages for defined components.
//
// # Component Health
//
// A component will have various health states during its lifetime:
//
//  1. Unknown: The initial health state for new components.
//  2. Healthy: A healthy component.
//  3. Unhealthy: An unhealthy component.
//  4. Exited: A component which is no longer running.
//
// Health states are paired with a time for when the health state was generated
// and a message providing more detail for the health state.
//
// Components can report their own health states. The health state reported by
// a component is merged with the Flow-level health of that component: an error
// when evaluating the configuration for a component will always be reported as
// unhealthy until the next successful evaluation.
//
// # Component Evaluation
//
// The process of converting the River block associated with a component into
// the appropriate Go struct is called "component evaluation."
//
// Components are only evaluated after all components they reference have been
// evaluated; cyclic dependencies are invalid.
//
// If a component updates its Exports at runtime, other components which directly
// or indirectly reference the updated component will have their Arguments
// re-evaluated.
//
// The arguments and exports for a component will be left in their last valid
// state if a component shuts down or is given an invalid config.
This prevents43// a domino effect of a single failed component taking down other components44// which are otherwise healthy.45package flow4647import (48"context"49"encoding/json"50"net"51"sync"52"time"5354"github.com/go-kit/log/level"55"github.com/grafana/agent/pkg/cluster"56"github.com/grafana/agent/pkg/flow/internal/controller"57"github.com/grafana/agent/pkg/flow/internal/dag"58"github.com/grafana/agent/pkg/flow/logging"59"github.com/grafana/agent/pkg/flow/tracing"60"github.com/prometheus/client_golang/prometheus"61"go.uber.org/atomic"62)6364// Options holds static options for a flow controller.65type Options struct {66// ControllerID is an identifier used to represent the controller.67// ControllerID is used to generate a globally unique display name for68// components in a binary where multiple controllers are used.69//70// If running multiple Flow controllers, each controller must have a71// different value for ControllerID to be able to differentiate between72// components in telemetry data.73ControllerID string7475// LogSink to use for controller logs and components. A no-op logger will be76// created if this is nil.77LogSink *logging.Sink7879// Tracer for components to use. A no-op tracer will be created if this is80// nil.81Tracer *tracing.Tracer8283// Clusterer for implementing distributed behavior among components running84// on different nodes.85Clusterer *cluster.Clusterer8687// Directory where components can write data. Constructed components will be88// given a subdirectory of DataPath using the local ID of the component.89//90// If running multiple Flow controllers, each controller must have a91// different value for DataPath to prevent components from colliding.92DataPath string9394// Reg is the prometheus register to use95Reg prometheus.Registerer9697// HTTPPathPrefix is the path prefix given to managed components. May be98// empty. 
When provided, it should be an absolute path.99//100// Components will be given a path relative to HTTPPathPrefix using their101// local ID.102//103// If running multiple Flow controllers, each controller must have a104// different value for HTTPPathPrefix to prevent components from colliding.105HTTPPathPrefix string106107// HTTPListenAddr is the base address (host:port) where component APIs are108// exposed to other components.109HTTPListenAddr string110111// OnExportsChange is called when the exports of the controller change.112// Exports are controlled by "export" configuration blocks. If113// OnExportsChange is nil, export configuration blocks are not allowed in the114// loaded config file.115OnExportsChange func(exports map[string]any)116117// DialFunc is a function to use for components to properly connect to118// HTTPListenAddr. If nil, DialFunc defaults to (&net.Dialer{}).DialContext.119DialFunc func(ctx context.Context, network, address string) (net.Conn, error)120}121122// Flow is the Flow system.123type Flow struct {124log *logging.Logger125tracer *tracing.Tracer126clusterer *cluster.Clusterer127opts Options128129updateQueue *controller.Queue130sched *controller.Scheduler131loader *controller.Loader132133loadFinished chan struct{}134135loadMut sync.RWMutex136loadedOnce atomic.Bool137}138139// New creates and starts a new Flow controller. 
Call Close to stop140// the controller.141func New(o Options) *Flow {142var (143log = logging.New(o.LogSink)144tracer = o.Tracer145clusterer = o.Clusterer146)147148if tracer == nil {149var err error150tracer, err = tracing.New(tracing.DefaultOptions)151if err != nil {152// This shouldn't happen unless there's a bug153panic(err)154}155}156157dialFunc := o.DialFunc158if dialFunc == nil {159dialFunc = (&net.Dialer{}).DialContext160}161162var (163queue = controller.NewQueue()164sched = controller.NewScheduler()165loader = controller.NewLoader(controller.ComponentGlobals{166LogSink: o.LogSink,167Logger: log,168TraceProvider: tracer,169Clusterer: clusterer,170DataPath: o.DataPath,171OnComponentUpdate: func(cn *controller.ComponentNode) {172// Changed components should be queued for reevaluation.173queue.Enqueue(cn)174},175OnExportsChange: o.OnExportsChange,176Registerer: o.Reg,177HTTPPathPrefix: o.HTTPPathPrefix,178HTTPListenAddr: o.HTTPListenAddr,179DialFunc: dialFunc,180ControllerID: o.ControllerID,181})182)183return &Flow{184log: log,185tracer: tracer,186opts: o,187188clusterer: clusterer,189updateQueue: queue,190sched: sched,191loader: loader,192193loadFinished: make(chan struct{}, 1),194}195}196197// Run starts the Flow controller, blocking until the provided context is198// canceled. 
Run must only be called once.199func (c *Flow) Run(ctx context.Context) {200defer c.sched.Close()201defer level.Debug(c.log).Log("msg", "flow controller exiting")202203for {204select {205case <-ctx.Done():206return207208case <-c.updateQueue.Chan():209// We need to pop _everything_ from the queue and evaluate each of them.210// If we only pop a single element, other components may sit waiting for211// evaluation forever.212for {213updated := c.updateQueue.TryDequeue()214if updated == nil {215break216}217218level.Debug(c.log).Log("msg", "handling component with updated state", "node_id", updated.NodeID())219c.loader.EvaluateDependencies(updated)220}221222case <-c.loadFinished:223level.Info(c.log).Log("msg", "scheduling loaded components")224225components := c.loader.Components()226runnables := make([]controller.RunnableNode, 0, len(components))227for _, uc := range components {228runnables = append(runnables, uc)229}230err := c.sched.Synchronize(runnables)231if err != nil {232level.Error(c.log).Log("msg", "failed to load components", "err", err)233}234}235}236}237238// LoadFile synchronizes the state of the controller with the current config239// file. 
Components in the graph will be marked as unhealthy if there was an240// error encountered during Load.241//242// The controller will only start running components after Load is called once243// without any configuration errors.244func (c *Flow) LoadFile(file *File, args map[string]any) error {245c.loadMut.Lock()246defer c.loadMut.Unlock()247248diags := c.loader.Apply(args, file.Components, file.ConfigBlocks)249if !c.loadedOnce.Load() && diags.HasErrors() {250// The first call to Load should not run any components if there were251// errors in the configuration file.252return diags253}254c.loadedOnce.Store(true)255256select {257case c.loadFinished <- struct{}{}:258default:259// A refresh is already scheduled260}261return diags.ErrorOrNil()262}263264// Ready returns whether the Flow controller has finished its initial load.265func (c *Flow) Ready() bool {266return c.loadedOnce.Load()267}268269// ComponentInfos returns the component infos.270func (c *Flow) ComponentInfos() []*ComponentInfo {271c.loadMut.RLock()272defer c.loadMut.RUnlock()273274cns := c.loader.Components()275infos := make([]*ComponentInfo, len(cns))276edges := c.loader.OriginalGraph().Edges()277for i, com := range cns {278nn := newFromNode(com, edges)279infos[i] = nn280}281return infos282}283284func newFromNode(cn *controller.ComponentNode, edges []dag.Edge) *ComponentInfo {285references := make([]string, 0)286referencedBy := make([]string, 0)287for _, e := range edges {288// Skip over any edge which isn't between two component nodes. 
This is a289// temporary workaround needed until there's the concept of configuration290// blocks from the API.291//292// Without this change, the graph fails to render when a configuration293// block is referenced in the graph.294//295// TODO(rfratto): add support for config block nodes in the API and UI.296if !isComponentNode(e.From) || !isComponentNode(e.To) {297continue298}299300if e.From.NodeID() == cn.NodeID() {301references = append(references, e.To.NodeID())302} else if e.To.NodeID() == cn.NodeID() {303referencedBy = append(referencedBy, e.From.NodeID())304}305}306h := cn.CurrentHealth()307ci := &ComponentInfo{308Label: cn.Label(),309ID: cn.NodeID(),310Name: cn.ComponentName(),311Type: "block",312References: references,313ReferencedBy: referencedBy,314Health: &ComponentHealth{315State: h.Health.String(),316Message: h.Message,317UpdatedTime: h.UpdateTime,318},319}320return ci321}322323func isComponentNode(n dag.Node) bool {324_, ok := n.(*controller.ComponentNode)325return ok326}327328// ComponentInfo represents a component in flow.329type ComponentInfo struct {330Name string `json:"name,omitempty"`331Type string `json:"type,omitempty"`332ID string `json:"id,omitempty"`333Label string `json:"label,omitempty"`334References []string `json:"referencesTo"`335ReferencedBy []string `json:"referencedBy"`336Health *ComponentHealth `json:"health"`337Original string `json:"original"`338Arguments json.RawMessage `json:"arguments,omitempty"`339Exports json.RawMessage `json:"exports,omitempty"`340DebugInfo json.RawMessage `json:"debugInfo,omitempty"`341}342343// ComponentHealth represents the health of a component.344type ComponentHealth struct {345State string `json:"state"`346Message string `json:"message"`347UpdatedTime time.Time `json:"updatedTime"`348}349350351