Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/supervisor/pkg/dropwriter/dropwriter.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package dropwriter
6
7
import (
8
"io"
9
"sync"
10
"time"
11
)
12
13
// Clock abstracts time for the bucket limiter.
14
type Clock func() time.Time
15
16
// NewBucket creates a new bucket limiter with a realtime clock.
17
func NewBucket(capacity, refillRatePerSec int64) *Bucket {
18
return NewBucketClock(capacity, refillRatePerSec, time.Now)
19
}
20
21
// NewBucketClock produces a new bucket limiter with a custom clock. Useful for testing.
22
func NewBucketClock(capacity, refillRatePerSec int64, clock Clock) *Bucket {
23
return &Bucket{
24
clock: clock,
25
capacity: capacity,
26
refillRate: refillRatePerSec,
27
}
28
}
29
30
// Bucket implements a token bucket limiter.
31
type Bucket struct {
32
clock Clock
33
34
// capacity is the total token capacity of this bucket
35
capacity int64
36
37
// refillRate holds how many tokens we refill per second
38
refillRate int64
39
40
// mu syncs bucket access
41
mu sync.Mutex
42
43
// availableTokens is the total number of tokens currently available
44
availableTokens int64
45
46
// lastTick is the last time we adjusted the available token count
47
lastTick time.Time
48
}
49
50
func (b *Bucket) adjustTokens() {
51
b.mu.Lock()
52
defer b.mu.Unlock()
53
54
now := b.clock()
55
defer func() {
56
b.lastTick = now
57
}()
58
59
if b.lastTick.IsZero() {
60
// first adjustment/tick ever - set availableTokens to capacity
61
b.availableTokens = b.capacity
62
return
63
}
64
65
b.availableTokens += int64(now.Sub(b.lastTick).Seconds() * float64(b.refillRate))
66
if b.availableTokens > b.capacity {
67
b.availableTokens = b.capacity
68
}
69
}
70
71
// TakeAvailable attempts to remove req tokens from the bucket. If there are fewer tokens available
72
// all remaining tokens are removed and returned.
73
func (b *Bucket) TakeAvailable(req int64) int64 {
74
b.adjustTokens()
75
76
b.mu.Lock()
77
defer b.mu.Unlock()
78
79
grant := req
80
if grant > b.availableTokens {
81
grant = b.availableTokens
82
}
83
b.availableTokens -= grant
84
85
return grant
86
}
87
88
type writer struct {
89
w io.Writer
90
bucket *Bucket
91
}
92
93
func (w *writer) Write(buf []byte) (n int, err error) {
94
grant := w.bucket.TakeAvailable(int64(len(buf)))
95
n, err = w.w.Write(buf[:grant])
96
if err != nil {
97
return
98
}
99
100
// We act as though we had written the whole buffer. This is what actually implements
101
// the byte drop imposed by the bucket limiter. if we returned the correct number of bytes
102
// here the caller might err with ErrShortWrite or simply try again.
103
n = len(buf)
104
105
return
106
}
107
108
// Writer produces a new rate limited dropping writer.
109
func Writer(dst io.Writer, b *Bucket) io.Writer {
110
return &writer{w: dst, bucket: b}
111
}
112
113