Path: blob/main/components/ws-daemon/pkg/cpulimit/dispatch.go
2500 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package cpulimit56import (7"context"8"errors"9"os"10"path/filepath"11"sync"12"time"1314"github.com/prometheus/client_golang/prometheus"15"github.com/sirupsen/logrus"16"golang.org/x/xerrors"17"k8s.io/apimachinery/pkg/api/resource"1819"github.com/gitpod-io/gitpod/common-go/cgroups"20"github.com/gitpod-io/gitpod/common-go/kubernetes"21"github.com/gitpod-io/gitpod/common-go/log"22"github.com/gitpod-io/gitpod/common-go/util"23"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"24)2526// Config configures the containerd resource governer dispatch27type Config struct {28Enabled bool `json:"enabled"`29TotalBandwidth resource.Quantity `json:"totalBandwidth"`30Limit resource.Quantity `json:"limit"`31BurstLimit resource.Quantity `json:"burstLimit"`3233ControlPeriod util.Duration `json:"controlPeriod"`34CGroupBasePath string `json:"cgroupBasePath"`35}3637// NewDispatchListener creates a new resource governer dispatch listener38func NewDispatchListener(cfg *Config, prom prometheus.Registerer) *DispatchListener {39d := &DispatchListener{40Prometheus: prom,41Config: cfg,42workspaces: make(map[string]*workspace),4344workspacesAddedCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{45Name: "cpulimit_workspaces_added_total",46Help: "Number of workspaces added to CPU control",47}, []string{"qos"}),48workspacesRemovedCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{49Name: "cpulimit_workspaces_removed_total",50Help: "Number of workspaces removed from CPU control",51}, []string{"qos"}),52workspacesThrottledCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{53Name: "cpulimit_workspaces_throttled_total",54Help: "Number of workspaces which ran with throttled CPU",55}, []string{"qos"}),56workspacesBurstCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{57Name: "cpulimit_workspaces_burst_total",58Help: "Number of workspaces which received burst CPU limits",59}, []string{"qos"}),60workspacesCPUTimeVec: prometheus.NewGaugeVec(prometheus.GaugeOpts{61Name: "cpulimit_workspaces_cputime_seconds",62Help: "CPU time of all observed workspaces",63}, []string{"qos"}),64}6566if cfg.Enabled {67dist := NewDistributor(d.source, d.sink,68CompositeLimiter(AnnotationLimiter(kubernetes.WorkspaceCpuMinLimitAnnotation), FixedLimiter(BandwidthFromQuantity(d.Config.Limit))),69CompositeLimiter(AnnotationLimiter(kubernetes.WorkspaceCpuBurstLimitAnnotation), FixedLimiter(BandwidthFromQuantity(d.Config.BurstLimit))),70BandwidthFromQuantity(d.Config.TotalBandwidth),71)72go dist.Run(context.Background(), time.Duration(d.Config.ControlPeriod))73}7475prom.MustRegister(76d.workspacesAddedCounterVec,77d.workspacesRemovedCounterVec,78d.workspacesThrottledCounterVec,79d.workspacesBurstCounterVec,80d.workspacesCPUTimeVec,81)8283return d84}8586// DispatchListener starts new resource governer using the workspace dispatch87type DispatchListener struct {88Prometheus prometheus.Registerer89Config *Config9091workspaces map[string]*workspace92mu sync.RWMutex9394workspacesAddedCounterVec *prometheus.CounterVec95workspacesRemovedCounterVec *prometheus.CounterVec96workspacesThrottledCounterVec *prometheus.CounterVec97workspacesBurstCounterVec *prometheus.CounterVec98workspacesCPUTimeVec *prometheus.GaugeVec99}100101type workspace struct {102CFS CFSController103OWI logrus.Fields104HardLimit ResourceLimiter105Annotations map[string]string106107lastThrottled uint64108}109110func (d *DispatchListener) source(context.Context) ([]Workspace, error) {111d.mu.RLock()112defer d.mu.RUnlock()113114res := make([]Workspace, 0, len(d.workspaces))115d.workspacesCPUTimeVec.Reset()116for id, w := range d.workspaces {117usage, err := w.CFS.Usage()118if err != nil {119if !errors.Is(err, os.ErrNotExist) {120log.WithFields(w.OWI).WithError(err).Warn("cannot read CPU usage")121}122123continue124}125126throttled, err := w.CFS.NrThrottled()127if err != nil {128log.WithFields(w.OWI).WithError(err).Warn("cannot read times cgroup was throttled")129// we don't continue here, because worst case the cgroup will get too low a130// limit, but at least we'll keep maintaining the limit.131}132133if w.lastThrottled > 0 && w.lastThrottled != throttled {134d.workspacesThrottledCounterVec.WithLabelValues("none").Inc()135}136w.lastThrottled = throttled137138d.workspacesCPUTimeVec.WithLabelValues("none").Add(time.Duration(usage).Seconds())139140res = append(res, Workspace{141ID: id,142NrThrottled: throttled,143Usage: usage,144Annotations: w.Annotations,145})146}147return res, nil148}149150func (d *DispatchListener) sink(id string, limit Bandwidth, burst bool) {151d.mu.RLock()152defer d.mu.RUnlock()153154ws, ok := d.workspaces[id]155if !ok {156// this can happen if the workspace has gone away inbetween a distributor cycle157return158}159160d.workspacesBurstCounterVec.WithLabelValues("none").Inc()161162changed, err := ws.CFS.SetLimit(limit)163if err != nil && !errors.Is(err, os.ErrNotExist) {164log.WithError(err).WithFields(ws.OWI).Warn("cannot set CPU limit")165}166if changed {167log.WithFields(ws.OWI).WithField("limit", limit).Debug("applied new CPU limit")168}169}170171// WorkspaceAdded starts new governer172func (d *DispatchListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {173d.mu.Lock()174defer d.mu.Unlock()175176disp := dispatch.GetFromContext(ctx)177if disp == nil {178return xerrors.Errorf("no dispatch available")179}180181cgroupPath, err := disp.Runtime.ContainerCGroupPath(ctx, ws.ContainerID)182if err != nil {183if errors.Is(err, context.Canceled) {184return nil185}186return xerrors.Errorf("cannot start governer: %w", err)187}188189controller, err := newCFSController(d.Config.CGroupBasePath, cgroupPath)190if err != nil {191return xerrors.Errorf("cannot start CFS controller: %w", err)192}193194d.workspaces[ws.InstanceID] = &workspace{195CFS: controller,196OWI: ws.OWI(),197Annotations: ws.Pod.Annotations,198}199200dispatch.GetDispatchWaitGroup(ctx).Add(1)201go func() {202defer dispatch.GetDispatchWaitGroup(ctx).Done()203204<-ctx.Done()205206d.mu.Lock()207defer d.mu.Unlock()208delete(d.workspaces, ws.InstanceID)209d.workspacesRemovedCounterVec.WithLabelValues("none").Inc()210}()211212d.workspacesAddedCounterVec.WithLabelValues("none").Inc()213214return nil215}216217// WorkspaceUpdated gets called when a workspace is updated218func (d *DispatchListener) WorkspaceUpdated(ctx context.Context, ws *dispatch.Workspace) error {219d.mu.Lock()220defer d.mu.Unlock()221222wsinfo, ok := d.workspaces[ws.InstanceID]223if !ok {224return xerrors.Errorf("received update for a workspace we haven't seen before: %s", ws.InstanceID)225}226227wsinfo.Annotations = ws.Pod.Annotations228return nil229}230231func newCFSController(basePath, cgroupPath string) (CFSController, error) {232unified, err := cgroups.IsUnifiedCgroupSetup()233if err != nil {234return nil, xerrors.Errorf("could not determine cgroup setup: %w", err)235}236237if unified {238fullPath := filepath.Join(basePath, cgroupPath)239if err := cgroups.EnsureCpuControllerEnabled(basePath, filepath.Join("/", cgroupPath)); err != nil {240return nil, xerrors.Errorf("could not check CPU controller is enabled: %w", err)241}242243return CgroupV2CFSController(fullPath), nil244} else {245return CgroupV1CFSController(filepath.Join(basePath, "cpu", cgroupPath)), nil246}247}248249250