Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/node-labeler/cmd/run.go
2498 views
1
// Copyright (c) 2023 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package cmd
6
7
import (
8
"context"
9
"crypto/tls"
10
"fmt"
11
"net"
12
"net/http"
13
"strconv"
14
"strings"
15
"time"
16
17
"github.com/bombsimon/logrusr/v2"
18
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
19
"github.com/spf13/cobra"
20
corev1 "k8s.io/api/core/v1"
21
"k8s.io/apimachinery/pkg/api/errors"
22
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23
"k8s.io/apimachinery/pkg/runtime"
24
"k8s.io/apimachinery/pkg/types"
25
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
27
_ "k8s.io/client-go/plugin/pkg/client/auth"
28
"k8s.io/client-go/util/retry"
29
"k8s.io/utils/ptr"
30
ctrl "sigs.k8s.io/controller-runtime"
31
"sigs.k8s.io/controller-runtime/pkg/builder"
32
"sigs.k8s.io/controller-runtime/pkg/cache"
33
"sigs.k8s.io/controller-runtime/pkg/client"
34
"sigs.k8s.io/controller-runtime/pkg/controller"
35
"sigs.k8s.io/controller-runtime/pkg/event"
36
"sigs.k8s.io/controller-runtime/pkg/healthz"
37
"sigs.k8s.io/controller-runtime/pkg/manager"
38
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
39
"sigs.k8s.io/controller-runtime/pkg/predicate"
40
"sigs.k8s.io/controller-runtime/pkg/reconcile"
41
"sigs.k8s.io/controller-runtime/pkg/webhook"
42
43
"github.com/gitpod-io/gitpod/common-go/log"
44
)
45
46
// Label templates, pod name prefixes, taint keys and workload labels used by
// the node-labeler.
const (
	// Node label key templates; the %v verb is filled with the namespace
	// via fmt.Sprintf when the label key is built.
	registryFacadeLabel = "gitpod.io/registry-facade_ready_ns_%v"
	wsdaemonLabel       = "gitpod.io/ws-daemon_ready_ns_%v"

	// Pod name prefixes used to tell the two components apart.
	registryFacade = "registry-facade"
	wsDaemon       = "ws-daemon"

	// Taint keys for different components
	registryFacadeTaintKey = "gitpod.io/registry-facade-not-ready"
	wsDaemonTaintKey       = "gitpod.io/ws-daemon-not-ready"

	// Node label keys; a node carrying either one is treated as a
	// workspace node.
	workspacesRegularLabel  = "gitpod.io/workload_workspace_regular"
	workspacesHeadlessLabel = "gitpod.io/workload_workspace_headless"
)
60
61
// defaultRequeueTime is the delay reconcilers in this file request via
// RequeueAfter when they want another attempt.
var defaultRequeueTime = 10 * time.Second
62
63
// serveCmd represents the serve command
64
var runCmd = &cobra.Command{
65
Use: "run",
66
Short: "Starts the node labeler",
67
Run: func(cmd *cobra.Command, args []string) {
68
ctrl.SetLogger(logrusr.New(log.Log))
69
70
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
71
Scheme: scheme,
72
HealthProbeBindAddress: ":8086",
73
Metrics: metricsserver.Options{BindAddress: "127.0.0.1:9500"},
74
Cache: cache.Options{
75
DefaultNamespaces: map[string]cache.Config{
76
namespace: {},
77
},
78
// default sync period is 10h.
79
// in case node-labeler is restarted and not change happens, we could waste (at least) 20m in a node
80
// that never will run workspaces and the additional nodes cluster-autoscaler adds to compensate
81
SyncPeriod: ptr.To(time.Duration(2 * time.Minute)),
82
},
83
WebhookServer: webhook.NewServer(webhook.Options{
84
Port: 9443,
85
}),
86
LeaderElection: true,
87
LeaderElectionID: "node-labeler.gitpod.io",
88
})
89
if err != nil {
90
log.WithError(err).Fatal("unable to start node-labeler")
91
}
92
93
r := &PodReconciler{
94
mgr.GetClient(),
95
}
96
97
componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
98
MatchExpressions: []metav1.LabelSelectorRequirement{{
99
Key: "component",
100
Operator: metav1.LabelSelectorOpIn,
101
Values: []string{"ws-daemon", "registry-facade"},
102
}},
103
})
104
if err != nil {
105
log.WithError(err).Fatal("unable to create predicate")
106
}
107
108
err = ctrl.NewControllerManagedBy(mgr).
109
Named("pod-watcher").
110
For(&corev1.Pod{}, builder.WithPredicates(predicate.Or(componentPredicate))).
111
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
112
Complete(r)
113
if err != nil {
114
log.WithError(err).Fatal("unable to bind controller watch event handler")
115
}
116
nr := &NodeReconciler{
117
mgr.GetClient(),
118
}
119
120
err = ctrl.NewControllerManagedBy(mgr).
121
Named("node-watcher").
122
For(&corev1.Node{}, builder.WithPredicates(predicate.Or(nr.nodeFilter()))).
123
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
124
Complete(nr)
125
if err != nil {
126
log.WithError(err).Fatal("unable to bind controller watch event handler")
127
}
128
129
go func() {
130
<-mgr.Elected()
131
if err := nr.reconcileAll(context.Background()); err != nil {
132
log.WithError(err).Fatal("failed to reconcile all nodes")
133
}
134
}()
135
136
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &workspacev1.Workspace{}, "status.runtime.nodeName", func(o client.Object) []string {
137
ws := o.(*workspacev1.Workspace)
138
if ws.Status.Runtime == nil {
139
return nil
140
}
141
return []string{ws.Status.Runtime.NodeName}
142
}); err != nil {
143
log.WithError(err).Fatal("unable to create workspace indexer")
144
return
145
}
146
147
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
148
pod := o.(*corev1.Pod)
149
if pod.Spec.NodeName == "" {
150
return nil
151
}
152
return []string{pod.Spec.NodeName}
153
}); err != nil {
154
log.WithError(err).Fatal("unable to create pod indexer")
155
return
156
}
157
158
nsac, err := NewNodeScaledownAnnotationController(mgr.GetClient())
159
if err != nil {
160
log.WithError(err).Fatal("unable to create node scaledown annotation controller")
161
}
162
err = nsac.SetupWithManager(mgr)
163
if err != nil {
164
log.WithError(err).Fatal("unable to bind node scaledown annotation controller")
165
}
166
167
err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
168
<-ctx.Done()
169
log.Info("Received shutdown signal - stopping NodeScaledownAnnotationController")
170
nsac.Stop()
171
return nil
172
}))
173
if err != nil {
174
log.WithError(err).Fatal("couldn't properly clean up node scaledown annotation controller")
175
}
176
err = mgr.AddHealthzCheck("healthz", healthz.Ping)
177
if err != nil {
178
log.WithError(err).Fatal("unable to set up health check")
179
}
180
181
err = mgr.AddReadyzCheck("readyz", healthz.Ping)
182
if err != nil {
183
log.WithError(err).Fatal("unable to set up ready check")
184
}
185
186
log.Info("starting node-labeler")
187
err = mgr.Start(ctrl.SetupSignalHandler())
188
if err != nil {
189
log.WithError(err).Fatal("problem running node-labeler")
190
}
191
192
log.Info("Received SIGINT - shutting down")
193
},
194
}
195
196
func init() {
197
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
198
utilruntime.Must(workspacev1.AddToScheme(scheme))
199
200
rootCmd.AddCommand(runCmd)
201
}
202
203
var (
204
scheme = runtime.NewScheme()
205
)
206
207
type PodReconciler struct {
208
client.Client
209
}
210
211
func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
212
var pod corev1.Pod
213
err := r.Get(ctx, req.NamespacedName, &pod)
214
if err != nil {
215
if !errors.IsNotFound(err) {
216
log.WithError(err).Error("unable to fetch pod")
217
}
218
219
return ctrl.Result{}, client.IgnoreNotFound(err)
220
}
221
222
nodeName := pod.Spec.NodeName
223
if nodeName == "" {
224
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
225
}
226
227
var taintKey string
228
switch {
229
case strings.HasPrefix(pod.Name, registryFacade):
230
taintKey = registryFacadeTaintKey
231
case strings.HasPrefix(pod.Name, wsDaemon):
232
taintKey = wsDaemonTaintKey
233
default:
234
// nothing to do
235
return reconcile.Result{}, nil
236
}
237
238
healthy, err := checkPodHealth(pod)
239
if err != nil {
240
log.WithError(err).Error("cannot check pod health")
241
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
242
}
243
244
var node corev1.Node
245
err = r.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
246
if err != nil {
247
if !errors.IsNotFound(err) {
248
log.WithError(err).Error("cannot get node")
249
}
250
return reconcile.Result{}, client.IgnoreNotFound(err)
251
}
252
253
if isNodeTaintExists(taintKey, node) != healthy {
254
// nothing to do, the taint already exists and is in the desired state.
255
return reconcile.Result{}, nil
256
}
257
258
err = updateNodeTaint(taintKey, !healthy, nodeName, r)
259
if err != nil {
260
log.WithError(err).
261
WithField("taintKey", taintKey).
262
WithField("add", !healthy).
263
WithField("nodeName", nodeName).
264
Error("cannot update node taint")
265
return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
266
}
267
268
return reconcile.Result{}, nil
269
}
270
271
func checkPodHealth(pod corev1.Pod) (bool, error) {
272
var (
273
ipAddress string
274
port string
275
)
276
switch {
277
case strings.HasPrefix(pod.Name, registryFacade):
278
ipAddress = pod.Status.HostIP
279
port = strconv.Itoa(registryFacadePort)
280
case strings.HasPrefix(pod.Name, wsDaemon):
281
ipAddress = pod.Status.PodIP
282
port = strconv.Itoa(wsdaemonPort)
283
default:
284
// nothing to do
285
return true, nil
286
}
287
288
if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
289
// the pod is being removed.
290
// add the taint to the node
291
return false, nil
292
}
293
294
if !IsPodReady(pod) {
295
// not ready. Wait until the next update.
296
return false, nil
297
}
298
299
err := checkTCPPortIsReachable(ipAddress, port)
300
if err != nil {
301
log.WithField("host", ipAddress).WithField("port", port).WithField("pod", pod.Name).WithError(err).Error("checking if TCP port is open")
302
return false, nil
303
}
304
305
if strings.HasPrefix(pod.Name, registryFacade) {
306
err = checkRegistryFacade(ipAddress, port)
307
if err != nil {
308
log.WithError(err).Error("checking registry-facade")
309
return false, nil
310
}
311
}
312
313
return true, nil
314
}
315
316
type NodeReconciler struct {
317
client.Client
318
}
319
320
func (r *NodeReconciler) nodeFilter() predicate.Predicate {
321
return predicate.Funcs{
322
CreateFunc: func(e event.CreateEvent) bool {
323
node, ok := e.Object.(*corev1.Node)
324
if !ok {
325
return false
326
}
327
return isWorkspaceNode(*node)
328
},
329
UpdateFunc: func(e event.UpdateEvent) bool {
330
return false
331
},
332
DeleteFunc: func(e event.DeleteEvent) bool {
333
return false
334
},
335
}
336
}
337
338
// reconcileAll lists every node and, for each workspace node, initializes the
// component labels and runs a full Reconcile.
//
// Only a failure to list nodes is returned. Per-node failures are logged and
// do not stop the pass over the remaining nodes.
func (r *NodeReconciler) reconcileAll(ctx context.Context) error {
	log.Info("start reconciling all nodes")

	var nodes corev1.NodeList
	if err := r.List(ctx, &nodes); err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}

	for _, node := range nodes.Items {
		// Nodes without labels are rejected here as well: a lookup in a nil
		// label map simply reports the key as absent.
		if !isWorkspaceNode(node) {
			continue
		}

		if err := updateNodeLabel(node.Name, r.Client); err != nil {
			log.WithError(err).WithField("node", node.Name).Error("failed to initialize labels on node")
		}

		req := reconcile.Request{NamespacedName: types.NamespacedName{Name: node.Name}}
		if _, err := r.Reconcile(ctx, req); err != nil {
			// Previously dropped silently; keep going but make it visible.
			log.WithError(err).WithField("node", node.Name).Error("failed to reconcile node")
		}
	}

	log.Info("finished reconciling all nodes")
	return nil
}
364
365
func (r *NodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
366
var node corev1.Node
367
err := r.Get(ctx, req.NamespacedName, &node)
368
if err != nil {
369
if !errors.IsNotFound(err) {
370
log.WithError(err).Error("unable to fetch node")
371
}
372
return ctrl.Result{}, client.IgnoreNotFound(err)
373
}
374
var podList corev1.PodList
375
err = r.List(ctx, &podList, client.MatchingFields{
376
"spec.nodeName": node.Name,
377
})
378
if err != nil {
379
return reconcile.Result{}, fmt.Errorf("cannot list pods: %w", err)
380
}
381
err = updateNodeLabel(node.Name, r.Client)
382
if err != nil {
383
log.WithError(err).WithField("node", node.Name).Error("failed to initialize labels on node")
384
}
385
isWsdaemonTaintExists := isNodeTaintExists(wsDaemonTaintKey, node)
386
isRegistryFacadeTaintExists := isNodeTaintExists(registryFacadeTaintKey, node)
387
isWsDaemonReady, isRegistryFacadeReady := false, false
388
for _, pod := range podList.Items {
389
if strings.HasPrefix(pod.Name, wsDaemon) {
390
isWsDaemonReady, err = checkPodHealth(pod)
391
if err != nil {
392
log.WithError(err).Error("checking pod health")
393
}
394
}
395
if strings.HasPrefix(pod.Name, registryFacade) {
396
isRegistryFacadeReady, err = checkPodHealth(pod)
397
if err != nil {
398
log.WithError(err).Error("checking pod health")
399
}
400
}
401
}
402
if isWsDaemonReady == isWsdaemonTaintExists {
403
updateNodeTaint(wsDaemonTaintKey, !isWsDaemonReady, node.Name, r)
404
}
405
if isRegistryFacadeReady == isRegistryFacadeTaintExists {
406
updateNodeTaint(registryFacadeTaintKey, !isRegistryFacadeReady, node.Name, r)
407
}
408
return reconcile.Result{}, nil
409
}
410
411
type NodeScaledownAnnotationController struct {
412
client.Client
413
nodesToReconcile chan string
414
stopChan chan struct{}
415
}
416
417
func NewNodeScaledownAnnotationController(client client.Client) (*NodeScaledownAnnotationController, error) {
418
controller := &NodeScaledownAnnotationController{
419
Client: client,
420
nodesToReconcile: make(chan string, 1000),
421
stopChan: make(chan struct{}),
422
}
423
424
return controller, nil
425
}
426
427
func (c *NodeScaledownAnnotationController) SetupWithManager(mgr ctrl.Manager) error {
428
go c.reconciliationWorker()
429
go c.periodicReconciliation()
430
431
return ctrl.NewControllerManagedBy(mgr).
432
Named("node-scaledown-annotation-controller").
433
For(&workspacev1.Workspace{}).
434
WithEventFilter(c.workspaceFilter()).
435
Complete(c)
436
}
437
438
// periodicReconciliation periodically reconciles all nodes in the cluster
439
func (c *NodeScaledownAnnotationController) periodicReconciliation() {
440
ticker := time.NewTicker(5 * time.Minute)
441
defer ticker.Stop()
442
443
for {
444
select {
445
case <-ticker.C:
446
log.Info("starting periodic full reconciliation")
447
ctx := context.Background()
448
if _, err := c.reconcileAllNodes(ctx); err != nil {
449
log.WithError(err).Error("periodic reconciliation failed")
450
}
451
case <-c.stopChan:
452
log.Info("stopping periodic full reconciliation")
453
return
454
}
455
}
456
}
457
458
// reconciliationWorker consumes nodesToReconcile and reconciles each node
459
func (c *NodeScaledownAnnotationController) reconciliationWorker() {
460
log.Info("reconciliation worker started")
461
for {
462
select {
463
case nodeName := <-c.nodesToReconcile:
464
ctx := context.Background()
465
if err := c.reconcileNode(ctx, nodeName); err != nil {
466
log.WithError(err).WithField("node", nodeName).Error("failed to reconcile node from queue")
467
}
468
case <-c.stopChan:
469
log.Info("reconciliation worker stopping")
470
return
471
}
472
}
473
}
474
475
func (c *NodeScaledownAnnotationController) workspaceFilter() predicate.Predicate {
476
return predicate.Funcs{
477
CreateFunc: func(e event.CreateEvent) bool {
478
ws := e.Object.(*workspacev1.Workspace)
479
if ws.Status.Runtime == nil {
480
log.WithField("workspace", ws.Name).Info("workspace not ready yet")
481
return false
482
}
483
484
return ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != ""
485
},
486
UpdateFunc: func(e event.UpdateEvent) bool {
487
wsOld := e.ObjectOld.(*workspacev1.Workspace)
488
ws := e.ObjectNew.(*workspacev1.Workspace)
489
// if we haven't seen runtime info before and now it's there, let's reconcile.
490
// similarly, if the node name changed, we need to reconcile the old node as well.
491
if (wsOld.Status.Runtime == nil && ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "") || // we just got runtime info
492
(wsOld.Status.Runtime != nil && ws.Status.Runtime != nil && wsOld.Status.Runtime.NodeName != ws.Status.Runtime.NodeName) { // node name changed
493
if wsOld.Status.Runtime != nil && wsOld.Status.Runtime.NodeName != "" {
494
c.queueNodeForReconciliation(wsOld.Status.Runtime.NodeName)
495
}
496
return true
497
}
498
499
return false
500
},
501
DeleteFunc: func(e event.DeleteEvent) bool {
502
ws := e.Object.(*workspacev1.Workspace)
503
if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
504
c.queueNodeForReconciliation(ws.Status.Runtime.NodeName)
505
return true
506
}
507
return false
508
},
509
}
510
}
511
512
func (c *NodeScaledownAnnotationController) queueNodeForReconciliation(nodeName string) {
513
select {
514
case c.nodesToReconcile <- nodeName:
515
log.WithField("node", nodeName).Info("queued node for reconciliation")
516
default:
517
log.WithField("node", nodeName).Warn("reconciliation queue full")
518
}
519
}
520
521
// Reconcile handles a Workspace event by queueing the workspace's node for
// reconciliation.
//
// A workspace that no longer exists is ignored; any other fetch error is
// logged and returned. A workspace without runtime info or node name is not
// expected here (workspaceFilter should have dropped it) and only produces a
// warning.
func (c *NodeScaledownAnnotationController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")

	var ws workspacev1.Workspace
	if err := c.Get(ctx, req.NamespacedName, &ws); err != nil {
		if !errors.IsNotFound(err) {
			log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace")
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
		c.queueNodeForReconciliation(ws.Status.Runtime.NodeName)
		// Done: without this return the warning below was logged for every
		// workspace, including the ones that were queued successfully.
		return ctrl.Result{}, nil
	}

	log.WithField("runtime", ws.Status.Runtime).Warn("reconciling object with no Runtime/NodeName, which wasn't filtered out by workspaceFilter")
	return ctrl.Result{}, nil
}
540
541
// Cleanup method to be called when shutting down the controller
542
func (wc *NodeScaledownAnnotationController) Stop() {
543
close(wc.stopChan)
544
}
545
546
// reconcileAllNodes lists all nodes and queues each of them for
// reconciliation. A failure to list nodes is logged and returned.
func (c *NodeScaledownAnnotationController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
	var nodeList corev1.NodeList

	err := c.List(ctx, &nodeList)
	if err != nil {
		log.WithError(err).Error("failed to list nodes")
		return ctrl.Result{}, err
	}

	for i := range nodeList.Items {
		c.queueNodeForReconciliation(nodeList.Items[i].Name)
	}

	return ctrl.Result{}, nil
}
559
560
func (c *NodeScaledownAnnotationController) reconcileNode(ctx context.Context, nodeName string) error {
561
var workspaceList workspacev1.WorkspaceList
562
if err := c.List(ctx, &workspaceList, client.MatchingFields{
563
"status.runtime.nodeName": nodeName,
564
}); err != nil {
565
return fmt.Errorf("failed to list workspaces: %w", err)
566
}
567
568
log.WithField("node", nodeName).WithField("count", len(workspaceList.Items)).Info("acting on workspaces")
569
count := len(workspaceList.Items)
570
571
return c.updateNodeAnnotation(ctx, nodeName, count)
572
}
573
574
// updateNodeAnnotation sets the scale-down-disabled annotation on the node to
// "true" when count is greater than zero and removes it otherwise. The node
// is only written when the annotation state actually has to change. The whole
// read-modify-write is retried on conflict, each attempt with a 5s timeout.
func (c *NodeScaledownAnnotationController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error {
	const annotationKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled"

	attempt := func() error {
		opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		var node corev1.Node
		if err := c.Get(opCtx, types.NamespacedName{Name: nodeName}, &node); err != nil {
			return fmt.Errorf("obtaining node %s: %w", nodeName, err)
		}

		// A missing annotation (or a nil map) reads as "" and therefore as
		// "not disabled".
		wantDisabled := count > 0
		isDisabled := node.Annotations[annotationKey] == "true"

		// Only update if the state needs to change
		if wantDisabled == isDisabled {
			return nil
		}

		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}

		if wantDisabled {
			node.Annotations[annotationKey] = "true"
			log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
		} else {
			delete(node.Annotations, annotationKey)
			log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
		}

		return c.Update(opCtx, &node)
	}

	return retry.RetryOnConflict(retry.DefaultBackoff, attempt)
}
611
612
func updateNodeTaint(taintKey string, add bool, nodeName string, client client.Client) error {
613
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
614
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
615
defer cancel()
616
617
var node corev1.Node
618
err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
619
if err != nil {
620
if !errors.IsNotFound(err) {
621
return err
622
}
623
return nil
624
}
625
626
// Create or remove taint
627
if add {
628
// Add taint if it doesn't exist
629
taintExists := false
630
for _, taint := range node.Spec.Taints {
631
if taint.Key == taintKey {
632
taintExists = true
633
break
634
}
635
}
636
if !taintExists {
637
node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
638
Key: taintKey,
639
Value: "true",
640
Effect: corev1.TaintEffectNoSchedule,
641
})
642
log.WithField("taint", taintKey).WithField("node", nodeName).Info("adding taint to node")
643
}
644
} else {
645
// Remove taint if it exists
646
newTaints := make([]corev1.Taint, 0)
647
for _, taint := range node.Spec.Taints {
648
if taint.Key != taintKey {
649
newTaints = append(newTaints, taint)
650
}
651
}
652
if len(newTaints) != len(node.Spec.Taints) {
653
node.Spec.Taints = newTaints
654
log.WithField("taint", taintKey).WithField("node", nodeName).Info("removing taint from node")
655
}
656
}
657
658
err = client.Update(ctx, &node)
659
if err != nil {
660
return err
661
}
662
663
return nil
664
})
665
}
666
667
// isNodeTaintExists reports whether the node carries a taint with the given
// key. Taint value and effect are not considered.
func isNodeTaintExists(taintKey string, node corev1.Node) bool {
	taints := node.Spec.Taints
	for i := 0; i < len(taints); i++ {
		if taints[i].Key == taintKey {
			return true
		}
	}
	return false
}
675
676
// checkTCPPortIsReachable tries to open a TCP connection to host:port within
// one second. It returns the dial error, or nil when the connection could be
// established; the connection is closed again right away.
func checkTCPPortIsReachable(host string, port string) error {
	const dialTimeout = 1 * time.Second

	addr := net.JoinHostPort(host, port)
	conn, err := net.DialTimeout("tcp", addr, dialTimeout)
	if err != nil {
		return err
	}

	// Only reachability matters; an error while closing is ignored.
	_ = conn.Close()
	return nil
}
685
686
func checkRegistryFacade(host, port string) error {
687
transport := newDefaultTransport()
688
transport.TLSClientConfig = &tls.Config{
689
InsecureSkipVerify: true,
690
}
691
692
client := &http.Client{
693
Transport: transport,
694
}
695
696
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
697
defer cancel()
698
699
dummyURL := fmt.Sprintf("https://%v:%v/v2/remote/not-a-valid-image/manifests/latest", host, port)
700
req, err := http.NewRequestWithContext(ctx, http.MethodGet, dummyURL, nil)
701
if err != nil {
702
return fmt.Errorf("building HTTP request: %v", err)
703
}
704
705
req.Header.Set("Accept", "application/vnd.oci.image.manifest.v1+json, application/vnd.oci.image.index.v1+json")
706
resp, err := client.Do(req)
707
if err != nil {
708
return fmt.Errorf("unexpected error during HTTP request: %v", err)
709
}
710
resp.Body.Close()
711
712
if resp.StatusCode == http.StatusNotFound {
713
return nil
714
}
715
716
return fmt.Errorf("registry-facade is not ready yet")
717
}
718
719
// newDefaultTransport returns a fresh HTTP transport for short-lived health
// probes: connections are dialed with a one second timeout and keep-alives
// are disabled.
//
// The deprecated net.Dialer.DualStack field is no longer set. To actually
// turn off fast fallback, the net package documents setting FallbackDelay to
// a negative value instead.
func newDefaultTransport() *http.Transport {
	dialer := &net.Dialer{
		Timeout: 1 * time.Second,
	}

	return &http.Transport{
		DialContext:           dialer.DialContext,
		MaxIdleConns:          0,
		MaxIdleConnsPerHost:   1,
		IdleConnTimeout:       5 * time.Second,
		ExpectContinueTimeout: 5 * time.Second,
		DisableKeepAlives:     true,
	}
}
732
733
// isWorkspaceNode reports whether the node carries the regular or the
// headless workspace workload label. Only the presence of the label key is
// checked, not its value.
func isWorkspaceNode(node corev1.Node) bool {
	for _, key := range [...]string{workspacesRegularLabel, workspacesHeadlessLabel} {
		if _, found := node.Labels[key]; found {
			return true
		}
	}
	return false
}
738
739
// updateNodeLabel makes sure the named node carries the registry-facade and
// ws-daemon labels for the configured namespace, both set to "true". The
// node is only written when at least one of the labels had to be changed.
// The read-modify-write is retried on conflict, each attempt with a 5s
// timeout.
func updateNodeLabel(nodeName string, client client.Client) error {
	ensureLabels := func() error {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		var node corev1.Node
		if err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
			return err
		}

		wantedKeys := []string{
			fmt.Sprintf(registryFacadeLabel, namespace),
			fmt.Sprintf(wsdaemonLabel, namespace),
		}

		if node.Labels == nil {
			node.Labels = make(map[string]string)
		}

		changed := false
		for _, key := range wantedKeys {
			if node.Labels[key] != "true" {
				node.Labels[key] = "true"
				changed = true
			}
		}
		if !changed {
			return nil
		}

		return client.Update(ctx, &node)
	}

	return retry.RetryOnConflict(retry.DefaultBackoff, ensureLabels)
}
780
781