Path: blob/main/pkg/flow/internal/controller/scheduler.go
4095 views
package controller12import (3"context"4"fmt"5"sync"6)78// RunnableNode is any dag.Node which can also be run.9type RunnableNode interface {10NodeID() string11Run(ctx context.Context) error12}1314// Scheduler runs components.15type Scheduler struct {16ctx context.Context17cancel context.CancelFunc18running sync.WaitGroup1920tasksMut sync.Mutex21tasks map[string]*task22}2324// NewScheduler creates a new Scheduler. Call Synchronize to manage the set of25// components which are running.26//27// Call Close to stop the Scheduler and all running components.28func NewScheduler() *Scheduler {29ctx, cancel := context.WithCancel(context.Background())30return &Scheduler{31ctx: ctx,32cancel: cancel,3334tasks: make(map[string]*task),35}36}3738// Synchronize synchronizes the running components to those defined by rr.39//40// New RunnableNodes will be launched as new goroutines. RunnableNodes already41// managed by Scheduler will be kept running, while running RunnableNodes that42// are not in rr will be shut down and removed.43//44// Existing components will be restarted if they stopped since the previous45// call to Synchronize.46func (s *Scheduler) Synchronize(rr []RunnableNode) error {47s.tasksMut.Lock()48defer s.tasksMut.Unlock()4950if s.ctx.Err() != nil {51return fmt.Errorf("Scheduler is closed")52}5354newRunnables := make(map[string]RunnableNode, len(rr))55for _, r := range rr {56newRunnables[r.NodeID()] = r57}5859// Stop tasks that are not defined in rr.60var stopping sync.WaitGroup61for id, t := range s.tasks {62if _, keep := newRunnables[id]; keep {63continue64}6566stopping.Add(1)67go func(t *task) {68defer stopping.Done()69t.Stop()70}(t)71}7273// Launch new runnables that have appeared.74for id, r := range newRunnables {75if _, exist := s.tasks[id]; exist {76continue77}7879var (80nodeID = id81newRunnable = r82)8384opts := taskOptions{85Context: s.ctx,86Runnable: newRunnable,87OnDone: func() {88defer s.running.Done()8990s.tasksMut.Lock()91defer s.tasksMut.Unlock()92delete(s.tasks, nodeID)93},94}9596s.running.Add(1)97s.tasks[nodeID] = newTask(opts)98}99100// Wait for all stopping runnables to exit.101stopping.Wait()102return nil103}104105// Close stops the Scheduler and returns after all running goroutines have106// exited.107func (s *Scheduler) Close() error {108s.cancel()109s.running.Wait()110return nil111}112113// task is a scheduled runnable.114type task struct {115ctx context.Context116cancel context.CancelFunc117exited chan struct{}118}119120type taskOptions struct {121Context context.Context122Runnable RunnableNode123OnDone func()124}125126// newTask creates and starts a new task.127func newTask(opts taskOptions) *task {128ctx, cancel := context.WithCancel(opts.Context)129130t := &task{131ctx: ctx,132cancel: cancel,133exited: make(chan struct{}),134}135136go func() {137defer opts.OnDone()138defer close(t.exited)139_ = opts.Runnable.Run(t.ctx)140}()141return t142}143144func (t *task) Stop() {145t.cancel()146<-t.exited147}148149150