Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/operator/kubelet.go
4093 views
1
package operator
2
3
import (
4
"context"
5
"fmt"
6
7
"github.com/go-kit/log"
8
"github.com/go-kit/log/level"
9
"github.com/grafana/agent/pkg/operator/clientutil"
10
"github.com/grafana/agent/pkg/operator/logutil"
11
core_v1 "k8s.io/api/core/v1"
12
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
controller "sigs.k8s.io/controller-runtime"
14
"sigs.k8s.io/controller-runtime/pkg/client"
15
)
16
17
type kubeletReconciler struct {
18
client.Client
19
kubeletNamespace, kubeletName string
20
}
21
22
func (r *kubeletReconciler) Reconcile(ctx context.Context, req controller.Request) (res controller.Result, err error) {
23
l := logutil.FromContext(ctx)
24
level.Info(l).Log("msg", "reconciling node")
25
26
var nodes core_v1.NodeList
27
if err := r.List(ctx, &nodes); err != nil {
28
level.Error(l).Log("msg", "failed to list nodes for kubelet service", "err", err)
29
return res, fmt.Errorf("unable to list nodes: %w", err)
30
}
31
nodeAddrs, err := getNodeAddrs(l, &nodes)
32
if err != nil {
33
level.Error(l).Log("msg", "could not get addresses from all nodes", "err", err)
34
return res, fmt.Errorf("unable to get addresses from nodes: %w", err)
35
}
36
37
labels := mergeMaps(managedByOperatorLabels, map[string]string{
38
// Labels taken from prometheus-operator:
39
// https://github.com/prometheus-operator/prometheus-operator/blob/2c81b0cf6a5673e08057499a08ddce396b19dda4/pkg/prometheus/operator.go#L586-L587
40
"k8s-app": "kubelet",
41
"app.kubernetes.io/name": "kubelet",
42
})
43
44
svc := &core_v1.Service{
45
ObjectMeta: meta_v1.ObjectMeta{
46
Name: r.kubeletName,
47
Namespace: r.kubeletNamespace,
48
Labels: labels,
49
},
50
Spec: core_v1.ServiceSpec{
51
Type: core_v1.ServiceTypeClusterIP,
52
ClusterIP: "None",
53
Ports: []core_v1.ServicePort{
54
{Name: "https-metrics", Port: 10250},
55
{Name: "http-metrics", Port: 10255},
56
{Name: "cadvisor", Port: 4194},
57
},
58
},
59
}
60
61
eps := &core_v1.Endpoints{
62
ObjectMeta: meta_v1.ObjectMeta{
63
Name: r.kubeletName,
64
Namespace: r.kubeletNamespace,
65
Labels: labels,
66
},
67
Subsets: []core_v1.EndpointSubset{{
68
Addresses: nodeAddrs,
69
Ports: []core_v1.EndpointPort{
70
// Taken from https://github.com/prometheus-operator/prometheus-operator/blob/2c81b0cf6a5673e08057499a08ddce396b19dda4/pkg/prometheus/operator.go#L593
71
{Name: "https-metrics", Port: 10250},
72
{Name: "http-metrics", Port: 10255},
73
{Name: "cadvisor", Port: 4194},
74
},
75
}},
76
}
77
78
level.Debug(l).Log("msg", "reconciling kubelet service", "svc", client.ObjectKeyFromObject(svc))
79
err = clientutil.CreateOrUpdateService(ctx, r.Client, svc)
80
if err != nil {
81
return res, fmt.Errorf("failed to reconcile kubelet service %s: %w", client.ObjectKeyFromObject(svc), err)
82
}
83
84
level.Debug(l).Log("msg", "reconciling kubelet endpoints", "eps", client.ObjectKeyFromObject(eps))
85
err = clientutil.CreateOrUpdateEndpoints(ctx, r.Client, eps)
86
if err != nil {
87
return res, fmt.Errorf("failed to reconcile kubelet endpoints %s: %w", client.ObjectKeyFromObject(eps), err)
88
}
89
90
return
91
}
92
93
// mergeMaps merges the contents of b with a. Keys from b take precedence.
94
func mergeMaps(a, b map[string]string) map[string]string {
95
res := make(map[string]string)
96
for k, v := range a {
97
res[k] = v
98
}
99
for k, v := range b {
100
res[k] = v
101
}
102
return res
103
}
104
105
func getNodeAddrs(l log.Logger, nodes *core_v1.NodeList) (addrs []core_v1.EndpointAddress, err error) {
106
var failed bool
107
108
for _, n := range nodes.Items {
109
addr, err := nodeAddress(n)
110
if err != nil {
111
level.Error(l).Log("msg", "failed to get address from node", "node", n.Name, "err", err)
112
failed = true
113
}
114
115
addrs = append(addrs, core_v1.EndpointAddress{
116
IP: addr,
117
TargetRef: &core_v1.ObjectReference{
118
Kind: n.Kind,
119
APIVersion: n.APIVersion,
120
Name: n.Name,
121
UID: n.UID,
122
},
123
})
124
}
125
126
if failed {
127
return nil, fmt.Errorf("failed to get the address from one or more nodes")
128
}
129
return
130
}
131
132
// nodeAddresses returns the provided node's address, based on the priority:
133
//
134
// 1. NodeInternalIP
135
// 2. NodeExternalIP
136
//
137
// Copied from github.com/prometheus/prometheus/discovery/kubernetes/node.go
138
func nodeAddress(node core_v1.Node) (string, error) {
139
m := map[core_v1.NodeAddressType][]string{}
140
for _, a := range node.Status.Addresses {
141
m[a.Type] = append(m[a.Type], a.Address)
142
}
143
144
if addresses, ok := m[core_v1.NodeInternalIP]; ok {
145
return addresses[0], nil
146
}
147
if addresses, ok := m[core_v1.NodeExternalIP]; ok {
148
return addresses[0], nil
149
}
150
return "", fmt.Errorf("host address unknown")
151
}
152
153