Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/daemon/daemon.go
2501 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package daemon
6
7
import (
8
"context"
9
"fmt"
10
"os"
11
"time"
12
13
"github.com/prometheus/client_golang/prometheus"
14
"github.com/prometheus/client_golang/prometheus/collectors"
15
"golang.org/x/xerrors"
16
"k8s.io/apimachinery/pkg/runtime"
17
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
18
"k8s.io/client-go/kubernetes"
19
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
20
"k8s.io/client-go/rest"
21
"k8s.io/client-go/tools/clientcmd"
22
ctrl "sigs.k8s.io/controller-runtime"
23
"sigs.k8s.io/controller-runtime/pkg/cache"
24
"sigs.k8s.io/controller-runtime/pkg/manager"
25
"sigs.k8s.io/controller-runtime/pkg/metrics"
26
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
27
"sigs.k8s.io/controller-runtime/pkg/webhook"
28
29
"github.com/gitpod-io/gitpod/common-go/log"
30
"github.com/gitpod-io/gitpod/ws-daemon/pkg/cgroup"
31
"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
32
"github.com/gitpod-io/gitpod/ws-daemon/pkg/content"
33
"github.com/gitpod-io/gitpod/ws-daemon/pkg/controller"
34
"github.com/gitpod-io/gitpod/ws-daemon/pkg/cpulimit"
35
"github.com/gitpod-io/gitpod/ws-daemon/pkg/diskguard"
36
"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
37
"github.com/gitpod-io/gitpod/ws-daemon/pkg/iws"
38
"github.com/gitpod-io/gitpod/ws-daemon/pkg/netlimit"
39
"github.com/gitpod-io/gitpod/ws-daemon/pkg/quota"
40
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
41
)
42
43
// scheme holds the runtime types the controller manager must know about:
// the core client-go types plus the Gitpod Workspace CRD (registered in init).
var (
	scheme = runtime.NewScheme()
)
46
47
// init registers the client-go built-in types and the Gitpod workspace CRD
// types with the shared scheme. Must panics on failure, which can only be
// caused by a programming error (duplicate or conflicting registration).
func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(workspacev1.AddToScheme(scheme))
}
51
52
// NewDaemon produces a new daemon.
//
// It wires up every ws-daemon subsystem: the shared metrics registry, the
// container runtime connection, the cgroup v2 plugin host, dispatch listeners
// for workspace containers, runtime config reloading, a controller-runtime
// manager hosting the workspace and snapshot controllers, periodic
// housekeeping of the working area, and disk space guards.
// It returns an error if any subsystem fails to initialize.
func NewDaemon(config Config) (*Daemon, error) {
	// Use the metrics registry from the controller manager. The manager's registry
	// isn't configurable so we use this instead of the baseserver's default registry.
	// Hack: cast the registry as a *prometheus.Registry, as that's the type required
	// by baseserver.
	registry, ok := metrics.Registry.(*prometheus.Registry)
	if ok {
		// These collectors are also registered by baseserver. Use the ones from baseserver
		// and remove the collectors registered by controller-manager, to prevent an error
		// for duplicate collectors.
		registry.Unregister(collectors.NewGoCollector())
		registry.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	} else {
		log.Error("failed to use controller-runtime metrics registry, not of expected type. Using default registry instead, but will not collect controller metrics...")
		registry = prometheus.NewRegistry()
	}
	// All metrics registered below share the gitpod_ws_daemon_ prefix.
	wrappedReg := prometheus.WrapRegistererWithPrefix("gitpod_ws_daemon_", registry)

	restCfg, err := newClientConfig(config.Runtime.Kubeconfig)
	if err != nil {
		return nil, err
	}
	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, err
	}

	containerRuntime, err := container.FromConfig(config.Runtime.Container)
	if err != nil {
		return nil, err
	}
	if containerRuntime == nil {
		return nil, xerrors.Errorf("no container runtime configured")
	}

	// The daemon is node-local: it only acts on workspaces scheduled to the
	// node named by the NODENAME env var (set via the pod spec's downward API
	// — presumably; confirm in the deployment manifest).
	nodename := os.Getenv("NODENAME")
	if nodename == "" {
		return nil, xerrors.Errorf("NODENAME env var isn't set")
	}

	markUnmountFallback, err := NewMarkUnmountFallback(wrappedReg)
	if err != nil {
		return nil, err
	}

	cgroupV2IOLimiter, err := cgroup.NewIOLimiterV2(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS)
	if err != nil {
		return nil, err
	}

	procV2Plugin, err := cgroup.NewProcLimiterV2(config.ProcLimit)
	if err != nil {
		return nil, err
	}

	// Assemble the cgroup plugin host. The priority map gives user-facing
	// processes (workspacekit, supervisor, IDEs) a higher priority (-10) than
	// their helper processes (-5).
	cgroupPlugins, err := cgroup.NewPluginHost(config.CPULimit.CGroupBasePath,
		&cgroup.FuseDeviceEnablerV2{},
		cgroupV2IOLimiter,
		&cgroup.ProcessPriorityV2{
			ProcessPriorities: map[cgroup.ProcessType]int{
				cgroup.ProcessWorkspaceKit: -10,
				cgroup.ProcessSupervisor:   -10,

				cgroup.ProcessIDE:          -10,
				cgroup.ProcessWebIDEHelper: -5,

				cgroup.ProcessCodeServer:       -10,
				cgroup.ProcessCodeServerHelper: -5,

				cgroup.ProcessJetBrainsIDE: -10,
			},
			EnableOOMScoreAdj: config.OOMScores.Enabled,
			// Tier1 processes are the primary workloads; Tier2 covers helpers.
			OOMScoreAdj: map[cgroup.ProcessType]int{
				cgroup.ProcessWorkspaceKit:     config.OOMScores.Tier1,
				cgroup.ProcessSupervisor:       config.OOMScores.Tier1,
				cgroup.ProcessCodeServer:       config.OOMScores.Tier1,
				cgroup.ProcessIDE:              config.OOMScores.Tier1,
				cgroup.ProcessJetBrainsIDE:     config.OOMScores.Tier1,
				cgroup.ProcessCodeServerHelper: config.OOMScores.Tier2,
				cgroup.ProcessWebIDEHelper:     config.OOMScores.Tier2,
			},
		},
		procV2Plugin,
		cgroup.NewPSIMetrics(wrappedReg),
	)
	if err != nil {
		return nil, err
	}

	if cgroupPlugins.CGroupVersion != cgroup.Version2 {
		return nil, xerrors.Errorf("only cgroup v2 is supported")
	}

	err = wrappedReg.Register(cgroupPlugins)
	if err != nil {
		return nil, xerrors.Errorf("cannot register cgroup plugin metrics: %w", err)
	}

	// Listeners are notified by the dispatch about workspace containers on
	// this node.
	listener := []dispatch.Listener{
		cpulimit.NewDispatchListener(&config.CPULimit, wrappedReg),
		markUnmountFallback,
		cgroupPlugins,
	}

	// The net limiter is always constructed (the config reloader below may
	// enable it later), but only attached as a listener when enabled now.
	netlimiter := netlimit.NewConnLimiter(config.NetLimit, wrappedReg)
	if config.NetLimit.Enabled {
		listener = append(listener, netlimiter)
	}

	// Runtime config reloading: pushes updated IO, proc, and net limits into
	// the already-running plugins without a daemon restart.
	var configReloader CompositeConfigReloader
	configReloader = append(configReloader, ConfigReloaderFunc(func(ctx context.Context, config *Config) error {
		cgroupV2IOLimiter.Update(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS)
		procV2Plugin.Update(config.ProcLimit)
		if config.NetLimit.Enabled {
			netlimiter.Update(config.NetLimit)
		}
		return nil
	}))

	var mgr manager.Manager

	// We only need the manager for its reconciliation loop; its health probe
	// and metrics endpoints are disabled (bind address "0").
	mgr, err = ctrl.NewManager(restCfg, ctrl.Options{
		Scheme:                 scheme,
		HealthProbeBindAddress: "0",
		Metrics: metricsserver.Options{
			// Disable the metrics server.
			// We only need access to the reconciliation loop feature.
			BindAddress: "0",
		},
		// Restrict the cache to the namespaces the daemon actually reads.
		Cache: cache.Options{
			DefaultNamespaces: map[string]cache.Config{
				config.Runtime.KubernetesNamespace: {},
				config.Runtime.SecretsNamespace:    {},
			},
		},
		WebhookServer: webhook.NewServer(webhook.Options{
			Port: 9443,
		}),
	})
	if err != nil {
		return nil, err
	}

	contentCfg := config.Content

	// XFS quota handling for the content working area.
	xfs, err := quota.NewXFS(contentCfg.WorkingArea)
	if err != nil {
		return nil, err
	}

	hooks := content.WorkspaceLifecycleHooks(
		contentCfg,
		config.Runtime.WorkspaceCIDR,
		&iws.Uidmapper{Config: config.Uidmapper, Runtime: containerRuntime},
		xfs,
		config.CPULimit.CGroupBasePath,
	)

	dsptch, err := dispatch.NewDispatch(containerRuntime, clientset, config.Runtime.KubernetesNamespace, nodename, listener...)
	if err != nil {
		return nil, err
	}

	workspaceOps, err := controller.NewWorkspaceOperations(contentCfg, controller.NewWorkspaceProvider(contentCfg.WorkingArea, hooks), wrappedReg, dsptch)
	if err != nil {
		return nil, err
	}

	wsctrl, err := controller.NewWorkspaceController(
		mgr.GetClient(), mgr.GetEventRecorderFor("workspace"), nodename, config.Runtime.SecretsNamespace, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps, wrappedReg, containerRuntime)
	if err != nil {
		return nil, err
	}
	err = wsctrl.SetupWithManager(mgr)
	if err != nil {
		return nil, err
	}

	ssctrl := controller.NewSnapshotController(
		mgr.GetClient(), mgr.GetEventRecorderFor("snapshot"), nodename, config.WorkspaceController.MaxConcurrentReconciles, workspaceOps)
	err = ssctrl.SetupWithManager(mgr)
	if err != nil {
		return nil, err
	}

	// Housekeeping of the working area runs for the lifetime of the process;
	// it is intentionally not tied to the daemon's Stop().
	housekeeping := controller.NewHousekeeping(contentCfg.WorkingArea, 5*time.Minute)
	go housekeeping.Start(context.Background())

	dsk := diskguard.FromConfig(config.DiskSpaceGuard, clientset, nodename)

	return &Daemon{
		Config:          config,
		dispatch:        dsptch,
		diskGuards:      dsk,
		configReloader:  configReloader,
		mgr:             mgr,
		metricsRegistry: registry,
	}, nil
}
252
253
// newClientConfig builds a Kubernetes REST config. An empty kubeconfig path
// selects in-cluster configuration; otherwise the given kubeconfig file is used.
func newClientConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig == "" {
		return rest.InClusterConfig()
	}

	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
260
261
// Daemon connects all the individual bits and bobs that make up the workspace daemon
type Daemon struct {
	// Config is the daemon configuration this instance was created with.
	Config Config

	// dispatch watches workspace containers on this node and notifies listeners.
	dispatch *dispatch.Dispatch
	// diskGuards watch available disk space (one guard per configured path).
	diskGuards []*diskguard.Guard
	// configReloader applies runtime configuration updates (see ReloadConfig).
	configReloader ConfigReloader
	// mgr is the controller-runtime manager hosting the workspace and
	// snapshot controllers; started in Start, stopped via cancel.
	mgr ctrl.Manager
	// metricsRegistry collects the daemon's metrics (see MetricsRegistry).
	metricsRegistry *prometheus.Registry

	// cancel stops the manager's context; set in Start, invoked by Stop.
	cancel context.CancelFunc
}
273
274
// ReloadConfig applies an updated configuration to the running daemon by
// delegating to the composite config reloader assembled in NewDaemon.
func (d *Daemon) ReloadConfig(ctx context.Context, cfg *Config) error {
	return d.configReloader.ReloadConfig(ctx, cfg)
}
277
278
// Start runs all parts of the daemon until stop is called
279
func (d *Daemon) Start() error {
280
err := d.dispatch.Start()
281
if err != nil {
282
return xerrors.Errorf("cannot start dispatch: %w", err)
283
}
284
285
for _, dsk := range d.diskGuards {
286
go dsk.Start()
287
}
288
289
var ctx context.Context
290
ctx, d.cancel = context.WithCancel(context.Background())
291
292
go func() {
293
err := d.mgr.Start(ctx)
294
if err != nil {
295
log.WithError(err).Fatal("cannot start controller")
296
}
297
}()
298
299
return nil
300
}
301
302
// Stop gracefully shuts down the daemon. Once stopped, it
303
// cannot be started again.
304
func (d *Daemon) Stop() error {
305
d.cancel()
306
307
var errs []error
308
errs = append(errs, d.dispatch.Close())
309
for _, err := range errs {
310
if err != nil {
311
return err
312
}
313
}
314
315
return nil
316
}
317
318
// ReadinessProbe returns a function that reports whether the daemon is ready
// to serve, based on whether containerd is reachable and ready.
func (d *Daemon) ReadinessProbe() func() error {
	return func() error {
		// A 2-second deadline ensures IsContainerdReady() cannot block indefinitely.
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		ready, err := d.dispatch.Runtime.IsContainerdReady(ctx)
		if err != nil {
			log.WithError(err).Errorf("readiness probe failure: containerd error")
			return fmt.Errorf("containerd error: %v", err)
		}
		if ready {
			return nil
		}

		err = fmt.Errorf("containerd is not ready")
		log.WithError(err).Error("readiness probe failure")
		return err
	}
}
338
339
// MetricsRegistry returns the prometheus registry holding the daemon's
// metrics, for exposure by the surrounding server.
func (d *Daemon) MetricsRegistry() *prometheus.Registry {
	return d.metricsRegistry
}
342
343