Path: blob/main/components/usage/pkg/scheduler/scheduler.go
2498 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package scheduler56import (7"context"8"errors"9"sync"10"time"1112"github.com/gitpod-io/gitpod/common-go/log"13"github.com/go-redsync/redsync/v4"14"github.com/robfig/cron"15)1617func New(mutex *redsync.Redsync, jobs ...JobSpec) *Scheduler {18return &Scheduler{19specs: jobs,20runningJobs: sync.WaitGroup{},21cron: cron.NewWithLocation(time.UTC),22mutex: mutex,23}24}2526type Scheduler struct {27specs []JobSpec28runningJobs sync.WaitGroup29mutex *redsync.Redsync3031cron *cron.Cron32}3334type JobSpec struct {35Job Job36ID string37Schedule cron.Schedule38InitialLockDuration time.Duration39}4041func (c *Scheduler) Start() {42log.Infof("Starting usage scheduler. Setting up %d jobs.", len(c.specs))4344for _, job := range c.specs {45// need to re-assign job to avoid pointing to a different job spec once the `cron.FuncJob` executes.46j := job47c.cron.Schedule(job.Schedule, cron.FuncJob(func() {48ctx := context.Background()49c.runningJobs.Add(1)50defer c.runningJobs.Done()5152now := time.Now().UTC()53logger := log.WithField("job_id", j.ID)5455err := WithRefreshingMutex(ctx, c.mutex, j.ID, j.InitialLockDuration, func(ctx context.Context) error {56logger.Infof("Starting scheduled job %s", j.ID)57reportJobStarted(j.ID)58jobErr := j.Job.Run()59reportJobCompleted(j.ID, time.Since(now), jobErr)6061if jobErr != nil {62// We don't propagate the job erros outside of run mutex context deliberately63// to contain each job errors64logger.WithError(jobErr).Errorf("Scheduled job %s failed.", job.ID)65return nil66}6768logger.Infof("Scheduled job %s completed succesfully.", j.ID)69return jobErr70})71if err != nil {72if errors.Is(err, redsync.ErrTaken{}) {73logger.WithError(err).Info("Failed to acquire lock, another instance holds the lock already.")74return75}7677logger.WithError(err).Error("Failed to execute job inside a mutex.")78return79}8081logger.Debug("Succesfully obtained mutex and executed job.")82}))83}8485c.cron.Start()86}8788// Stop terminates the Scheduler and awaits for all running jobs to complete.89func (c *Scheduler) Stop() {90log.Info("Stopping scheduler.")91c.cron.Stop()9293log.Info("Awaiting existing jobs to complete.")94// Wait for existing jobs to finish95c.runningJobs.Wait()9697log.Infof("All running jobs completed.")98}99100101