Path: blob/main/component/otelcol/internal/scheduler/scheduler.go
4096 views
// Package scheduler exposes utilities for scheduling and running OpenTelemetry1// Collector components.2package scheduler34import (5"context"6"fmt"7"sync"8"time"910"github.com/go-kit/log"11"github.com/go-kit/log/level"12"github.com/grafana/agent/component"13otelcomponent "go.opentelemetry.io/collector/component"14"go.uber.org/multierr"15)1617// Scheduler implements manages a set of OpenTelemetry Collector components.18// Scheduler is intended to be used from Flow components which need to schedule19// OpenTelemetry Collector components; it does not implement the full20// component.Component interface.21//22// Each OpenTelemetry Collector component has one instance per supported23// telemetry signal, which is why Scheduler supports multiple components. For24// example, when creating the otlpreceiver component, you would have three25// total instances: one for logs, one for metrics, and one for traces.26// Scheduler should only be used to manage the different signals of the same27// OpenTelemetry Collector component; this means that otlpreceiver and28// jaegerreceiver should not share the same Scheduler.29type Scheduler struct {30log log.Logger3132healthMut sync.RWMutex33health component.Health3435schedMut sync.Mutex36schedComponents []otelcomponent.Component // Most recently created components37host otelcomponent.Host3839// newComponentsCh is written to when schedComponents gets updated.40newComponentsCh chan struct{}41}4243// New creates a new unstarted Scheduler. Call Run to start it, and call44// Schedule to schedule components to run.45func New(l log.Logger) *Scheduler {46return &Scheduler{47log: l,48newComponentsCh: make(chan struct{}, 1),49}50}5152// Schedule schedules a new set of OpenTelemetry Components to run. Components53// will only be scheduled when the Scheduler is running.54//55// Schedule completely overrides the set of previously running components;56// components which have been removed since the last call to Schedule will be57// stopped.58func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {59cs.schedMut.Lock()60defer cs.schedMut.Unlock()6162cs.schedComponents = cc63cs.host = h6465select {66case cs.newComponentsCh <- struct{}{}:67// Queued new message.68default:69// A message is already queued for refreshing running components so we70// don't have to do anything here.71}72}7374// Run starts the Scheduler. Run will watch for schedule components to appear75// and run them, terminating previously running components if they exist.76func (cs *Scheduler) Run(ctx context.Context) error {77var components []otelcomponent.Component7879// Make sure we terminate all of our running components on shutdown.80defer func() {81cs.stopComponents(context.Background(), components...)82}()8384// Wait for a write to cs.newComponentsCh. The initial list of components is85// always empty so there's nothing to do until cs.newComponentsCh is written86// to.87for {88select {89case <-ctx.Done():90return nil91case <-cs.newComponentsCh:92// Stop the old components before running new scheduled ones.93cs.stopComponents(ctx, components...)9495cs.schedMut.Lock()96components = cs.schedComponents97host := cs.host98cs.schedMut.Unlock()99100level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))101components = cs.startComponents(ctx, host, components...)102}103}104}105106func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {107for _, c := range cc {108if err := c.Shutdown(ctx); err != nil {109level.Error(cs.log).Log("msg", "failed to stop scheduled component; future updates may fail", "err", err)110}111}112}113114// startComponent schedules the provided components from cc. It then returns115// the list of components which started successfully.116func (cs *Scheduler) startComponents(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) (started []otelcomponent.Component) {117var errs error118119for _, c := range cc {120if err := c.Start(ctx, h); err != nil {121level.Error(cs.log).Log("msg", "failed to start scheduled component", "err", err)122errs = multierr.Append(errs, err)123} else {124started = append(started, c)125}126}127128if errs != nil {129cs.setHealth(component.Health{130Health: component.HealthTypeUnhealthy,131Message: fmt.Sprintf("failed to create components: %s", errs),132UpdateTime: time.Now(),133})134} else {135cs.setHealth(component.Health{136Health: component.HealthTypeHealthy,137Message: "started scheduled components",138UpdateTime: time.Now(),139})140}141142return started143}144145// CurrentHealth implements component.HealthComponent. The component is146// reported as healthy when the most recent set of scheduled components were147// started successfully.148func (cs *Scheduler) CurrentHealth() component.Health {149cs.healthMut.RLock()150defer cs.healthMut.RUnlock()151return cs.health152}153154func (cs *Scheduler) setHealth(h component.Health) {155cs.healthMut.Lock()156defer cs.healthMut.Unlock()157cs.health = h158}159160161