Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/usage/pkg/scheduler/mutex.go
2498 views
1
// Copyright (c) 2023 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package scheduler
6
7
import (
8
"context"
9
"time"
10
11
"github.com/gitpod-io/gitpod/common-go/log"
12
"github.com/go-redsync/redsync/v4"
13
)
14
15
func WithRefreshingMutex(ctx context.Context, rs *redsync.Redsync, name string, expiry time.Duration, fn func(ctx context.Context) error) error {
16
logger := log.Log.WithField("mutexName", name).WithField("mutexExpiry", expiry)
17
18
// Refresh the mutex 10 seconds before it expires, or every 1 second at minimum
19
refreshThreshold := expiry - 10*time.Second
20
if refreshThreshold < 0 {
21
refreshThreshold = 1 * time.Second
22
}
23
24
done := make(chan struct{})
25
26
mutex := rs.NewMutex(name, redsync.WithExpiry(expiry), redsync.WithTries(1))
27
28
logger.Debug("Acquiring mutex")
29
if err := mutex.LockContext(ctx); err != nil {
30
logger.WithError(err).Debugf("Failed to acquire mutex.")
31
return err
32
}
33
logger.Debugf("Acquired mutex. Mutex valid until: %s and will be refreshed every %v if job runs for longer.", mutex.Until().UTC(), refreshThreshold.String())
34
35
defer func() {
36
// we always signal that our run is complete, to ensure our mutex refresh go-routine exits
37
close(done)
38
}()
39
40
go func() {
41
logger.Debug("Running routine to refresh mutex lock if job runs longer than expiry.")
42
ticker := time.NewTicker(refreshThreshold)
43
44
for {
45
select {
46
// either we're done, and we exit
47
case <-done:
48
logger.Debug("Job has completed, stopping mutex refresh routine.")
49
ticker.Stop()
50
return
51
52
// or we're not yet done and need to extend the mutex
53
case <-ticker.C:
54
log.Debug("Extending mutex because job is still running.")
55
_, err := mutex.ExtendContext(ctx)
56
if err != nil {
57
log.Log.WithError(err).Errorf("Failed to extend %s mutex.", name)
58
}
59
60
log.Debugf("Succesfully extended mutex. Mutex valid until: %v", mutex.Until().UTC())
61
}
62
}
63
}()
64
65
logger.Debug("Running job inside mutex.")
66
fnErr := fn(ctx)
67
68
// release the lock, it will be acquired on subsequent run, possibly by another instance of this job.
69
logger.Debug("Completed job inside mutex. Releasing mutex lock.")
70
if _, err := mutex.UnlockContext(ctx); err != nil {
71
logger.WithError(err).Error("Failed to release mutex.")
72
}
73
74
logger.Debug("Mutex succesfully released.")
75
return fnErr
76
}
77
78