Path: blob/main/pkg/flow/componenttest/componenttest.go
4094 views
// Package componenttest provides utilities for testing Flow components.1package componenttest23import (4"context"5"fmt"6"os"7"reflect"8"sync"9"time"1011"github.com/prometheus/client_golang/prometheus"12"go.opentelemetry.io/otel/trace"13"go.uber.org/atomic"1415"github.com/go-kit/log"16"github.com/grafana/agent/component"17"github.com/grafana/agent/pkg/flow/logging"18)1920// A Controller is a testing controller which controls a single component.21type Controller struct {22reg component.Registration23log log.Logger2425onRun sync.Once26running chan struct{}27runError atomic.Error2829innerMut sync.Mutex30inner component.Component3132exportsMut sync.Mutex33exports component.Exports34exportsCh chan struct{}35}3637// NewControllerFromID returns a new testing Controller for the component with38// the provided name.39func NewControllerFromID(l log.Logger, componentName string) (*Controller, error) {40reg, ok := component.Get(componentName)41if !ok {42return nil, fmt.Errorf("no such component %q", componentName)43}44return NewControllerFromReg(l, reg), nil45}4647// NewControllerFromReg registers a new testing Controller for a component with48// the given registration. This can be used for testing fake components which49// aren't really registered.50func NewControllerFromReg(l log.Logger, reg component.Registration) *Controller {51if l == nil {52l = log.NewNopLogger()53}5455return &Controller{56reg: reg,57log: l,5859running: make(chan struct{}, 1),60exportsCh: make(chan struct{}, 1),61}62}6364func (c *Controller) onStateChange(e component.Exports) {65c.exportsMut.Lock()66changed := !reflect.DeepEqual(c.exports, e)67c.exports = e68c.exportsMut.Unlock()6970if !changed {71return72}7374select {75case c.exportsCh <- struct{}{}:76default:77}78}7980// WaitRunning blocks until the Controller is running up to the provided81// timeout.82func (c *Controller) WaitRunning(timeout time.Duration) error {83select {84case <-time.After(timeout):85return fmt.Errorf("timed out waiting for the controller to start running")86case <-c.running:87if err := c.runError.Load(); err != nil {88return fmt.Errorf("component failed to start: %w", err)89}90return nil91}92}9394// WaitExports blocks until new Exports are available up to the provided95// timeout.96func (c *Controller) WaitExports(timeout time.Duration) error {97select {98case <-time.After(timeout):99return fmt.Errorf("timed out waiting for exports")100case <-c.exportsCh:101return nil102}103}104105// Exports gets the most recent exports for a component.106func (c *Controller) Exports() component.Exports {107c.exportsMut.Lock()108defer c.exportsMut.Unlock()109return c.exports110}111112// Run starts the controller, building and running the component. Run blocks113// until ctx is canceled, the component exits, or if there was an error.114//115// Run may only be called once per Controller.116func (c *Controller) Run(ctx context.Context, args component.Arguments) error {117dataPath, err := os.MkdirTemp("", "controller-*")118if err != nil {119return err120}121defer func() {122_ = os.RemoveAll(dataPath)123}()124125run, err := c.buildComponent(dataPath, args)126127// We close c.running before checking the error, since the component will128// never run if we return an error anyway.129c.onRun.Do(func() {130c.runError.Store(err)131close(c.running)132})133134if err != nil {135return err136}137return run.Run(ctx)138}139140func (c *Controller) buildComponent(dataPath string, args component.Arguments) (component.Component, error) {141c.innerMut.Lock()142defer c.innerMut.Unlock()143144writerAdapter := log.NewStdlibAdapter(c.log)145sink, err := logging.WriterSink(writerAdapter, logging.SinkOptions{146Level: logging.LevelDebug,147Format: logging.FormatLogfmt,148})149if err != nil {150return nil, err151}152153opts := component.Options{154ID: c.reg.Name + ".test",155Logger: logging.New(sink),156Tracer: trace.NewNoopTracerProvider(),157DataPath: dataPath,158OnStateChange: c.onStateChange,159Registerer: prometheus.NewRegistry(),160}161162inner, err := c.reg.Build(opts, args)163if err != nil {164return nil, err165}166167c.inner = inner168return inner, nil169}170171// Update updates the running component. Should only be called after Run.172func (c *Controller) Update(args component.Arguments) error {173c.innerMut.Lock()174defer c.innerMut.Unlock()175176if c.inner == nil {177return fmt.Errorf("component is not running")178}179return c.inner.Update(args)180}181182183