Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/operator/clientutil/clientutil.go
4095 views
1
package clientutil
2
3
import (
4
"context"
5
"fmt"
6
"regexp"
7
"strings"
8
9
"github.com/go-kit/log"
10
"github.com/go-kit/log/level"
11
apps_v1 "k8s.io/api/apps/v1"
12
v1 "k8s.io/api/core/v1"
13
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
14
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15
"k8s.io/apimachinery/pkg/types"
16
"k8s.io/apimachinery/pkg/util/validation"
17
"sigs.k8s.io/controller-runtime/pkg/client"
18
)
19
20
var invalidDNS1123Characters = regexp.MustCompile("[^-a-z0-9]+")
21
22
// SanitizeVolumeName ensures that the given volume name is a valid DNS-1123 label
23
// accepted by Kubernetes.
24
//
25
// Copied from github.com/prometheus-operator/prometheus-operator/pkg/k8sutil.
26
func SanitizeVolumeName(name string) string {
27
name = strings.ToLower(name)
28
name = invalidDNS1123Characters.ReplaceAllString(name, "-")
29
if len(name) > validation.DNS1123LabelMaxLength {
30
name = name[0:validation.DNS1123LabelMaxLength]
31
}
32
return strings.Trim(name, "-")
33
}
34
35
// CreateOrUpdateSecret applies the given secret against the client.
36
func CreateOrUpdateSecret(ctx context.Context, c client.Client, s *v1.Secret) error {
37
var exist v1.Secret
38
err := c.Get(ctx, client.ObjectKeyFromObject(s), &exist)
39
if err != nil && !k8s_errors.IsNotFound(err) {
40
return fmt.Errorf("failed to retrieve existing service: %w", err)
41
}
42
43
if k8s_errors.IsNotFound(err) {
44
err := c.Create(ctx, s)
45
if err != nil {
46
return fmt.Errorf("failed to create service: %w", err)
47
}
48
} else {
49
s.ResourceVersion = exist.ResourceVersion
50
s.SetOwnerReferences(mergeOwnerReferences(s.GetOwnerReferences(), exist.GetOwnerReferences()))
51
s.SetLabels(mergeMaps(s.Labels, exist.Labels))
52
s.SetAnnotations(mergeMaps(s.Annotations, exist.Annotations))
53
54
err := c.Update(ctx, s)
55
if err != nil && !k8s_errors.IsNotFound(err) {
56
return fmt.Errorf("failed to update service: %w", err)
57
}
58
}
59
60
return nil
61
}
62
63
// CreateOrUpdateService applies the given svc against the client.
64
func CreateOrUpdateService(ctx context.Context, c client.Client, svc *v1.Service) error {
65
var exist v1.Service
66
err := c.Get(ctx, client.ObjectKeyFromObject(svc), &exist)
67
if err != nil && !k8s_errors.IsNotFound(err) {
68
return fmt.Errorf("failed to retrieve existing service: %w", err)
69
}
70
71
if k8s_errors.IsNotFound(err) {
72
err := c.Create(ctx, svc)
73
if err != nil {
74
return fmt.Errorf("failed to create service: %w", err)
75
}
76
} else {
77
svc.ResourceVersion = exist.ResourceVersion
78
svc.Spec.IPFamilies = exist.Spec.IPFamilies
79
svc.SetOwnerReferences(mergeOwnerReferences(svc.GetOwnerReferences(), exist.GetOwnerReferences()))
80
svc.SetLabels(mergeMaps(svc.Labels, exist.Labels))
81
svc.SetAnnotations(mergeMaps(svc.Annotations, exist.Annotations))
82
83
err := c.Update(ctx, svc)
84
if err != nil && !k8s_errors.IsNotFound(err) {
85
return fmt.Errorf("failed to update service: %w", err)
86
}
87
}
88
89
return nil
90
}
91
92
// CreateOrUpdateEndpoints applies the given eps against the client.
93
func CreateOrUpdateEndpoints(ctx context.Context, c client.Client, eps *v1.Endpoints) error {
94
var exist v1.Endpoints
95
err := c.Get(ctx, client.ObjectKeyFromObject(eps), &exist)
96
if err != nil && !k8s_errors.IsNotFound(err) {
97
return fmt.Errorf("failed to retrieve existing endpoints: %w", err)
98
}
99
100
if k8s_errors.IsNotFound(err) {
101
err := c.Create(ctx, eps)
102
if err != nil {
103
return fmt.Errorf("failed to create endpoints: %w", err)
104
}
105
} else {
106
eps.ResourceVersion = exist.ResourceVersion
107
eps.SetOwnerReferences(mergeOwnerReferences(eps.GetOwnerReferences(), exist.GetOwnerReferences()))
108
eps.SetLabels(mergeMaps(eps.Labels, exist.Labels))
109
eps.SetAnnotations(mergeMaps(eps.Annotations, exist.Annotations))
110
111
err := c.Update(ctx, eps)
112
if err != nil && !k8s_errors.IsNotFound(err) {
113
return fmt.Errorf("failed to update endpoints: %w", err)
114
}
115
}
116
117
return nil
118
}
119
120
// CreateOrUpdateStatefulSet applies the given StatefulSet against the client.
121
func CreateOrUpdateStatefulSet(ctx context.Context, c client.Client, ss *apps_v1.StatefulSet, l log.Logger) error {
122
var exist apps_v1.StatefulSet
123
err := c.Get(ctx, client.ObjectKeyFromObject(ss), &exist)
124
if err != nil && !k8s_errors.IsNotFound(err) {
125
return fmt.Errorf("failed to retrieve existing statefulset: %w", err)
126
}
127
128
if k8s_errors.IsNotFound(err) {
129
err := c.Create(ctx, ss)
130
if err != nil {
131
return fmt.Errorf("failed to create statefulset: %w", err)
132
}
133
} else {
134
ss.ResourceVersion = exist.ResourceVersion
135
ss.SetOwnerReferences(mergeOwnerReferences(ss.GetOwnerReferences(), exist.GetOwnerReferences()))
136
ss.SetLabels(mergeMaps(ss.Labels, exist.Labels))
137
ss.SetAnnotations(mergeMaps(ss.Annotations, exist.Annotations))
138
139
err := c.Update(ctx, ss)
140
if k8s_errors.IsNotAcceptable(err) || k8s_errors.IsInvalid(err) {
141
level.Error(l).Log("msg", "error updating StatefulSet. Attempting to recreate", "err", err.Error())
142
// Resource version should only be set when updating
143
ss.ResourceVersion = ""
144
145
err = c.Delete(ctx, ss)
146
if err != nil {
147
return fmt.Errorf("failed to update statefulset when deleting old statefulset: %w", err)
148
}
149
err = c.Create(ctx, ss)
150
if err != nil {
151
return fmt.Errorf("failed to update statefulset when creating replacement statefulset: %w", err)
152
}
153
} else if err != nil {
154
return fmt.Errorf("failed to update statefulset: %w", err)
155
}
156
}
157
158
return nil
159
}
160
161
// CreateOrUpdateDaemonSet applies the given DaemonSet against the client.
162
func CreateOrUpdateDaemonSet(ctx context.Context, c client.Client, ss *apps_v1.DaemonSet, l log.Logger) error {
163
var exist apps_v1.DaemonSet
164
err := c.Get(ctx, client.ObjectKeyFromObject(ss), &exist)
165
if err != nil && !k8s_errors.IsNotFound(err) {
166
return fmt.Errorf("failed to retrieve existing daemonset: %w", err)
167
}
168
169
if k8s_errors.IsNotFound(err) {
170
err := c.Create(ctx, ss)
171
if err != nil {
172
return fmt.Errorf("failed to create daemonset: %w", err)
173
}
174
} else {
175
ss.ResourceVersion = exist.ResourceVersion
176
ss.SetOwnerReferences(mergeOwnerReferences(ss.GetOwnerReferences(), exist.GetOwnerReferences()))
177
ss.SetLabels(mergeMaps(ss.Labels, exist.Labels))
178
ss.SetAnnotations(mergeMaps(ss.Annotations, exist.Annotations))
179
180
err := c.Update(ctx, ss)
181
if k8s_errors.IsNotAcceptable(err) || k8s_errors.IsInvalid(err) {
182
level.Error(l).Log("msg", "error updating Daemonset. Attempting to recreate", "err", err.Error())
183
// Resource version should only be set when updating
184
ss.ResourceVersion = ""
185
186
err = c.Delete(ctx, ss)
187
if err != nil {
188
return fmt.Errorf("failed to update daemonset: deleting old daemonset: %w", err)
189
}
190
err = c.Create(ctx, ss)
191
if err != nil {
192
return fmt.Errorf("failed to update daemonset: creating new deamonset: %w", err)
193
}
194
} else if err != nil {
195
return fmt.Errorf("failed to update daemonset: %w", err)
196
}
197
}
198
199
return nil
200
}
201
202
// CreateOrUpdateDeployment applies the given DaemonSet against the client.
203
func CreateOrUpdateDeployment(ctx context.Context, c client.Client, d *apps_v1.Deployment, l log.Logger) error {
204
var exist apps_v1.Deployment
205
err := c.Get(ctx, client.ObjectKeyFromObject(d), &exist)
206
if err != nil && !k8s_errors.IsNotFound(err) {
207
return fmt.Errorf("failed to retrieve existing Deployment: %w", err)
208
}
209
210
if k8s_errors.IsNotFound(err) {
211
err := c.Create(ctx, d)
212
if err != nil {
213
return fmt.Errorf("failed to create Deployment: %w", err)
214
}
215
} else {
216
d.ResourceVersion = exist.ResourceVersion
217
d.SetOwnerReferences(mergeOwnerReferences(d.GetOwnerReferences(), exist.GetOwnerReferences()))
218
d.SetLabels(mergeMaps(d.Labels, exist.Labels))
219
d.SetAnnotations(mergeMaps(d.Annotations, exist.Annotations))
220
221
err := c.Update(ctx, d)
222
if k8s_errors.IsNotAcceptable(err) || k8s_errors.IsInvalid(err) {
223
level.Error(l).Log("msg", "error updating Deployment. Attempting to recreate", "err", err.Error())
224
// Resource version should only be set when updating
225
d.ResourceVersion = ""
226
227
err = c.Delete(ctx, d)
228
if err != nil {
229
return fmt.Errorf("failed to update Deployment: deleting old Deployment: %w", err)
230
}
231
err = c.Create(ctx, d)
232
if err != nil {
233
return fmt.Errorf("failed to update Deployment: creating new Deployment: %w", err)
234
}
235
} else if err != nil {
236
return fmt.Errorf("failed to update Deployment: %w", err)
237
}
238
}
239
240
return nil
241
}
242
243
// mergeOwnerReferences returns every reference in existing, followed by each
// reference from desired whose UID has not been seen yet.
//
// When a UID is present in both slices the entry from existing is kept and
// the one from desired is dropped. A UID that occurs more than once in
// desired is added only once. The result is a fresh slice, so neither input's
// backing array is written to.
func mergeOwnerReferences(desired, existing []meta_v1.OwnerReference) []meta_v1.OwnerReference {
	seen := make(map[types.UID]struct{}, len(existing)+len(desired))
	for _, ref := range existing {
		seen[ref.UID] = struct{}{}
	}

	// Copy instead of appending to existing directly: append could otherwise
	// overwrite spare capacity shared with the caller's slice.
	merged := append([]meta_v1.OwnerReference(nil), existing...)
	for _, ref := range desired {
		if _, dup := seen[ref.UID]; dup {
			continue
		}
		// Record the UID so a repeated entry in desired is not added twice.
		seen[ref.UID] = struct{}{}
		merged = append(merged, ref)
	}
	return merged
}
255
256
// mergeMaps returns a new map holding every entry of existing overlaid with
// every entry of desired; on a key present in both, the value from desired
// wins.
//
// Neither argument is modified, and both may be nil. The result is never nil.
func mergeMaps(desired, existing map[string]string) map[string]string {
	merged := make(map[string]string, len(existing)+len(desired))
	for k, v := range existing {
		merged[k] = v
	}
	// Applied second so that desired values take precedence.
	for k, v := range desired {
		merged[k] = v
	}
	return merged
}
265
266