Path: blob/main/components/ws-daemon/pkg/cpulimit/cpulimit_test.go
2500 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package cpulimit_test56import (7"context"8"encoding/json"9"errors"10"fmt"11"io"12"io/fs"13"io/ioutil"14"math"15"math/rand"16"os"17"testing"18"time"1920"github.com/gitpod-io/gitpod/ws-daemon/pkg/cpulimit"21)2223const (24totalCapacity = cpulimit.Bandwidth(12000)25testSampleCount = 100026testDt = 10 * time.Second27testDuration = testSampleCount * testDt28)2930var (31defaultLimit = cpulimit.FixedLimiter(2000)32defaultBreakoutLimit = cpulimit.FixedLimiter(6000)33)3435// Consumer consumes CPU time36type Consumer interface {37ID() string38Rate(t time.Duration) cpulimit.Bandwidth39QoS() int40}4142// SteadyConsumer consumes constant CPU time43type SteadyConsumer struct {44id string45rate cpulimit.Bandwidth46qos int47}4849func (s SteadyConsumer) ID() string { return s.id }50func (s SteadyConsumer) Rate(t time.Duration) cpulimit.Bandwidth { return s.rate }51func (s SteadyConsumer) QoS() int { return s.qos }5253// SinusoidalConsumer consumes sinusoidal shaped CPU time54type SinusoidalConsumer struct {55id string56phase time.Duration57period time.Duration58ampl cpulimit.Bandwidth59qos int60}6162func (s SinusoidalConsumer) ID() string { return s.id }63func (s SinusoidalConsumer) Rate(t time.Duration) cpulimit.Bandwidth {64pt := (t - s.phase).Seconds()65pr := math.Pi / s.period.Seconds()66ampl := float64(s.ampl)67return cpulimit.Bandwidth(ampl*math.Sin(pt*pr) + ampl)68}69func (s SinusoidalConsumer) QoS() int { return s.qos }7071// SpikyConsumer randomly spikes its CPU use72type SpikyConsumer struct {73Consumer7475MinSpike, MaxSpike cpulimit.Bandwidth76LikelyHood float6477MinLatch, MaxLatch time.Duration7879latch time.Duration80latchedVal cpulimit.Bandwidth81}8283func (s *SpikyConsumer) Rate(t time.Duration) cpulimit.Bandwidth {84if t < s.latch {85return s.latchedVal86}87if rand.Float64() < s.LikelyHood {88s.latch = t + s.MinLatch + time.Duration(rand.Int63n(int64(s.MaxLatch-s.MinLatch)))89s.latchedVal = s.MinSpike + cpulimit.Bandwidth(rand.Int63n(int64(s.MaxSpike-s.MinSpike)))90return s.latchedVal91}92return s.Consumer.Rate(t)93}9495type RecordedConsumer struct {96Id string `json:"id"`97Qos int `json:"qos"`98T []time.Duration `json:"times"`99R []cpulimit.Bandwidth `json:"rates"`100}101102func (s RecordedConsumer) ID() string { return s.Id }103func (s RecordedConsumer) Rate(t time.Duration) cpulimit.Bandwidth {104var idx int105for idx = 0; idx < len(s.T) && s.T[idx] < t; idx++ {106}107if idx > 0 {108idx--109}110return s.R[idx]111}112func (s RecordedConsumer) QoS() int { return s.Qos }113114func RecordConsumer(consumer Consumer, dt, totalT time.Duration) *RecordedConsumer {115var res RecordedConsumer116res.Id = consumer.ID()117res.Qos = consumer.QoS()118for t := 0 * time.Second; t < totalT; t += dt {119res.T = append(res.T, t)120res.R = append(res.R, consumer.Rate(t))121}122return &res123}124125// NewNode produces a new virtual machine126func NewNode(c ...Consumer) *Node {127return &Node{128Consumer: c,129State: make(map[string]*consumerState, len(c)),130ClampOnAvailableBandwidth: true,131}132}133134// Node repsents a single node in a cluster135type Node struct {136Consumer []Consumer137State map[string]*consumerState138139ClampOnAvailableBandwidth bool140bandwidthReq cpulimit.Bandwidth141bandwidthUsed cpulimit.Bandwidth142}143144type consumerState struct {145Consumer Consumer146Limit cpulimit.Bandwidth147Usage cpulimit.CPUTime148Throttled uint64149}150151// Tick ticks time152func (n *Node) Tick(totalT, dt time.Duration) {153var (154bw = make(map[string]cpulimit.Bandwidth, len(n.Consumer))155thr = make(map[string]bool, len(n.Consumer))156totalBW cpulimit.Bandwidth157)158for _, c := range n.Consumer {159state, ok := n.State[c.ID()]160if !ok {161state = &consumerState{Consumer: c}162n.State[c.ID()] = state163}164165// apply limit166bandwidth := c.Rate(totalT)167if state.Limit != 0 && bandwidth > state.Limit {168bandwidth = state.Limit169thr[c.ID()] = true170}171172bw[c.ID()] = bandwidth173totalBW += bandwidth174}175176n.bandwidthReq = totalBW177if n.ClampOnAvailableBandwidth && totalBW > totalCapacity {178// if we've overbooked, we subtract an equal amount from everyone179for i := 0; i < 100; i++ {180if totalBW <= totalCapacity {181break182}183184overbook := totalBW - totalCapacity185sub := overbook/cpulimit.Bandwidth(len(n.Consumer)) + 1186for id := range bw {187if bw[id] < sub {188totalBW -= bw[id]189bw[id] = 0190} else {191totalBW -= sub192bw[id] -= sub193}194thr[id] = true195}196}197}198n.bandwidthUsed = totalBW199200// consume bandwidth and update throttled status201for id := range bw {202state := n.State[id]203state.Usage += bw[id].Integrate(dt)204if thr[id] {205state.Throttled++206}207}208}209210// Source acts as source to a distributor211func (n *Node) Source(context.Context) ([]cpulimit.Workspace, error) {212var res []cpulimit.Workspace213for id, w := range n.State {214res = append(res, cpulimit.Workspace{215ID: id,216NrThrottled: w.Throttled,217Usage: w.Usage,218QoS: w.Consumer.QoS(),219})220}221return res, nil222}223224// Sink acts as sink for a distributor225func (n *Node) Sink(id string, limit cpulimit.Bandwidth, burst bool) {226n.State[id].Limit = limit227}228229func (n *Node) DumpHeader(out io.Writer) {230fmt.Fprintf(out, "t,id,desiredrate,throttled,usage,limit,actualrate,bwavail,bwused,bwreq,bwbreak\n")231}232233// Dump dumps the internal state234func (n *Node) Dump(out io.Writer, t time.Duration, dbg cpulimit.DistributorDebug) {235for _, c := range n.Consumer {236actualRate := c.Rate(t)237state := n.State[c.ID()]238limit := state.Limit239if actualRate > limit {240actualRate = limit241}242fmt.Fprintf(out, "%d,%s,%d,%d,%d,%d,%d,%d,%d,%d,%d\n", t, c.ID(), c.Rate(t), state.Throttled, time.Duration(state.Usage).Milliseconds(), state.Limit, actualRate, totalCapacity, n.bandwidthUsed, n.bandwidthReq, dbg.BandwidthBurst)243}244}245246func TestBucketLimitsEatAll(t *testing.T) {247node := NewNode(248SteadyConsumer{id: "q1", rate: 6000, qos: -1},249SteadyConsumer{id: "q2", rate: 4000, qos: -1},250SteadyConsumer{id: "a1", rate: 5000},251SteadyConsumer{id: "a2", rate: 3000},252SteadyConsumer{id: "a3", rate: 2000},253SteadyConsumer{id: "a4", rate: 1000},254)255dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)256runSimulation(t, node, dist)257}258259func TestBucketLimitsSine(t *testing.T) {260node := NewNode(261SteadyConsumer{id: "q1", rate: 5000, qos: -1},262SteadyConsumer{id: "a2", rate: 3000},263SteadyConsumer{id: "a3", rate: 2000},264SteadyConsumer{id: "a4", rate: 1000},265SinusoidalConsumer{266id: "s1",267phase: 0,268period: 15 * time.Minute,269ampl: 5000,270},271)272dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)273runSimulation(t, node, dist)274}275276func TestBucketLimitsMiner(t *testing.T) {277cs := defaultConsumerSet(t)278cs = append(cs, SteadyConsumer{id: "miner01", rate: 10000})279node := NewNode(cs...)280281dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)282283runSimulation(t, node, dist)284}285286func TestBucketLimitsMixedQoS(t *testing.T) {287cs := defaultConsumerSet(t)288cs = cs[5:]289cs = append(cs, defaultQoSConsumerSet(t)...)290node := NewNode(cs...)291292dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)293294runSimulation(t, node, dist)295}296297func TestBucketLimitsMaxConsumer(t *testing.T) {298var cs []Consumer299for i := 0; i < 20; i++ {300cs = append(cs,301SteadyConsumer{id: fmt.Sprintf("c%02d", i), rate: 10000},302)303}304node := NewNode(cs...)305dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)306307runSimulation(t, node, dist)308}309310func TestBucketLimitsNewProdBehaviour(t *testing.T) {311cs := defaultConsumerSet(t)312node := NewNode(cs...)313314dist := cpulimit.NewDistributor(node.Source, node.Sink, defaultLimit, defaultBreakoutLimit, totalCapacity)315316runSimulation(t, node, dist)317}318319func TestProdBehaviour(t *testing.T) {320node := NewNode(defaultConsumerSet(t)...)321limiter := cpulimit.BucketLimiter{322cpulimit.Bucket{Budget: 5 * 60 * 6000, Limit: 6000},323cpulimit.Bucket{Budget: 5 * 60 * 4000, Limit: 4000},324cpulimit.Bucket{Budget: 5 * 60 * 2000, Limit: 2000},325}326breakoutLimiter := limiter327dist := cpulimit.NewDistributor(node.Source, node.Sink, limiter, breakoutLimiter, totalCapacity)328329runSimulation(t, node, dist)330}331332func runSimulation(t *testing.T, node *Node, dist *cpulimit.Distributor) {333f, err := os.OpenFile(fmt.Sprintf("sim_%s.csv", t.Name()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0744)334if err != nil {335t.Fatal(err)336}337defer f.Close()338node.DumpHeader(f)339340totalT := 0 * time.Second341for i := 0; i < testSampleCount; i++ {342node.Tick(totalT, testDt)343dbg, _ := dist.Tick(testDt)344node.Dump(f, totalT, dbg)345totalT += testDt346}347}348349func defaultConsumerSet(t *testing.T) []Consumer {350const fn = "default-consumer.json"351var res []Consumer352for i := 0; i < 15; i++ {353c := &SpikyConsumer{354Consumer: SteadyConsumer{id: fmt.Sprintf("c%02d", i), rate: 200},355MaxSpike: 6000,356MinSpike: 3000,357LikelyHood: 0.01,358MinLatch: 10 * time.Second,359MaxLatch: 5 * time.Minute,360}361res = append(res, c)362}363return generateOrRestoreConsumers(t, fn, res)364}365366func defaultQoSConsumerSet(t *testing.T) []Consumer {367const fn = "default-qos-consumer.json"368var res []Consumer369for i := 0; i < 5; i++ {370c := &SpikyConsumer{371Consumer: SteadyConsumer{id: fmt.Sprintf("q%02d", i), rate: 200, qos: -1},372MaxSpike: 6000,373MinSpike: 4000,374LikelyHood: 0.05,375MinLatch: 10 * time.Second,376MaxLatch: 5 * time.Minute,377}378res = append(res, c)379}380return generateOrRestoreConsumers(t, fn, res)381}382383func generateOrRestoreConsumers(t *testing.T, fn string, cs []Consumer) []Consumer {384fc, err := os.ReadFile(fn)385if errors.Is(err, fs.ErrNotExist) {386var (387rcs []*RecordedConsumer388res []Consumer389)390391for _, c := range cs {392rc := RecordConsumer(c, testDt, testDuration)393rcs = append(rcs, rc)394res = append(res, rc)395}396fc, _ := json.Marshal(rcs)397err = ioutil.WriteFile(fn, fc, 0644)398if err != nil {399t.Fatal(err)400}401return res402}403if err != nil {404t.Fatal(err)405}406407var rcs []*RecordedConsumer408err = json.Unmarshal(fc, &rcs)409if err != nil {410t.Fatal(err)411}412413res := make([]Consumer, len(rcs))414for i := range rcs {415res[i] = rcs[i]416}417418return res419}420421422