Path: blob/main/pkg/flow/internal/controller/component.go
4096 views
package controller

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"path"
	"path/filepath"
	"reflect"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log/level"
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/pkg/cluster"
	"github.com/grafana/agent/pkg/flow/logging"
	"github.com/grafana/agent/pkg/river/ast"
	"github.com/grafana/agent/pkg/river/vm"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/atomic"
)

// ComponentID is the fully-qualified name of a component, stored as the
// fragments of its period-delimited form. For example,
// "remote.http.example" is ComponentID{"remote", "http", "example"}.
type ComponentID []string

// BlockComponentID returns the ComponentID described by a River block: the
// fragments of the block name, followed by the block label when one is set.
func BlockComponentID(b *ast.BlockStmt) ComponentID {
	// Reserve room for every name fragment plus the optional label.
	fragments := make(ComponentID, 0, len(b.Name)+1)
	fragments = append(fragments, b.Name...)
	if b.Label == "" {
		return fragments
	}
	return append(fragments, b.Label)
}

// String returns the period-delimited string form of a component ID.
func (id ComponentID) String() string {
	return strings.Join([]string(id), ".")
}

// Equals reports whether id and other hold the same fragments in the same
// order.
func (id ComponentID) Equals(other ComponentID) bool {
	if len(other) != len(id) {
		return false
	}
	for i, fragment := range id {
		if other[i] != fragment {
			return false
		}
	}
	return true
}

// DialFunc is a function to establish a network connection.
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)

// ComponentGlobals are used by ComponentNodes to build managed components.
All63// ComponentNodes should use the same ComponentGlobals.64type ComponentGlobals struct {65LogSink *logging.Sink // Sink used for Logging.66Logger *logging.Logger // Logger shared between all managed components.67TraceProvider trace.TracerProvider // Tracer shared between all managed components.68Clusterer *cluster.Clusterer // Clusterer shared between all managed components.69DataPath string // Shared directory where component data may be stored70OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate71OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports72Registerer prometheus.Registerer // Registerer for serving agent and component metrics73HTTPPathPrefix string // HTTP prefix for components.74HTTPListenAddr string // Base address for server75DialFunc DialFunc // Function to connect to HTTPListenAddr.76ControllerID string // ID of controller.77}7879// ComponentNode is a controller node which manages a user-defined component.80//81// ComponentNode manages the underlying component and caches its current82// arguments and exports. 
ComponentNode manages the arguments for the component83// from a River block.84type ComponentNode struct {85id ComponentID86label string87componentName string88nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called.89reg component.Registration90managedOpts component.Options91register *wrappedRegisterer92exportsType reflect.Type93OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate9495mut sync.RWMutex96block *ast.BlockStmt // Current River block to derive args from97eval *vm.Evaluator98managed component.Component // Inner managed component99args component.Arguments // Evaluated arguments for the managed component100101doingEval atomic.Bool102103// NOTE(rfratto): health and exports have their own mutex because they may be104// set asynchronously while mut is still being held (i.e., when calling Evaluate105// and the managed component immediately creates new exports)106107healthMut sync.RWMutex108evalHealth component.Health // Health of the last evaluate109runHealth component.Health // Health of running the component110111exportsMut sync.RWMutex112exports component.Exports // Evaluated exports for the managed component113}114115var _ BlockNode = (*ComponentNode)(nil)116117// NewComponentNode creates a new ComponentNode from an initial ast.BlockStmt.118// The underlying managed component isn't created until Evaluate is called.119func NewComponentNode(globals ComponentGlobals, b *ast.BlockStmt) *ComponentNode {120var (121id = BlockComponentID(b)122nodeID = id.String()123)124125reg, ok := component.Get(ComponentID(b.Name).String())126if !ok {127// NOTE(rfratto): It's normally not possible to get to this point; the128// blocks should have been validated by the graph loader in advance to129// guarantee that b is an expected component.130panic("NewComponentNode: could not find registration for component " + nodeID)131}132133initHealth := component.Health{134Health: 
component.HealthTypeUnknown,135Message: "component created",136UpdateTime: time.Now(),137}138139cn := &ComponentNode{140id: id,141label: b.Label,142nodeID: nodeID,143componentName: strings.Join(b.Name, "."),144reg: reg,145exportsType: getExportsType(reg),146OnComponentUpdate: globals.OnComponentUpdate,147148block: b,149eval: vm.New(b.Body),150151// Prepopulate arguments and exports with their zero values.152args: reg.Args,153exports: reg.Exports,154155evalHealth: initHealth,156runHealth: initHealth,157}158cn.managedOpts = getManagedOptions(globals, cn)159160return cn161}162163func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Options {164// Make sure the prefix is always absolute.165prefix := globals.HTTPPathPrefix166if !strings.HasPrefix(prefix, "/") {167prefix = "/" + prefix168}169170// We need to generate a globally unique component ID to give to the171// component and for use with telemetry data which doesn't support172// reconstructing the global ID. For everything else (HTTP, data), we can173// just use the controller-local ID as those values are guaranteed to be174// globally unique.175globalID := cn.nodeID176if globals.ControllerID != "" {177globalID = path.Join(globals.ControllerID, cn.nodeID)178}179180wrapped := newWrappedRegisterer()181cn.register = wrapped182return component.Options{183ID: globalID,184Logger: logging.New(logging.LoggerSink(globals.Logger), logging.WithComponentID(cn.nodeID)),185Registerer: prometheus.WrapRegistererWith(prometheus.Labels{186"component_id": globalID,187}, wrapped),188Tracer: wrapTracer(globals.TraceProvider, globalID),189Clusterer: globals.Clusterer,190191DataPath: filepath.Join(globals.DataPath, cn.nodeID),192HTTPListenAddr: globals.HTTPListenAddr,193DialFunc: globals.DialFunc,194HTTPPath: path.Join(prefix, cn.nodeID) + "/",195196OnStateChange: cn.setExports,197}198}199200func getExportsType(reg component.Registration) reflect.Type {201if reg.Exports != nil {202return 
reflect.TypeOf(reg.Exports)203}204return nil205}206207// ID returns the component ID of the managed component from its River block.208func (cn *ComponentNode) ID() ComponentID { return cn.id }209210// Label returns the label for the block or "" if none was specified.211func (cn *ComponentNode) Label() string { return cn.label }212213// ComponentName returns the component's type, i.e. `local.file.test` returns `local.file`.214func (cn *ComponentNode) ComponentName() string { return cn.componentName }215216// NodeID implements dag.Node and returns the unique ID for this node. The217// NodeID is the string representation of the component's ID from its River218// block.219func (cn *ComponentNode) NodeID() string { return cn.nodeID }220221// UpdateBlock updates the River block used to construct arguments for the222// managed component. The new block isn't used until the next time Evaluate is223// invoked.224//225// UpdateBlock will panic if the block does not match the component ID of the226// ComponentNode.227func (cn *ComponentNode) UpdateBlock(b *ast.BlockStmt) {228if !BlockComponentID(b).Equals(cn.id) {229panic("UpdateBlock called with an River block with a different component ID")230}231232cn.mut.Lock()233defer cn.mut.Unlock()234cn.block = b235cn.eval = vm.New(b.Body)236}237238// Evaluate implements BlockNode and updates the arguments for the managed component239// by re-evaluating its River block with the provided scope. 
The managed component240// will be built the first time Evaluate is called.241//242// Evaluate will return an error if the River block cannot be evaluated or if243// decoding to arguments fails.244func (cn *ComponentNode) Evaluate(scope *vm.Scope) error {245err := cn.evaluate(scope)246247switch err {248case nil:249cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated")250default:251msg := fmt.Sprintf("component evaluation failed: %s", err)252cn.setEvalHealth(component.HealthTypeUnhealthy, msg)253}254255return err256}257258// Reevaluate calls Update on the managed component with its last used259// arguments.Reevaluate does not build the component if it is not already built260// and does not re-evaluate the River block itself.261// Its only use case is for components opting-in to clustering where calling262// Update with the same Arguments may result in different functionality.263func (cn *ComponentNode) Reevaluate() error {264cn.mut.Lock()265defer cn.mut.Unlock()266267cn.doingEval.Store(true)268defer cn.doingEval.Store(false)269270if cn.managed == nil {271// We haven't built the managed component successfully yet.272return nil273}274275// Update the existing managed component with the same arguments.276err := cn.managed.Update(cn.args)277278switch err {279case nil:280cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated")281return nil282default:283msg := fmt.Sprintf("component evaluation failed: %s", err)284cn.setEvalHealth(component.HealthTypeUnhealthy, msg)285return err286}287}288289func (cn *ComponentNode) evaluate(scope *vm.Scope) error {290cn.mut.Lock()291defer cn.mut.Unlock()292293cn.doingEval.Store(true)294defer cn.doingEval.Store(false)295296argsPointer := cn.reg.CloneArguments()297if err := cn.eval.Evaluate(scope, argsPointer); err != nil {298return fmt.Errorf("decoding River: %w", err)299}300301// args is always a pointer to the args type, so we want to deference it since302// components expect a non-pointer.303argsCopyValue := 
reflect.ValueOf(argsPointer).Elem().Interface()304305if cn.managed == nil {306// We haven't built the managed component successfully yet.307managed, err := cn.reg.Build(cn.managedOpts, argsCopyValue)308if err != nil {309return fmt.Errorf("building component: %w", err)310}311cn.managed = managed312cn.args = argsCopyValue313314return nil315}316317if reflect.DeepEqual(cn.args, argsCopyValue) {318// Ignore components which haven't changed. This reduces the cost of319// calling evaluate for components where evaluation is expensive (e.g., if320// re-evaluating requires re-starting some internal logic).321return nil322}323324// Update the existing managed component325if err := cn.managed.Update(argsCopyValue); err != nil {326return fmt.Errorf("updating component: %w", err)327}328329cn.args = argsCopyValue330return nil331}332333// Run runs the managed component in the calling goroutine until ctx is334// canceled. Evaluate must have been called at least once without retuning an335// error before calling Run.336//337// Run will immediately return ErrUnevaluated if Evaluate has never been called338// successfully. 
Otherwise, Run will return nil.339func (cn *ComponentNode) Run(ctx context.Context) error {340cn.mut.RLock()341managed := cn.managed342cn.mut.RUnlock()343344if managed == nil {345return ErrUnevaluated346}347348cn.setRunHealth(component.HealthTypeHealthy, "started component")349err := cn.managed.Run(ctx)350351var exitMsg string352logger := cn.managedOpts.Logger353if err != nil {354level.Error(logger).Log("msg", "component exited with error", "err", err)355exitMsg = fmt.Sprintf("component shut down with error: %s", err)356} else {357level.Info(logger).Log("msg", "component exited")358exitMsg = "component shut down normally"359}360361cn.setRunHealth(component.HealthTypeExited, exitMsg)362return err363}364365// ErrUnevaluated is returned if ComponentNode.Run is called before a managed366// component is built.367var ErrUnevaluated = errors.New("managed component not built")368369// Arguments returns the current arguments of the managed component.370func (cn *ComponentNode) Arguments() component.Arguments {371cn.mut.RLock()372defer cn.mut.RUnlock()373return cn.args374}375376// Block implements BlockNode and returns the current block of the managed component.377func (cn *ComponentNode) Block() *ast.BlockStmt {378cn.mut.RLock()379defer cn.mut.RUnlock()380return cn.block381}382383// Exports returns the current set of exports from the managed component.384// Exports returns nil if the managed component does not have exports.385func (cn *ComponentNode) Exports() component.Exports {386cn.exportsMut.RLock()387defer cn.exportsMut.RUnlock()388return cn.exports389}390391// setExports is called whenever the managed component updates. 
e must be the392// same type as the registered exports type of the managed component.393func (cn *ComponentNode) setExports(e component.Exports) {394if cn.exportsType == nil {395panic(fmt.Sprintf("Component %s called OnStateChange but never registered an Exports type", cn.nodeID))396}397if reflect.TypeOf(e) != cn.exportsType {398panic(fmt.Sprintf("Component %s changed Exports types from %T to %T", cn.nodeID, cn.reg.Exports, e))399}400401// Some components may aggressively reexport values even though no exposed402// state has changed. This may be done for components which always supply403// exports whenever their arguments are evaluated without tracking internal404// state to see if anything actually changed.405//406// To avoid needlessly reevaluating components we'll ignore unchanged407// exports.408var changed bool409410cn.exportsMut.Lock()411if !reflect.DeepEqual(cn.exports, e) {412changed = true413cn.exports = e414}415cn.exportsMut.Unlock()416417if cn.doingEval.Load() {418// Optimization edge case: some components supply exports when they're419// being evaluated.420//421// Since components that are being evaluated will always cause their422// dependencies to also be evaluated, there's no reason to call423// onExportsChange here.424return425}426427if changed {428// Inform the controller that we have new exports.429cn.OnComponentUpdate(cn)430}431}432433// CurrentHealth returns the current health of the ComponentNode.434//435// The health of a ComponentNode is determined by combining:436//437// 1. Health from the call to Run().438// 2. Health from the last call to Evaluate().439// 3. 
Health reported from the component.440func (cn *ComponentNode) CurrentHealth() component.Health {441cn.healthMut.RLock()442defer cn.healthMut.RUnlock()443444var (445runHealth = cn.runHealth446evalHealth = cn.evalHealth447)448449if hc, ok := cn.managed.(component.HealthComponent); ok {450componentHealth := hc.CurrentHealth()451return component.LeastHealthy(runHealth, evalHealth, componentHealth)452}453454return component.LeastHealthy(runHealth, evalHealth)455}456457// DebugInfo returns debugging information from the managed component (if any).458func (cn *ComponentNode) DebugInfo() interface{} {459cn.mut.RLock()460defer cn.mut.RUnlock()461462if dc, ok := cn.managed.(component.DebugComponent); ok {463return dc.DebugInfo()464}465return nil466}467468// setEvalHealth sets the internal health from a call to Evaluate. See Health469// for information on how overall health is calculated.470func (cn *ComponentNode) setEvalHealth(t component.HealthType, msg string) {471cn.healthMut.Lock()472defer cn.healthMut.Unlock()473474cn.evalHealth = component.Health{475Health: t,476Message: msg,477UpdateTime: time.Now(),478}479}480481// setRunHealth sets the internal health from a call to Run. See Health for482// information on how overall health is calculated.483func (cn *ComponentNode) setRunHealth(t component.HealthType, msg string) {484cn.healthMut.Lock()485defer cn.healthMut.Unlock()486487cn.runHealth = component.Health{488Health: t,489Message: msg,490UpdateTime: time.Now(),491}492}493494// HTTPHandler returns an http handler for a component IF it implements HTTPComponent.495// otherwise it will return nil.496func (cn *ComponentNode) HTTPHandler() http.Handler {497handler, ok := cn.managed.(component.HTTPComponent)498if !ok {499return nil500}501return handler.Handler()502}503504505