Path: blob/main/components/supervisor/pkg/dropwriter/dropwriter.go
2500 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package dropwriter56import (7"io"8"sync"9"time"10)1112// Clock abstracts time for the bucket limiter.13type Clock func() time.Time1415// NewBucket creates a new bucket limiter with a realtime clock.16func NewBucket(capacity, refillRatePerSec int64) *Bucket {17return NewBucketClock(capacity, refillRatePerSec, time.Now)18}1920// NewBucketClock produces a new bucket limiter with a custom clock. Useful for testing.21func NewBucketClock(capacity, refillRatePerSec int64, clock Clock) *Bucket {22return &Bucket{23clock: clock,24capacity: capacity,25refillRate: refillRatePerSec,26}27}2829// Bucket implements a token bucket limiter.30type Bucket struct {31clock Clock3233// capacity is the total token capacity of this bucket34capacity int643536// refillRate holds how many tokens we refill per second37refillRate int643839// mu syncs bucket access40mu sync.Mutex4142// availableTokens is the total number of tokens currently available43availableTokens int644445// lastTick is the last time we adjusted the available token count46lastTick time.Time47}4849func (b *Bucket) adjustTokens() {50b.mu.Lock()51defer b.mu.Unlock()5253now := b.clock()54defer func() {55b.lastTick = now56}()5758if b.lastTick.IsZero() {59// first adjustment/tick ever - set availableTokens to capacity60b.availableTokens = b.capacity61return62}6364b.availableTokens += int64(now.Sub(b.lastTick).Seconds() * float64(b.refillRate))65if b.availableTokens > b.capacity {66b.availableTokens = b.capacity67}68}6970// TakeAvailable attempts to remove req tokens from the bucket. If there are fewer tokens available71// all remaining tokens are removed and returned.72func (b *Bucket) TakeAvailable(req int64) int64 {73b.adjustTokens()7475b.mu.Lock()76defer b.mu.Unlock()7778grant := req79if grant > b.availableTokens {80grant = b.availableTokens81}82b.availableTokens -= grant8384return grant85}8687type writer struct {88w io.Writer89bucket *Bucket90}9192func (w *writer) Write(buf []byte) (n int, err error) {93grant := w.bucket.TakeAvailable(int64(len(buf)))94n, err = w.w.Write(buf[:grant])95if err != nil {96return97}9899// We act as though we had written the whole buffer. This is what actually implements100// the byte drop imposed by the bucket limiter. if we returned the correct number of bytes101// here the caller might err with ErrShortWrite or simply try again.102n = len(buf)103104return105}106107// Writer produces a new rate limited dropping writer.108func Writer(dst io.Writer, b *Bucket) io.Writer {109return &writer{w: dst, bucket: b}110}111112113