Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/usage/pkg/scheduler/scheduler.go
2498 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package scheduler
6
7
import (
8
"context"
9
"errors"
10
"sync"
11
"time"
12
13
"github.com/gitpod-io/gitpod/common-go/log"
14
"github.com/go-redsync/redsync/v4"
15
"github.com/robfig/cron"
16
)
17
18
func New(mutex *redsync.Redsync, jobs ...JobSpec) *Scheduler {
19
return &Scheduler{
20
specs: jobs,
21
runningJobs: sync.WaitGroup{},
22
cron: cron.NewWithLocation(time.UTC),
23
mutex: mutex,
24
}
25
}
26
27
type Scheduler struct {
28
specs []JobSpec
29
runningJobs sync.WaitGroup
30
mutex *redsync.Redsync
31
32
cron *cron.Cron
33
}
34
35
type JobSpec struct {
36
Job Job
37
ID string
38
Schedule cron.Schedule
39
InitialLockDuration time.Duration
40
}
41
42
// Start registers every configured job spec with the cron instance and then
// starts cron.
//
// Each scheduled invocation is counted in runningJobs (so that Stop can wait
// for it) and runs the job through WithRefreshingMutex, passing the job ID and
// the spec's InitialLockDuration. Errors returned by the job itself are logged
// and reported but never propagated, so one failing job does not affect the
// others.
func (c *Scheduler) Start() {
	log.Infof("Starting usage scheduler. Setting up %d jobs.", len(c.specs))

	for _, job := range c.specs {
		// Copy the loop variable: before Go 1.22 the range variable is shared
		// across iterations, so the closure below must only ever touch `j`.
		j := job
		c.cron.Schedule(j.Schedule, cron.FuncJob(func() {
			ctx := context.Background()
			c.runningJobs.Add(1)
			defer c.runningJobs.Done()

			// Captured before the mutex call, so the reported duration also
			// covers the time spent inside WithRefreshingMutex before the job
			// callback starts.
			now := time.Now().UTC()
			logger := log.WithField("job_id", j.ID)

			err := WithRefreshingMutex(ctx, c.mutex, j.ID, j.InitialLockDuration, func(ctx context.Context) error {
				logger.Infof("Starting scheduled job %s", j.ID)
				reportJobStarted(j.ID)
				jobErr := j.Job.Run()
				reportJobCompleted(j.ID, time.Since(now), jobErr)

				if jobErr != nil {
					// Job errors are deliberately not propagated out of the
					// mutex callback, to keep each job's failures contained.
					logger.WithError(jobErr).Errorf("Scheduled job %s failed.", j.ID)
					return nil
				}

				logger.Infof("Scheduled job %s completed succesfully.", j.ID)
				return nil
			})
			if err != nil {
				// Match the lock-taken error by type. errors.Is against a
				// freshly constructed redsync.ErrTaken{} relies on value
				// equality and therefore does not recognise an error that
				// carries its own data.
				// NOTE(review): both the value and the pointer form are
				// checked because the form returned by redsync is not visible
				// from this file.
				var takenVal redsync.ErrTaken
				var takenPtr *redsync.ErrTaken
				if errors.As(err, &takenVal) || errors.As(err, &takenPtr) {
					logger.WithError(err).Info("Failed to acquire lock, another instance holds the lock already.")
					return
				}

				logger.WithError(err).Error("Failed to execute job inside a mutex.")
				return
			}

			logger.Debug("Succesfully obtained mutex and executed job.")
		}))
	}

	c.cron.Start()
}
88
89
// Stop terminates the Scheduler and awaits for all running jobs to complete.
90
func (c *Scheduler) Stop() {
91
log.Info("Stopping scheduler.")
92
c.cron.Stop()
93
94
log.Info("Awaiting existing jobs to complete.")
95
// Wait for existing jobs to finish
96
c.runningJobs.Wait()
97
98
log.Infof("All running jobs completed.")
99
}
100
101