Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/cpulimit/cpulimit.go
2500 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package cpulimit
6
7
import (
8
"context"
9
"sort"
10
"strings"
11
"time"
12
13
"github.com/gitpod-io/gitpod/common-go/log"
14
"github.com/sirupsen/logrus"
15
"golang.org/x/xerrors"
16
"k8s.io/apimachinery/pkg/api/resource"
17
)
18
19
type Workspace struct {
20
ID string
21
22
NrThrottled uint64
23
Usage CPUTime
24
QoS int
25
Annotations map[string]string
26
}
27
28
type WorkspaceHistory struct {
29
ID string
30
31
LastUpdate *Workspace
32
UsageT0 CPUTime
33
ThrottleLag uint64
34
Limit Bandwidth
35
}
36
37
// Usage returns the CPU time consumed since the first recorded snapshot,
// i.e. LastUpdate.Usage - UsageT0. It is safe to call on a nil history and
// reports zero when no snapshot has been recorded yet.
func (h *WorkspaceHistory) Usage() CPUTime {
	var consumed CPUTime
	if h != nil && h.LastUpdate != nil {
		consumed = h.LastUpdate.Usage - h.UsageT0
	}
	return consumed
}
43
44
// Update records w as the latest snapshot.
//
// The very first snapshot fixes the usage baseline (UsageT0). Every later
// snapshot first copies the previous snapshot's NrThrottled into ThrottleLag,
// so that Throttled can compare the two.
func (h *WorkspaceHistory) Update(w Workspace) {
	if prev := h.LastUpdate; prev != nil {
		h.ThrottleLag = prev.NrThrottled
	} else {
		h.UsageT0 = w.Usage
	}
	h.LastUpdate = &w
}
52
53
// Throttled reports whether NrThrottled differs between the previous snapshot
// (ThrottleLag) and the latest one (LastUpdate).
//
// It is false before any snapshot and whenever ThrottleLag is zero. The zero
// check covers "only one snapshot seen so far", but it also covers "the previous
// snapshot reported a zero counter". NOTE(review): the latter means a
// workspace's first-ever throttling is not reported — confirm this is intended.
func (h *WorkspaceHistory) Throttled() bool {
	switch {
	case h.LastUpdate == nil, h.ThrottleLag == 0:
		return false
	default:
		return h.LastUpdate.NrThrottled != h.ThrottleLag
	}
}
60
61
type DistributorSource func(context.Context) ([]Workspace, error)
62
type DistributorSink func(id string, limit Bandwidth, burst bool)
63
64
func NewDistributor(source DistributorSource, sink DistributorSink, limiter ResourceLimiter, burstLimiter ResourceLimiter, totalBandwidth Bandwidth) *Distributor {
65
return &Distributor{
66
Source: source,
67
Sink: sink,
68
Limiter: limiter,
69
BurstLimiter: burstLimiter,
70
TotalBandwidth: totalBandwidth,
71
History: make(map[string]*WorkspaceHistory),
72
}
73
}
74
75
type Distributor struct {
76
Source DistributorSource
77
Sink DistributorSink
78
79
History map[string]*WorkspaceHistory
80
Limiter ResourceLimiter
81
BurstLimiter ResourceLimiter
82
83
// TotalBandwidth is the total CPU time available in nanoseconds per second
84
TotalBandwidth Bandwidth
85
LastTickUsage CPUTime
86
87
// Log is used (if not nil) to log out errors. If log is nil, no logging happens.
88
Log *logrus.Entry
89
}
90
91
type DistributorDebug struct {
92
BandwidthAvail, BandwidthUsed, BandwidthBurst Bandwidth
93
}
94
95
// Run starts a ticker which repeatedly calls Tick until the context is canceled.
96
// This function does not return until the context is canceled.
97
func (d *Distributor) Run(ctx context.Context, dt time.Duration) {
98
t := time.NewTicker(dt)
99
defer t.Stop()
100
101
go func() {
102
for range t.C {
103
_, err := d.Tick(dt)
104
if err != nil && d.Log != nil {
105
d.Log.WithError(err).Warn("cannot advance CPU limit distributor")
106
}
107
}
108
}()
109
<-ctx.Done()
110
}
111
112
// Tick drives the distributor and pushes out new limits.
113
// Callers are epxected to call this function repeatedly, with dt time inbetween calls.
114
func (d *Distributor) Tick(dt time.Duration) (DistributorDebug, error) {
115
// update state
116
ws, err := d.Source(context.Background())
117
if err != nil {
118
return DistributorDebug{}, err
119
}
120
121
f := make(map[string]struct{}, len(ws))
122
for _, w := range ws {
123
h, ok := d.History[w.ID]
124
if !ok {
125
h = &WorkspaceHistory{
126
ID: w.ID,
127
}
128
d.History[w.ID] = h
129
}
130
h.Update(w)
131
f[w.ID] = struct{}{}
132
}
133
for oldWS := range d.History {
134
if _, found := f[oldWS]; !found {
135
delete(d.History, oldWS)
136
}
137
}
138
139
var totalUsage CPUTime
140
wsOrder := make([]string, 0, len(d.History))
141
for id, h := range d.History {
142
wsOrder = append(wsOrder, id)
143
totalUsage += h.Usage()
144
}
145
146
// We order workspaces by their QoS class first. Within the same class we order
147
// by usage: lowest usage -> highest priority
148
sort.Slice(wsOrder, func(i, j int) bool {
149
uI := d.History[wsOrder[i]].Usage()
150
uJ := d.History[wsOrder[j]].Usage()
151
qI := d.History[wsOrder[i]].LastUpdate.QoS
152
qJ := d.History[wsOrder[j]].LastUpdate.QoS
153
if qI == qJ {
154
return uI < uJ
155
}
156
return qI < qJ
157
})
158
159
if d.LastTickUsage == 0 {
160
d.LastTickUsage = totalUsage
161
return DistributorDebug{
162
BandwidthAvail: d.TotalBandwidth,
163
BandwidthUsed: 0,
164
}, nil
165
}
166
167
totalBandwidth, err := BandwithFromUsage(d.LastTickUsage, totalUsage, dt)
168
d.LastTickUsage = totalUsage
169
if err != nil {
170
return DistributorDebug{
171
BandwidthAvail: d.TotalBandwidth,
172
BandwidthUsed: 0,
173
}, err
174
}
175
176
// enforce limits
177
var burstBandwidth Bandwidth
178
for _, id := range wsOrder {
179
ws := d.History[id]
180
limit, err := d.Limiter.Limit(ws)
181
if err != nil {
182
log.WithError(err).Errorf("unable to apply min limit")
183
continue
184
}
185
186
// if we didn't get the max bandwidth, but were throttled last time
187
// and there's still some bandwidth left to give, let's act as if had
188
// never spent any CPU time and assume the workspace will spend their
189
// entire bandwidth at once.
190
var burst bool
191
if totalBandwidth < d.TotalBandwidth && ws.Throttled() {
192
limit, err = d.BurstLimiter.Limit(ws)
193
if err != nil {
194
log.WithError(err).Errorf("unable to apply burst limit")
195
continue
196
}
197
198
// We assume the workspace is going to use as much as their limit allows.
199
// This might not be true, because their process which consumed so much CPU
200
// may have ended by now.
201
totalBandwidth += limit
202
203
burstBandwidth += limit
204
}
205
206
d.Sink(id, limit, burst)
207
}
208
209
return DistributorDebug{
210
BandwidthAvail: d.TotalBandwidth,
211
BandwidthUsed: totalBandwidth,
212
BandwidthBurst: burstBandwidth,
213
}, nil
214
}
215
216
// Reset discards all per-workspace history, so the next Tick starts from fresh
// usage baselines.
//
// LastTickUsage is cleared as well. It is the summed usage of the discarded
// histories. Keeping it would make the first tick after a reset compare the new,
// much smaller sum against the old one.
func (d *Distributor) Reset() {
	d.History = make(map[string]*WorkspaceHistory)
	d.LastTickUsage = 0
}
219
220
// ResourceLimiter implements a strategy to limit the resurce use of a workspace
221
type ResourceLimiter interface {
222
Limit(wsh *WorkspaceHistory) (Bandwidth, error)
223
}
224
225
var _ ResourceLimiter = (*fixedLimiter)(nil)
226
var _ ResourceLimiter = (*annotationLimiter)(nil)
227
var _ ResourceLimiter = (*BucketLimiter)(nil)
228
var _ ResourceLimiter = (*ClampingBucketLimiter)(nil)
229
var _ ResourceLimiter = (*compositeLimiter)(nil)
230
231
// FixedLimiter returns a fixed limit
232
func FixedLimiter(limit Bandwidth) ResourceLimiter {
233
return fixedLimiter{limit}
234
}
235
236
type fixedLimiter struct {
237
FixedLimit Bandwidth
238
}
239
240
func (f fixedLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {
241
return f.FixedLimit, nil
242
}
243
244
func AnnotationLimiter(annotation string) ResourceLimiter {
245
return annotationLimiter{
246
Annotation: annotation,
247
}
248
}
249
250
type annotationLimiter struct {
251
Annotation string
252
}
253
254
func (a annotationLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {
255
value, ok := wsh.LastUpdate.Annotations[a.Annotation]
256
if !ok {
257
return 0, xerrors.Errorf("no annotation named %s found on workspace %s", a.Annotation, wsh.ID)
258
}
259
260
limit, err := resource.ParseQuantity(value)
261
if err != nil {
262
return 0, xerrors.Errorf("failed to parse %s for workspace %s", limit, wsh.ID)
263
}
264
265
return BandwidthFromQuantity(limit), nil
266
}
267
268
// Bucket describes a "pot of CPU time" which can be spent at a particular rate.
269
type Bucket struct {
270
Budget CPUTime `json:"budget"`
271
Limit Bandwidth `json:"limit"`
272
}
273
274
// BucketLimiter limits CPU use based on different "pots of CPU time".
275
// The current limit is decided by the current bucket which is taken in order.
276
// For example:
277
//
278
// buckets = [ { Budget: 50, Limit: 20 }, { Budget: 20, Limit: 10 }, { Budget: 0, Limit: 5 } ]
279
// budgetSpent = totalBudget - budgetLeft == 65
280
// then the current limit is 10, because we have spent all our budget from bucket 0, and are currently
281
// spending from the second bucket.
282
//
283
// The last bucket's Budget is always ignored and becomes the default limit if all other
284
// buckets are used up.
285
// If the list of buckets is empty, this limiter limits to zero.
286
type BucketLimiter []Bucket
287
288
// Limit limits spending based on the budget that's left
289
func (buckets BucketLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {
290
budgetSpent := wsh.Usage()
291
292
for i, bkt := range buckets {
293
if i+1 == len(buckets) {
294
// We've reached the last bucket - budget doesn't matter anymore
295
return bkt.Limit, nil
296
}
297
298
budgetSpent -= bkt.Budget
299
if budgetSpent <= 0 {
300
// BudgetSpent value is in this bucket, hence we have found our current bucket
301
return bkt.Limit, nil
302
}
303
}
304
305
// empty bucket list
306
return 0, nil
307
}
308
309
// ClampingBucketLimiter is a stateful limiter that clamps the limit to the last bucket once that bucket is reached.
310
// Clamping happens until less budget has been used as permitted by that bucket.
311
type ClampingBucketLimiter struct {
312
Buckets []Bucket
313
lastBucketLock bool
314
}
315
316
// Limit decides on a CPU use limit
317
func (bl *ClampingBucketLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {
318
budgetSpent := wsh.Usage()
319
320
if bl.lastBucketLock {
321
if budgetSpent < bl.Buckets[len(bl.Buckets)-1].Budget {
322
bl.lastBucketLock = false
323
}
324
}
325
if bl.lastBucketLock {
326
return bl.Buckets[len(bl.Buckets)-1].Limit, nil
327
}
328
329
for i, bkt := range bl.Buckets {
330
if i+1 == len(bl.Buckets) {
331
// We've reached the last bucket - budget doesn't matter anymore
332
bl.lastBucketLock = true
333
return bkt.Limit, nil
334
}
335
336
budgetSpent -= bkt.Budget
337
if budgetSpent <= 0 {
338
// BudgetSpent value is in this bucket, hence we have found our current bucket
339
return bkt.Limit, nil
340
}
341
}
342
343
// empty bucket list
344
return 0, nil
345
}
346
347
type compositeLimiter struct {
348
limiters []ResourceLimiter
349
}
350
351
func CompositeLimiter(limiters ...ResourceLimiter) ResourceLimiter {
352
return &compositeLimiter{
353
limiters: limiters,
354
}
355
}
356
357
func (cl *compositeLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {
358
var errs []error
359
for _, limiter := range cl.limiters {
360
limit, err := limiter.Limit(wsh)
361
if err != nil {
362
errs = append(errs, err)
363
continue
364
}
365
366
return limit, nil
367
}
368
369
allerr := make([]string, len(errs))
370
for i, err := range errs {
371
allerr[i] = err.Error()
372
}
373
return 0, xerrors.Errorf("no limiter was able to provide a limit", strings.Join(allerr, ", "))
374
}
375
376
type CFSController interface {
377
// Usage returns the cpuacct.usage value of the cgroup
378
Usage() (usage CPUTime, err error)
379
// SetQuota sets a new CFS quota on the cgroup
380
SetLimit(limit Bandwidth) (changed bool, err error)
381
// NrThrottled returns the number of CFS periods the cgroup was throttled
382
NrThrottled() (uint64, error)
383
}
384
385