Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/cpulimit/dispatch.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package cpulimit
6
7
import (
8
"context"
9
"errors"
10
"os"
11
"path/filepath"
12
"sync"
13
"time"
14
15
"github.com/prometheus/client_golang/prometheus"
16
"github.com/sirupsen/logrus"
17
"golang.org/x/xerrors"
18
"k8s.io/apimachinery/pkg/api/resource"
19
20
"github.com/gitpod-io/gitpod/common-go/cgroups"
21
"github.com/gitpod-io/gitpod/common-go/kubernetes"
22
"github.com/gitpod-io/gitpod/common-go/log"
23
"github.com/gitpod-io/gitpod/common-go/util"
24
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
25
)
26
27
// Config configures the containerd resource governer dispatch
type Config struct {
	// Enabled toggles CPU limiting; when false no distributor is started.
	Enabled bool `json:"enabled"`
	// TotalBandwidth is the total CPU bandwidth the distributor may hand out
	// across all workspaces on this node.
	TotalBandwidth resource.Quantity `json:"totalBandwidth"`
	// Limit is the default per-workspace CPU limit (hard limiter fallback).
	Limit resource.Quantity `json:"limit"`
	// BurstLimit is the per-workspace CPU limit applied while a workspace is
	// allowed to burst (burst limiter fallback).
	BurstLimit resource.Quantity `json:"burstLimit"`

	// ControlPeriod is the interval at which the distributor re-evaluates limits.
	ControlPeriod util.Duration `json:"controlPeriod"`
	// CGroupBasePath is the filesystem root of the cgroup hierarchy.
	CGroupBasePath string `json:"cgroupBasePath"`
}
37
38
// NewDispatchListener creates a new resource governer dispatch listener
39
func NewDispatchListener(cfg *Config, prom prometheus.Registerer) *DispatchListener {
40
d := &DispatchListener{
41
Prometheus: prom,
42
Config: cfg,
43
workspaces: make(map[string]*workspace),
44
45
workspacesAddedCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
46
Name: "cpulimit_workspaces_added_total",
47
Help: "Number of workspaces added to CPU control",
48
}, []string{"qos"}),
49
workspacesRemovedCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
50
Name: "cpulimit_workspaces_removed_total",
51
Help: "Number of workspaces removed from CPU control",
52
}, []string{"qos"}),
53
workspacesThrottledCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
54
Name: "cpulimit_workspaces_throttled_total",
55
Help: "Number of workspaces which ran with throttled CPU",
56
}, []string{"qos"}),
57
workspacesBurstCounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
58
Name: "cpulimit_workspaces_burst_total",
59
Help: "Number of workspaces which received burst CPU limits",
60
}, []string{"qos"}),
61
workspacesCPUTimeVec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
62
Name: "cpulimit_workspaces_cputime_seconds",
63
Help: "CPU time of all observed workspaces",
64
}, []string{"qos"}),
65
}
66
67
if cfg.Enabled {
68
dist := NewDistributor(d.source, d.sink,
69
CompositeLimiter(AnnotationLimiter(kubernetes.WorkspaceCpuMinLimitAnnotation), FixedLimiter(BandwidthFromQuantity(d.Config.Limit))),
70
CompositeLimiter(AnnotationLimiter(kubernetes.WorkspaceCpuBurstLimitAnnotation), FixedLimiter(BandwidthFromQuantity(d.Config.BurstLimit))),
71
BandwidthFromQuantity(d.Config.TotalBandwidth),
72
)
73
go dist.Run(context.Background(), time.Duration(d.Config.ControlPeriod))
74
}
75
76
prom.MustRegister(
77
d.workspacesAddedCounterVec,
78
d.workspacesRemovedCounterVec,
79
d.workspacesThrottledCounterVec,
80
d.workspacesBurstCounterVec,
81
d.workspacesCPUTimeVec,
82
)
83
84
return d
85
}
86
87
// DispatchListener starts new resource governer using the workspace dispatch
type DispatchListener struct {
	Prometheus prometheus.Registerer
	Config     *Config

	// workspaces holds all currently governed workspaces, keyed by instance ID.
	workspaces map[string]*workspace
	// mu guards workspaces. source/sink take the read lock; add/remove take
	// the write lock.
	mu sync.RWMutex

	workspacesAddedCounterVec     *prometheus.CounterVec
	workspacesRemovedCounterVec   *prometheus.CounterVec
	workspacesThrottledCounterVec *prometheus.CounterVec
	workspacesBurstCounterVec     *prometheus.CounterVec
	workspacesCPUTimeVec          *prometheus.GaugeVec
}
101
102
// workspace is the per-workspace state the listener keeps for CPU governance.
type workspace struct {
	CFS         CFSController
	OWI         logrus.Fields
	HardLimit   ResourceLimiter
	Annotations map[string]string

	// lastThrottled is the cgroup's throttle counter observed in the previous
	// source() cycle; used to detect newly throttled periods for metrics.
	lastThrottled uint64
}
110
111
// source collects the current CPU usage and throttle counts of all governed
// workspaces. It is the distributor's input callback.
func (d *DispatchListener) source(context.Context) ([]Workspace, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	res := make([]Workspace, 0, len(d.workspaces))
	// Reset the gauge so workspaces which have gone away don't linger in it.
	d.workspacesCPUTimeVec.Reset()
	for id, w := range d.workspaces {
		usage, err := w.CFS.Usage()
		if err != nil {
			// A missing cgroup file typically means the workspace is going
			// away - don't warn about it.
			if !errors.Is(err, os.ErrNotExist) {
				log.WithFields(w.OWI).WithError(err).Warn("cannot read CPU usage")
			}

			continue
		}

		throttled, err := w.CFS.NrThrottled()
		if err != nil {
			log.WithFields(w.OWI).WithError(err).Warn("cannot read times cgroup was throttled")
			// we don't continue here, because worst case the cgroup will get too low a
			// limit, but at least we'll keep maintaining the limit.
		}

		// Count a throttle event only when the counter moved since the last
		// cycle (and we had a previous reading at all).
		if w.lastThrottled > 0 && w.lastThrottled != throttled {
			d.workspacesThrottledCounterVec.WithLabelValues("none").Inc()
		}
		// NOTE(review): this writes shared per-workspace state while holding
		// only the read lock - safe only while a single distributor invokes
		// source sequentially; confirm that assumption holds.
		w.lastThrottled = throttled

		d.workspacesCPUTimeVec.WithLabelValues("none").Add(time.Duration(usage).Seconds())

		res = append(res, Workspace{
			ID:          id,
			NrThrottled: throttled,
			Usage:       usage,
			Annotations: w.Annotations,
		})
	}
	return res, nil
}
150
151
// sink applies the CPU limit the distributor computed for a single workspace.
// It is the distributor's output callback.
//
// burst reports whether the workspace was granted its burst limit. Previously
// the burst counter was incremented unconditionally, which over-counted
// bursts for every limit application; it now only counts actual bursts.
func (d *DispatchListener) sink(id string, limit Bandwidth, burst bool) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ws, ok := d.workspaces[id]
	if !ok {
		// this can happen if the workspace has gone away inbetween a distributor cycle
		return
	}

	if burst {
		d.workspacesBurstCounterVec.WithLabelValues("none").Inc()
	}

	changed, err := ws.CFS.SetLimit(limit)
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		// A vanished cgroup is expected during shutdown - only warn otherwise.
		log.WithError(err).WithFields(ws.OWI).Warn("cannot set CPU limit")
	}
	if changed {
		log.WithFields(ws.OWI).WithField("limit", limit).Debug("applied new CPU limit")
	}
}
171
172
// WorkspaceAdded starts new governer
func (d *DispatchListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	// NOTE(review): the write lock is held across ContainerCGroupPath and
	// newCFSController below, blocking source/sink for that duration -
	// confirm this serialization is intentional.

	disp := dispatch.GetFromContext(ctx)
	if disp == nil {
		return xerrors.Errorf("no dispatch available")
	}

	// Resolve the container's cgroup path via the container runtime.
	cgroupPath, err := disp.Runtime.ContainerCGroupPath(ctx, ws.ContainerID)
	if err != nil {
		// A canceled context means the workspace is already being torn down.
		if errors.Is(err, context.Canceled) {
			return nil
		}
		return xerrors.Errorf("cannot start governer: %w", err)
	}

	controller, err := newCFSController(d.Config.CGroupBasePath, cgroupPath)
	if err != nil {
		return xerrors.Errorf("cannot start CFS controller: %w", err)
	}

	d.workspaces[ws.InstanceID] = &workspace{
		CFS:         controller,
		OWI:         ws.OWI(),
		Annotations: ws.Pod.Annotations,
	}

	// Remove the workspace from governance once its dispatch context ends.
	dispatch.GetDispatchWaitGroup(ctx).Add(1)
	go func() {
		defer dispatch.GetDispatchWaitGroup(ctx).Done()

		<-ctx.Done()

		d.mu.Lock()
		defer d.mu.Unlock()
		delete(d.workspaces, ws.InstanceID)
		d.workspacesRemovedCounterVec.WithLabelValues("none").Inc()
	}()

	d.workspacesAddedCounterVec.WithLabelValues("none").Inc()

	return nil
}
217
218
// WorkspaceUpdated gets called when a workspace is updated
219
func (d *DispatchListener) WorkspaceUpdated(ctx context.Context, ws *dispatch.Workspace) error {
220
d.mu.Lock()
221
defer d.mu.Unlock()
222
223
wsinfo, ok := d.workspaces[ws.InstanceID]
224
if !ok {
225
return xerrors.Errorf("received update for a workspace we haven't seen before: %s", ws.InstanceID)
226
}
227
228
wsinfo.Annotations = ws.Pod.Annotations
229
return nil
230
}
231
232
// newCFSController creates the CFS controller matching the node's cgroup
// setup (v1 or v2) for the given workspace cgroup path, rooted at basePath.
// Restructured to early-return form (no else after a terminating branch).
func newCFSController(basePath, cgroupPath string) (CFSController, error) {
	unified, err := cgroups.IsUnifiedCgroupSetup()
	if err != nil {
		return nil, xerrors.Errorf("could not determine cgroup setup: %w", err)
	}

	if !unified {
		// cgroup v1 mounts each controller separately, hence the "cpu" segment.
		return CgroupV1CFSController(filepath.Join(basePath, "cpu", cgroupPath)), nil
	}

	fullPath := filepath.Join(basePath, cgroupPath)
	// cgroup v2 requires the cpu controller to be enabled in every ancestor's
	// subtree_control before limits can be applied.
	if err := cgroups.EnsureCpuControllerEnabled(basePath, filepath.Join("/", cgroupPath)); err != nil {
		return nil, xerrors.Errorf("could not check CPU controller is enabled: %w", err)
	}

	return CgroupV2CFSController(fullPath), nil
}
249
250