GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/cpulimit/cpulimit_test.go

// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package cpulimit_test

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "io/fs"
    "math"
    "math/rand"
    "os"
    "testing"
    "time"

    "github.com/gitpod-io/gitpod/ws-daemon/pkg/cpulimit"
)

const (
    totalCapacity   = cpulimit.Bandwidth(12000)
    testSampleCount = 1000
    testDt          = 10 * time.Second
    testDuration    = testSampleCount * testDt
)

var (
    defaultLimit         = cpulimit.FixedLimiter(2000)
    defaultBreakoutLimit = cpulimit.FixedLimiter(6000)
)
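
// Note: defaultLimit and defaultBreakoutLimit are the simplest limiter
// configuration exercised by these tests; judging by their names and by how
// they are passed to cpulimit.NewDistributor below, workspaces are capped at
// 2000 by default and may break out to 6000. TestProdBehaviour swaps in a
// cpulimit.BucketLimiter instead.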

// Consumer consumes CPU time
type Consumer interface {
    ID() string
    Rate(t time.Duration) cpulimit.Bandwidth
    QoS() int
}

// SteadyConsumer consumes CPU time at a constant rate
type SteadyConsumer struct {
    id   string
    rate cpulimit.Bandwidth
    qos  int
}

func (s SteadyConsumer) ID() string                              { return s.id }
func (s SteadyConsumer) Rate(t time.Duration) cpulimit.Bandwidth { return s.rate }
func (s SteadyConsumer) QoS() int                                { return s.qos }

// SinusoidalConsumer consumes CPU time at a sinusoidally varying rate
type SinusoidalConsumer struct {
    id     string
    phase  time.Duration
    period time.Duration
    ampl   cpulimit.Bandwidth
    qos    int
}

func (s SinusoidalConsumer) ID() string { return s.id }
func (s SinusoidalConsumer) Rate(t time.Duration) cpulimit.Bandwidth {
    // the rate oscillates between 0 and 2*ampl, shifted by phase
    pt := (t - s.phase).Seconds()
    pr := math.Pi / s.period.Seconds()
    ampl := float64(s.ampl)
    return cpulimit.Bandwidth(ampl*math.Sin(pt*pr) + ampl)
}
func (s SinusoidalConsumer) QoS() int { return s.qos }
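
// Illustrative sketch only (the helper below is hypothetical and not used by
// any test): a SinusoidalConsumer's rate oscillates between 0 and 2*ampl, and
// at t == phase the sine term vanishes, so the rate is exactly ampl.
func exampleSinusoidalRate() cpulimit.Bandwidth {
    c := SinusoidalConsumer{id: "ex-sine", period: 15 * time.Minute, ampl: 5000}
    return c.Rate(0) // == 5000, since sin(0) == 0
}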

// SpikyConsumer randomly spikes its CPU use
type SpikyConsumer struct {
    Consumer

    MinSpike, MaxSpike cpulimit.Bandwidth
    LikelyHood         float64
    MinLatch, MaxLatch time.Duration

    latch      time.Duration
    latchedVal cpulimit.Bandwidth
}

func (s *SpikyConsumer) Rate(t time.Duration) cpulimit.Bandwidth {
    // while a spike is latched, keep reporting the latched rate
    if t < s.latch {
        return s.latchedVal
    }
    // with probability LikelyHood, start a new spike lasting between MinLatch
    // and MaxLatch that consumes between MinSpike and MaxSpike
    if rand.Float64() < s.LikelyHood {
        s.latch = t + s.MinLatch + time.Duration(rand.Int63n(int64(s.MaxLatch-s.MinLatch)))
        s.latchedVal = s.MinSpike + cpulimit.Bandwidth(rand.Int63n(int64(s.MaxSpike-s.MinSpike)))
        return s.latchedVal
    }
    // otherwise fall back to the wrapped consumer's rate
    return s.Consumer.Rate(t)
}
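
// Illustrative sketch only (hypothetical helper, not used by any test): with
// LikelyHood 0 a SpikyConsumer never latches a spike and simply reports the
// rate of the consumer it wraps.
func exampleSpikyFallback() cpulimit.Bandwidth {
    c := &SpikyConsumer{Consumer: SteadyConsumer{id: "ex-spiky", rate: 200}}
    return c.Rate(time.Minute) // == 200, the wrapped steady rate
}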

// RecordedConsumer replays a previously recorded rate series. It is used to
// keep the randomised consumers reproducible across test runs.
type RecordedConsumer struct {
    Id  string               `json:"id"`
    Qos int                  `json:"qos"`
    T   []time.Duration      `json:"times"`
    R   []cpulimit.Bandwidth `json:"rates"`
}

func (s RecordedConsumer) ID() string { return s.Id }
func (s RecordedConsumer) Rate(t time.Duration) cpulimit.Bandwidth {
    // find the last sample recorded before t and return its rate
    var idx int
    for idx = 0; idx < len(s.T) && s.T[idx] < t; idx++ {
    }
    if idx > 0 {
        idx--
    }
    return s.R[idx]
}
func (s RecordedConsumer) QoS() int { return s.Qos }

// RecordConsumer samples a consumer's rate every dt over totalT and returns a
// RecordedConsumer that replays those samples.
func RecordConsumer(consumer Consumer, dt, totalT time.Duration) *RecordedConsumer {
    var res RecordedConsumer
    res.Id = consumer.ID()
    res.Qos = consumer.QoS()
    for t := 0 * time.Second; t < totalT; t += dt {
        res.T = append(res.T, t)
        res.R = append(res.R, consumer.Rate(t))
    }
    return &res
}
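
// Illustrative sketch only (hypothetical helper, not used by any test):
// recording a consumer once and replaying it through RecordedConsumer yields
// the same rate at every sampled instant, which is what keeps the randomised
// consumer sets below reproducible.
func exampleRecordReplay() (orig, replayed cpulimit.Bandwidth) {
    c := SteadyConsumer{id: "ex-rec", rate: 1500}
    rec := RecordConsumer(c, testDt, testDuration)
    // both values are 1500: the replay returns the last sample recorded before t
    return c.Rate(30 * time.Second), rec.Rate(30 * time.Second)
}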

// NewNode produces a new simulated node
func NewNode(c ...Consumer) *Node {
    return &Node{
        Consumer:                  c,
        State:                     make(map[string]*consumerState, len(c)),
        ClampOnAvailableBandwidth: true,
    }
}

// Node represents a single node in a cluster
type Node struct {
    Consumer []Consumer
    State    map[string]*consumerState

    ClampOnAvailableBandwidth bool
    bandwidthReq              cpulimit.Bandwidth
    bandwidthUsed             cpulimit.Bandwidth
}

type consumerState struct {
    Consumer  Consumer
    Limit     cpulimit.Bandwidth
    Usage     cpulimit.CPUTime
    Throttled uint64
}

// Tick advances the simulation by dt: it samples consumer rates at simulated
// time totalT, applies the current limits, clamps to the node's capacity and
// accounts usage and throttling per consumer.
func (n *Node) Tick(totalT, dt time.Duration) {
    var (
        bw      = make(map[string]cpulimit.Bandwidth, len(n.Consumer))
        thr     = make(map[string]bool, len(n.Consumer))
        totalBW cpulimit.Bandwidth
    )
    for _, c := range n.Consumer {
        state, ok := n.State[c.ID()]
        if !ok {
            state = &consumerState{Consumer: c}
            n.State[c.ID()] = state
        }

        // apply limit
        bandwidth := c.Rate(totalT)
        if state.Limit != 0 && bandwidth > state.Limit {
            bandwidth = state.Limit
            thr[c.ID()] = true
        }

        bw[c.ID()] = bandwidth
        totalBW += bandwidth
    }

    n.bandwidthReq = totalBW
    if n.ClampOnAvailableBandwidth && totalBW > totalCapacity {
        // if we've overbooked, we subtract an equal amount from everyone
        for i := 0; i < 100; i++ {
            if totalBW <= totalCapacity {
                break
            }

            overbook := totalBW - totalCapacity
            sub := overbook/cpulimit.Bandwidth(len(n.Consumer)) + 1
            for id := range bw {
                if bw[id] < sub {
                    totalBW -= bw[id]
                    bw[id] = 0
                } else {
                    totalBW -= sub
                    bw[id] -= sub
                }
                thr[id] = true
            }
        }
    }
    n.bandwidthUsed = totalBW

    // consume bandwidth and update throttled status
    for id := range bw {
        state := n.State[id]
        state.Usage += bw[id].Integrate(dt)
        if thr[id] {
            state.Throttled++
        }
    }
}
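
// Illustrative sketch only (hypothetical helper, not used by any test): when
// consumers request more than totalCapacity, Tick's clamp loop repeatedly
// subtracts an equal share from every consumer until the total fits, marking
// all of them as throttled. Here 8000+6000 > 12000, so bandwidthUsed ends up
// at or just below totalCapacity.
func exampleOverbookedTick() cpulimit.Bandwidth {
    n := NewNode(
        SteadyConsumer{id: "ob1", rate: 8000},
        SteadyConsumer{id: "ob2", rate: 6000},
    )
    n.Tick(0, testDt)
    return n.bandwidthUsed // <= totalCapacity
}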

// Source acts as a source for a distributor
func (n *Node) Source(context.Context) ([]cpulimit.Workspace, error) {
    var res []cpulimit.Workspace
    for id, w := range n.State {
        res = append(res, cpulimit.Workspace{
            ID:          id,
            NrThrottled: w.Throttled,
            Usage:       w.Usage,
            QoS:         w.Consumer.QoS(),
        })
    }
    return res, nil
}

// Sink acts as a sink for a distributor
func (n *Node) Sink(id string, limit cpulimit.Bandwidth, burst bool) {
    n.State[id].Limit = limit
}
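
// Source and Sink are the two halves the tests hand to cpulimit.NewDistributor
// below: the distributor reads per-workspace usage via Source and writes the
// limits it decides on back via Sink.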

// DumpHeader writes the CSV header for the rows emitted by Dump
func (n *Node) DumpHeader(out io.Writer) {
    fmt.Fprintf(out, "t,id,desiredrate,throttled,usage,limit,actualrate,bwavail,bwused,bwreq,bwbreak\n")
}

// Dump dumps the internal state as one CSV row per consumer
func (n *Node) Dump(out io.Writer, t time.Duration, dbg cpulimit.DistributorDebug) {
    for _, c := range n.Consumer {
        actualRate := c.Rate(t)
        state := n.State[c.ID()]
        limit := state.Limit
        if actualRate > limit {
            actualRate = limit
        }
        fmt.Fprintf(out, "%d,%s,%d,%d,%d,%d,%d,%d,%d,%d,%d\n", t, c.ID(), c.Rate(t), state.Throttled, time.Duration(state.Usage).Milliseconds(), state.Limit, actualRate, totalCapacity, n.bandwidthUsed, n.bandwidthReq, dbg.BandwidthBurst)
    }
}

func TestBucketLimitsEatAll(t *testing.T) {
    node := NewNode(
        SteadyConsumer{id: "q1", rate: 6000, qos: -1},
        SteadyConsumer{id: "q2", rate: 4000, qos: -1},
        SteadyConsumer{id: "a1", rate: 5000},
        SteadyConsumer{id: "a2", rate: 3000},
        SteadyConsumer{id: "a3", rate: 2000},
        SteadyConsumer{id: "a4", rate: 1000},
    )
    dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)
    runSimulation(t, node, dist)
}

func TestBucketLimitsSine(t *testing.T) {
    node := NewNode(
        SteadyConsumer{id: "q1", rate: 5000, qos: -1},
        SteadyConsumer{id: "a2", rate: 3000},
        SteadyConsumer{id: "a3", rate: 2000},
        SteadyConsumer{id: "a4", rate: 1000},
        SinusoidalConsumer{
            id:     "s1",
            phase:  0,
            period: 15 * time.Minute,
            ampl:   5000,
        },
    )
    dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)
    runSimulation(t, node, dist)
}

func TestBucketLimitsMiner(t *testing.T) {
    cs := defaultConsumerSet(t)
    cs = append(cs, SteadyConsumer{id: "miner01", rate: 10000})
    node := NewNode(cs...)

    dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)

    runSimulation(t, node, dist)
}

func TestBucketLimitsMixedQoS(t *testing.T) {
    cs := defaultConsumerSet(t)
    cs = cs[5:]
    cs = append(cs, defaultQoSConsumerSet(t)...)
    node := NewNode(cs...)

    dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)

    runSimulation(t, node, dist)
}

func TestBucketLimitsMaxConsumer(t *testing.T) {
    var cs []Consumer
    for i := 0; i < 20; i++ {
        cs = append(cs,
            SteadyConsumer{id: fmt.Sprintf("c%02d", i), rate: 10000},
        )
    }
    node := NewNode(cs...)
    dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)

    runSimulation(t, node, dist)
}

func TestBucketLimitsNewProdBehaviour(t *testing.T) {
    cs := defaultConsumerSet(t)
    node := NewNode(cs...)

    dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)

    runSimulation(t, node, dist)
}

func TestProdBehaviour(t *testing.T) {
    node := NewNode(defaultConsumerSet(t)...)
    limiter := cpulimit.BucketLimiter{
        cpulimit.Bucket{Budget: 5 * 60 * 6000, Limit: 6000},
        cpulimit.Bucket{Budget: 5 * 60 * 4000, Limit: 4000},
        cpulimit.Bucket{Budget: 5 * 60 * 2000, Limit: 2000},
    }
    breakoutLimiter := limiter
    dist := cpulimit.NewDistributor(node.Source, node.Sink, limiter, breakoutLimiter, totalCapacity)

    runSimulation(t, node, dist)
}
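
// Back-of-the-envelope for the bucket configuration above, assuming Budget is
// accumulated CPU time in bandwidth-seconds (an assumption, not confirmed
// here): a workspace pinned at 6000 drains the first bucket in
// 5*60*6000/6000 = 300s, i.e. five minutes, before the limiter drops it to
// the 4000 bucket, and so on.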

// runSimulation drives the node and the distributor in lock-step for
// testSampleCount ticks of testDt and writes a CSV trace to the working
// directory.
func runSimulation(t *testing.T, node *Node, dist *cpulimit.Distributor) {
    f, err := os.OpenFile(fmt.Sprintf("sim_%s.csv", t.Name()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0744)
    if err != nil {
        t.Fatal(err)
    }
    defer f.Close()
    node.DumpHeader(f)

    totalT := 0 * time.Second
    for i := 0; i < testSampleCount; i++ {
        node.Tick(totalT, testDt)
        dbg, _ := dist.Tick(testDt)
        node.Dump(f, totalT, dbg)
        totalT += testDt
    }
}
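
// Each simulation writes sim_<TestName>.csv into the working directory; the
// columns are the ones emitted by DumpHeader, which makes it straightforward
// to plot how the limiter reacts to the different consumer mixes.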

// defaultConsumerSet returns 15 spiky consumers with a low steady base rate,
// recorded to (or restored from) default-consumer.json for reproducibility.
func defaultConsumerSet(t *testing.T) []Consumer {
    const fn = "default-consumer.json"
    var res []Consumer
    for i := 0; i < 15; i++ {
        c := &SpikyConsumer{
            Consumer:   SteadyConsumer{id: fmt.Sprintf("c%02d", i), rate: 200},
            MaxSpike:   6000,
            MinSpike:   3000,
            LikelyHood: 0.01,
            MinLatch:   10 * time.Second,
            MaxLatch:   5 * time.Minute,
        }
        res = append(res, c)
    }
    return generateOrRestoreConsumers(t, fn, res)
}

// defaultQoSConsumerSet returns 5 spiky consumers with QoS -1, recorded to
// (or restored from) default-qos-consumer.json for reproducibility.
func defaultQoSConsumerSet(t *testing.T) []Consumer {
    const fn = "default-qos-consumer.json"
    var res []Consumer
    for i := 0; i < 5; i++ {
        c := &SpikyConsumer{
            Consumer:   SteadyConsumer{id: fmt.Sprintf("q%02d", i), rate: 200, qos: -1},
            MaxSpike:   6000,
            MinSpike:   4000,
            LikelyHood: 0.05,
            MinLatch:   10 * time.Second,
            MaxLatch:   5 * time.Minute,
        }
        res = append(res, c)
    }
    return generateOrRestoreConsumers(t, fn, res)
}

// generateOrRestoreConsumers restores consumers from fn if that file exists;
// otherwise it records the given consumers, writes the recordings to fn and
// returns them. This keeps the randomised consumer behaviour stable between
// test runs.
func generateOrRestoreConsumers(t *testing.T, fn string, cs []Consumer) []Consumer {
    fc, err := os.ReadFile(fn)
    if errors.Is(err, fs.ErrNotExist) {
        var (
            rcs []*RecordedConsumer
            res []Consumer
        )

        for _, c := range cs {
            rc := RecordConsumer(c, testDt, testDuration)
            rcs = append(rcs, rc)
            res = append(res, rc)
        }
        fc, _ := json.Marshal(rcs)
        err = os.WriteFile(fn, fc, 0644)
        if err != nil {
            t.Fatal(err)
        }
        return res
    }
    if err != nil {
        t.Fatal(err)
    }

    var rcs []*RecordedConsumer
    err = json.Unmarshal(fc, &rcs)
    if err != nil {
        t.Fatal(err)
    }

    res := make([]Consumer, len(rcs))
    for i := range rcs {
        res[i] = rcs[i]
    }

    return res
}