Path: blob/main/components/usage/pkg/scheduler/mutex.go
2498 views
// Copyright (c) 2023 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package scheduler56import (7"context"8"time"910"github.com/gitpod-io/gitpod/common-go/log"11"github.com/go-redsync/redsync/v4"12)1314func WithRefreshingMutex(ctx context.Context, rs *redsync.Redsync, name string, expiry time.Duration, fn func(ctx context.Context) error) error {15logger := log.Log.WithField("mutexName", name).WithField("mutexExpiry", expiry)1617// Refresh the mutex 10 seconds before it expires, or every 1 second at minimum18refreshThreshold := expiry - 10*time.Second19if refreshThreshold < 0 {20refreshThreshold = 1 * time.Second21}2223done := make(chan struct{})2425mutex := rs.NewMutex(name, redsync.WithExpiry(expiry), redsync.WithTries(1))2627logger.Debug("Acquiring mutex")28if err := mutex.LockContext(ctx); err != nil {29logger.WithError(err).Debugf("Failed to acquire mutex.")30return err31}32logger.Debugf("Acquired mutex. Mutex valid until: %s and will be refreshed every %v if job runs for longer.", mutex.Until().UTC(), refreshThreshold.String())3334defer func() {35// we always signal that our run is complete, to ensure our mutex refresh go-routine exits36close(done)37}()3839go func() {40logger.Debug("Running routine to refresh mutex lock if job runs longer than expiry.")41ticker := time.NewTicker(refreshThreshold)4243for {44select {45// either we're done, and we exit46case <-done:47logger.Debug("Job has completed, stopping mutex refresh routine.")48ticker.Stop()49return5051// or we're not yet done and need to extend the mutex52case <-ticker.C:53log.Debug("Extending mutex because job is still running.")54_, err := mutex.ExtendContext(ctx)55if err != nil {56log.Log.WithError(err).Errorf("Failed to extend %s mutex.", name)57}5859log.Debugf("Succesfully extended mutex. Mutex valid until: %v", mutex.Until().UTC())60}61}62}()6364logger.Debug("Running job inside mutex.")65fnErr := fn(ctx)6667// release the lock, it will be acquired on subsequent run, possibly by another instance of this job.68logger.Debug("Completed job inside mutex. Releasing mutex lock.")69if _, err := mutex.UnlockContext(ctx); err != nil {70logger.WithError(err).Error("Failed to release mutex.")71}7273logger.Debug("Mutex succesfully released.")74return fnErr75}767778