// Package runner provides an API for generic goroutine scheduling. It is
// broken up into three concepts:
//
// 1. Task: a unit of work to perform
// 2. Worker: a goroutine dedicated to doing a specific Task
// 3. Runner: manages the set of Workers, one per unique Task
//
// An example of a Task and Worker pair would be a Task which describes an
// endpoint to poll for health. The Task would then be assigned to a Worker to
// perform the polling.
package runner

import (
	"context"
	"fmt"
	"sync"
)

// A Task is a payload that determines what a Worker should do. For example,
// a Task may be a struct including an address for a Worker to poll.
type Task interface {
	// Hash should return a hash which represents this Task.
	Hash() uint64
	// Equals should determine if two Tasks are equal. It is only called when two
	// Tasks have the same Hash.
	Equals(other Task) bool
}

// A Worker is a goroutine which performs business logic for a Task which is
// assigned to it. Each Worker is responsible for a single Task.
type Worker interface {
	// Run starts a Worker, blocking until the provided ctx is canceled or a
	// fatal error occurs. Run is guaranteed to be called exactly once for any
	// given Worker.
	Run(ctx context.Context)
}

// The Runner manages a set of running Workers based on an active set of tasks.
type Runner[TaskType Task] struct {
	// newWorker constructs a Worker for a Task that has no Worker yet.
	newWorker func(t TaskType) Worker

	// ctx is the lifetime of the Runner itself; it is the parent of every
	// per-worker context and is canceled by Stop.
	ctx    context.Context
	cancel context.CancelFunc

	// running tracks all launched worker goroutines so Stop can wait for them.
	running sync.WaitGroup
	// workers holds the *workerTask entries for all scheduled workers.
	workers *hashMap
}

// Internal types used to implement the Runner.
type (
	// scheduledWorker is a representation of a running worker.
	scheduledWorker struct {
		Worker Worker // The underlying Worker instance.

		// Function to call to request the worker to stop.
		Cancel context.CancelFunc

		// Exited will close once the worker has exited.
		Exited chan struct{}
	}

	// workerTask represents a tuple of a scheduledWorker with its assigned Task.
	// workerTask implements Task for it to be used in a hashMap; two workerTasks
	// are equal if their underlying Tasks are equal.
	workerTask struct {
		Worker *scheduledWorker
		Task   Task
	}
)

// Hash returns the hash of the Task the scheduledWorker owns.
func (sw *workerTask) Hash() uint64 {
	return sw.Task.Hash()
}

// Equals returns true if the Task owned by this workerTask equals the Task
// owned by another workerTask. It is only called for Tasks with equal hashes,
// so other is assumed to also be a *workerTask.
func (sw *workerTask) Equals(other Task) bool {
	return sw.Task.Equals(other.(*workerTask).Task)
}

// New creates a new Runner which manages workers for a given Task type. The
// newWorker function is called whenever a new Task is received that is not
// managed by any existing Worker.
func New[TaskType Task](newWorker func(t TaskType) Worker) *Runner[TaskType] {
	ctx, cancel := context.WithCancel(context.Background())

	return &Runner[TaskType]{
		newWorker: newWorker,

		ctx:    ctx,
		cancel: cancel,

		workers: newHashMap(10),
	}
}

// ApplyTasks updates the Tasks tracked by the Runner to the slice specified
// by tt. tt should be the entire set of tasks that workers should be operating
// against. ApplyTasks will launch new Workers for new tasks and terminate
// previous Workers for tasks which are no longer found in tt.
//
// ApplyTasks will block until Workers for stale Tasks have terminated. If the
// provided context is canceled, ApplyTasks will still finish synchronizing the
// set of Workers but will not wait for stale Workers to exit.
func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error {
	// Refuse to schedule anything after Stop has canceled the Runner's context.
	if s.ctx.Err() != nil {
		return fmt.Errorf("Runner is closed")
	}

	// Create a new hashMap of tasks we intend to run.
	newTasks := newHashMap(len(tt))
	for _, t := range tt {
		newTasks.Add(t)
	}

	// Stop stale workers (i.e., Workers whose tasks are not in newTasks).
	//
	// NOTE(review): these goroutines call s.workers.Delete concurrently with
	// the Has/Add calls in the loop below; this assumes hashMap is safe for
	// concurrent use — confirm against its implementation.
	var stopping sync.WaitGroup
	for w := range s.workers.Iterate() {
		if newTasks.Has(w.(*workerTask).Task) {
			// Task still exists.
			continue
		}

		// Stop and remove the task from s.workers. Removal happens in a
		// goroutine so all stale workers are signaled to stop immediately
		// rather than one at a time.
		stopping.Add(1)
		go func(w *workerTask) {
			defer stopping.Done()
			defer s.workers.Delete(w)
			w.Worker.Cancel()

			// Wait for the worker to exit, but give up early if the caller's
			// context is canceled (the entry is still deleted either way).
			select {
			case <-ctx.Done():
			case <-w.Worker.Exited:
			}
		}(w.(*workerTask))
	}

	// Ensure that every defined task in newTasks has a worker associated with
	// it.
	for definedTask := range newTasks.Iterate() {
		// Ignore tasks for workers that are already running.
		//
		// We use a temporary workerTask here where only the task field is used
		// for comparison. This prevents unnecessarily creating a new worker when
		// one isn't needed.
		if s.workers.Has(&workerTask{Task: definedTask}) {
			continue
		}

		// Derive the worker's context from s.ctx so Stop cancels it too.
		workerCtx, workerCancel := context.WithCancel(s.ctx)
		newWorker := &scheduledWorker{
			Worker: s.newWorker(definedTask.(TaskType)),
			Cancel: workerCancel,
			Exited: make(chan struct{}),
		}
		newTask := &workerTask{
			Worker: newWorker,
			Task:   definedTask,
		}

		s.running.Add(1)
		go func() {
			defer s.running.Done()
			// Closing Exited signals the stale-worker cleanup above.
			defer close(newWorker.Exited)
			newWorker.Worker.Run(workerCtx)
		}()

		// Add's return value (presumably whether an existing entry was
		// replaced) is deliberately ignored; the Has check above already
		// established that this task is new.
		_ = s.workers.Add(newTask)
	}

	// Wait for all stopping workers to stop (or for ctx to be canceled, which
	// unblocks each stopping goroutine — and therefore this wait — early).
	stopping.Wait()
	return ctx.Err()
}

// Tasks returns the current set of Tasks. Tasks are included even if their
// associated Worker has terminated.
func (s *Runner[TaskType]) Tasks() []TaskType {
	var res []TaskType
	for task := range s.workers.Iterate() {
		workerTask := task.(*workerTask)
		res = append(res, workerTask.Task.(TaskType))
	}
	return res
}

// Workers returns the current set of Workers. Workers are included even if
// they have terminated.
func (s *Runner[TaskType]) Workers() []Worker {
	var res []Worker
	for task := range s.workers.Iterate() {
		workerTask := task.(*workerTask)
		res = append(res, workerTask.Worker.Worker)
	}
	return res
}

// Stop stops the Runner and all running Workers. Stop blocks until all
// running Workers exit.
func (s *Runner[TaskType]) Stop() {
	s.cancel()
	s.running.Wait()
}