Path: blob/main/components/ws-daemon/pkg/cpulimit/cpulimit.go
2500 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package cpulimit56import (7"context"8"sort"9"strings"10"time"1112"github.com/gitpod-io/gitpod/common-go/log"13"github.com/sirupsen/logrus"14"golang.org/x/xerrors"15"k8s.io/apimachinery/pkg/api/resource"16)1718type Workspace struct {19ID string2021NrThrottled uint6422Usage CPUTime23QoS int24Annotations map[string]string25}2627type WorkspaceHistory struct {28ID string2930LastUpdate *Workspace31UsageT0 CPUTime32ThrottleLag uint6433Limit Bandwidth34}3536func (h *WorkspaceHistory) Usage() CPUTime {37if h == nil || h.LastUpdate == nil {38return 039}40return h.LastUpdate.Usage - h.UsageT041}4243func (h *WorkspaceHistory) Update(w Workspace) {44if h.LastUpdate == nil {45h.UsageT0 = w.Usage46} else {47h.ThrottleLag = h.LastUpdate.NrThrottled48}49h.LastUpdate = &w50}5152func (h *WorkspaceHistory) Throttled() bool {53if h.LastUpdate == nil || h.ThrottleLag == 0 {54return false55}5657return h.ThrottleLag != h.LastUpdate.NrThrottled58}5960type DistributorSource func(context.Context) ([]Workspace, error)61type DistributorSink func(id string, limit Bandwidth, burst bool)6263func NewDistributor(source DistributorSource, sink DistributorSink, limiter ResourceLimiter, burstLimiter ResourceLimiter, totalBandwidth Bandwidth) *Distributor {64return &Distributor{65Source: source,66Sink: sink,67Limiter: limiter,68BurstLimiter: burstLimiter,69TotalBandwidth: totalBandwidth,70History: make(map[string]*WorkspaceHistory),71}72}7374type Distributor struct {75Source DistributorSource76Sink DistributorSink7778History map[string]*WorkspaceHistory79Limiter ResourceLimiter80BurstLimiter ResourceLimiter8182// TotalBandwidth is the total CPU time available in nanoseconds per second83TotalBandwidth Bandwidth84LastTickUsage CPUTime8586// Log is used (if not nil) to log out errors. If log is nil, no logging happens.87Log *logrus.Entry88}8990type DistributorDebug struct {91BandwidthAvail, BandwidthUsed, BandwidthBurst Bandwidth92}9394// Run starts a ticker which repeatedly calls Tick until the context is canceled.95// This function does not return until the context is canceled.96func (d *Distributor) Run(ctx context.Context, dt time.Duration) {97t := time.NewTicker(dt)98defer t.Stop()99100go func() {101for range t.C {102_, err := d.Tick(dt)103if err != nil && d.Log != nil {104d.Log.WithError(err).Warn("cannot advance CPU limit distributor")105}106}107}()108<-ctx.Done()109}110111// Tick drives the distributor and pushes out new limits.112// Callers are epxected to call this function repeatedly, with dt time inbetween calls.113func (d *Distributor) Tick(dt time.Duration) (DistributorDebug, error) {114// update state115ws, err := d.Source(context.Background())116if err != nil {117return DistributorDebug{}, err118}119120f := make(map[string]struct{}, len(ws))121for _, w := range ws {122h, ok := d.History[w.ID]123if !ok {124h = &WorkspaceHistory{125ID: w.ID,126}127d.History[w.ID] = h128}129h.Update(w)130f[w.ID] = struct{}{}131}132for oldWS := range d.History {133if _, found := f[oldWS]; !found {134delete(d.History, oldWS)135}136}137138var totalUsage CPUTime139wsOrder := make([]string, 0, len(d.History))140for id, h := range d.History {141wsOrder = append(wsOrder, id)142totalUsage += h.Usage()143}144145// We order workspaces by their QoS class first. Within the same class we order146// by usage: lowest usage -> highest priority147sort.Slice(wsOrder, func(i, j int) bool {148uI := d.History[wsOrder[i]].Usage()149uJ := d.History[wsOrder[j]].Usage()150qI := d.History[wsOrder[i]].LastUpdate.QoS151qJ := d.History[wsOrder[j]].LastUpdate.QoS152if qI == qJ {153return uI < uJ154}155return qI < qJ156})157158if d.LastTickUsage == 0 {159d.LastTickUsage = totalUsage160return DistributorDebug{161BandwidthAvail: d.TotalBandwidth,162BandwidthUsed: 0,163}, nil164}165166totalBandwidth, err := BandwithFromUsage(d.LastTickUsage, totalUsage, dt)167d.LastTickUsage = totalUsage168if err != nil {169return DistributorDebug{170BandwidthAvail: d.TotalBandwidth,171BandwidthUsed: 0,172}, err173}174175// enforce limits176var burstBandwidth Bandwidth177for _, id := range wsOrder {178ws := d.History[id]179limit, err := d.Limiter.Limit(ws)180if err != nil {181log.WithError(err).Errorf("unable to apply min limit")182continue183}184185// if we didn't get the max bandwidth, but were throttled last time186// and there's still some bandwidth left to give, let's act as if had187// never spent any CPU time and assume the workspace will spend their188// entire bandwidth at once.189var burst bool190if totalBandwidth < d.TotalBandwidth && ws.Throttled() {191limit, err = d.BurstLimiter.Limit(ws)192if err != nil {193log.WithError(err).Errorf("unable to apply burst limit")194continue195}196197// We assume the workspace is going to use as much as their limit allows.198// This might not be true, because their process which consumed so much CPU199// may have ended by now.200totalBandwidth += limit201202burstBandwidth += limit203}204205d.Sink(id, limit, burst)206}207208return DistributorDebug{209BandwidthAvail: d.TotalBandwidth,210BandwidthUsed: totalBandwidth,211BandwidthBurst: burstBandwidth,212}, nil213}214215func (d *Distributor) Reset() {216d.History = make(map[string]*WorkspaceHistory)217}218219// ResourceLimiter implements a strategy to limit the resurce use of a workspace220type ResourceLimiter interface {221Limit(wsh *WorkspaceHistory) (Bandwidth, error)222}223224var _ ResourceLimiter = (*fixedLimiter)(nil)225var _ ResourceLimiter = (*annotationLimiter)(nil)226var _ ResourceLimiter = (*BucketLimiter)(nil)227var _ ResourceLimiter = (*ClampingBucketLimiter)(nil)228var _ ResourceLimiter = (*compositeLimiter)(nil)229230// FixedLimiter returns a fixed limit231func FixedLimiter(limit Bandwidth) ResourceLimiter {232return fixedLimiter{limit}233}234235type fixedLimiter struct {236FixedLimit Bandwidth237}238239func (f fixedLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {240return f.FixedLimit, nil241}242243func AnnotationLimiter(annotation string) ResourceLimiter {244return annotationLimiter{245Annotation: annotation,246}247}248249type annotationLimiter struct {250Annotation string251}252253func (a annotationLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {254value, ok := wsh.LastUpdate.Annotations[a.Annotation]255if !ok {256return 0, xerrors.Errorf("no annotation named %s found on workspace %s", a.Annotation, wsh.ID)257}258259limit, err := resource.ParseQuantity(value)260if err != nil {261return 0, xerrors.Errorf("failed to parse %s for workspace %s", limit, wsh.ID)262}263264return BandwidthFromQuantity(limit), nil265}266267// Bucket describes a "pot of CPU time" which can be spent at a particular rate.268type Bucket struct {269Budget CPUTime `json:"budget"`270Limit Bandwidth `json:"limit"`271}272273// BucketLimiter limits CPU use based on different "pots of CPU time".274// The current limit is decided by the current bucket which is taken in order.275// For example:276//277// buckets = [ { Budget: 50, Limit: 20 }, { Budget: 20, Limit: 10 }, { Budget: 0, Limit: 5 } ]278// budgetSpent = totalBudget - budgetLeft == 65279// then the current limit is 10, because we have spent all our budget from bucket 0, and are currently280// spending from the second bucket.281//282// The last bucket's Budget is always ignored and becomes the default limit if all other283// buckets are used up.284// If the list of buckets is empty, this limiter limits to zero.285type BucketLimiter []Bucket286287// Limit limits spending based on the budget that's left288func (buckets BucketLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {289budgetSpent := wsh.Usage()290291for i, bkt := range buckets {292if i+1 == len(buckets) {293// We've reached the last bucket - budget doesn't matter anymore294return bkt.Limit, nil295}296297budgetSpent -= bkt.Budget298if budgetSpent <= 0 {299// BudgetSpent value is in this bucket, hence we have found our current bucket300return bkt.Limit, nil301}302}303304// empty bucket list305return 0, nil306}307308// ClampingBucketLimiter is a stateful limiter that clamps the limit to the last bucket once that bucket is reached.309// Clamping happens until less budget has been used as permitted by that bucket.310type ClampingBucketLimiter struct {311Buckets []Bucket312lastBucketLock bool313}314315// Limit decides on a CPU use limit316func (bl *ClampingBucketLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {317budgetSpent := wsh.Usage()318319if bl.lastBucketLock {320if budgetSpent < bl.Buckets[len(bl.Buckets)-1].Budget {321bl.lastBucketLock = false322}323}324if bl.lastBucketLock {325return bl.Buckets[len(bl.Buckets)-1].Limit, nil326}327328for i, bkt := range bl.Buckets {329if i+1 == len(bl.Buckets) {330// We've reached the last bucket - budget doesn't matter anymore331bl.lastBucketLock = true332return bkt.Limit, nil333}334335budgetSpent -= bkt.Budget336if budgetSpent <= 0 {337// BudgetSpent value is in this bucket, hence we have found our current bucket338return bkt.Limit, nil339}340}341342// empty bucket list343return 0, nil344}345346type compositeLimiter struct {347limiters []ResourceLimiter348}349350func CompositeLimiter(limiters ...ResourceLimiter) ResourceLimiter {351return &compositeLimiter{352limiters: limiters,353}354}355356func (cl *compositeLimiter) Limit(wsh *WorkspaceHistory) (Bandwidth, error) {357var errs []error358for _, limiter := range cl.limiters {359limit, err := limiter.Limit(wsh)360if err != nil {361errs = append(errs, err)362continue363}364365return limit, nil366}367368allerr := make([]string, len(errs))369for i, err := range errs {370allerr[i] = err.Error()371}372return 0, xerrors.Errorf("no limiter was able to provide a limit", strings.Join(allerr, ", "))373}374375type CFSController interface {376// Usage returns the cpuacct.usage value of the cgroup377Usage() (usage CPUTime, err error)378// SetQuota sets a new CFS quota on the cgroup379SetLimit(limit Bandwidth) (changed bool, err error)380// NrThrottled returns the number of CFS periods the cgroup was throttled381NrThrottled() (uint64, error)382}383384385