Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/dev/blowtorch/pkg/dart/injector.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package dart
6
7
import (
8
"context"
9
"fmt"
10
"strings"
11
"time"
12
13
log "github.com/sirupsen/logrus"
14
"golang.org/x/xerrors"
15
appsv1 "k8s.io/api/apps/v1"
16
corev1 "k8s.io/api/core/v1"
17
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
"k8s.io/apimachinery/pkg/labels"
19
"k8s.io/apimachinery/pkg/util/intstr"
20
"k8s.io/apimachinery/pkg/util/wait"
21
"k8s.io/client-go/kubernetes"
22
"k8s.io/client-go/rest"
23
)
24
25
const (
26
fmtOriginalService = "%s-original"
27
fmtProxyDeployment = "%s-toxiproxy"
28
renamedServiceLabelPrefix = "renamed/"
29
)
30
31
type injectOptions struct {
32
AdditionalRoutes map[int][]int
33
}
34
35
// InjectOption customizes the injection behaviour
36
type InjectOption func(*injectOptions)
37
38
// WithAdditionalRoute adds an additional route over the same service but on a different port.
39
// This is handy if one wants to affect services differently from each other, as each of them
40
// get different proxies.
41
func WithAdditionalRoute(targetPort int, additionalPort int) InjectOption {
42
return func(o *injectOptions) {
43
var r = o.AdditionalRoutes[targetPort]
44
r = append(r, additionalPort)
45
o.AdditionalRoutes[targetPort] = r
46
}
47
}
48
49
// Inject launches a toxiproxy pod that forwards all original traffic from a service
50
// via that toxiproxy.
51
func Inject(cfg *rest.Config, namespace, targetService string, options ...InjectOption) (*ProxiedToxiproxy, error) {
52
opts := injectOptions{
53
AdditionalRoutes: make(map[int][]int),
54
}
55
for _, opt := range options {
56
opt(&opts)
57
}
58
59
client, err := kubernetes.NewForConfig(cfg)
60
if err != nil {
61
return nil, err
62
}
63
64
oldService, err := client.CoreV1().Services(namespace).Get(context.Background(), targetService, metav1.GetOptions{})
65
if err != nil {
66
return nil, err
67
}
68
log.WithField("name", oldService.Name).Info("target service found")
69
70
// service.Spec.Selector
71
pods, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
72
LabelSelector: labels.FormatLabels(oldService.Spec.Selector),
73
})
74
if err != nil {
75
return nil, err
76
}
77
if len(pods.Items) == 0 {
78
return nil, xerrors.Errorf("found no pods matching the service selector")
79
}
80
originalPod := pods.Items[0]
81
log.WithField("name", originalPod.Name).Info("original pods found")
82
83
var ndname string
84
if opsegs := strings.Split(originalPod.Name, "-"); len(opsegs) > 2 {
85
ndname = strings.Join(opsegs[:len(opsegs)-2], "-")
86
} else {
87
ndname = originalPod.Name
88
}
89
ndname = fmt.Sprintf(fmtProxyDeployment, ndname)
90
91
var (
92
labels = map[string]string{
93
"blowtorch.sh/component": "toxiproxy",
94
"blowtorch.sh/id": originalPod.ResourceVersion,
95
}
96
replicas int32 = 1
97
uid int64 = 1000
98
)
99
newDeployment := &appsv1.Deployment{
100
ObjectMeta: metav1.ObjectMeta{
101
Name: ndname,
102
Namespace: originalPod.Namespace,
103
Labels: labels,
104
},
105
Spec: appsv1.DeploymentSpec{
106
Replicas: &replicas,
107
Selector: metav1.SetAsLabelSelector(labels),
108
Template: corev1.PodTemplateSpec{
109
ObjectMeta: metav1.ObjectMeta{
110
Name: ndname,
111
Labels: labels,
112
},
113
Spec: corev1.PodSpec{
114
Containers: []corev1.Container{
115
{
116
Name: "proxy",
117
Image: "shopify/toxiproxy",
118
SecurityContext: &corev1.SecurityContext{
119
RunAsUser: &uid,
120
},
121
},
122
},
123
ServiceAccountName: originalPod.Spec.ServiceAccountName,
124
},
125
},
126
},
127
}
128
_, err = client.AppsV1().Deployments(namespace).Create(context.Background(), newDeployment, metav1.CreateOptions{})
129
if err != nil && !strings.Contains(err.Error(), "already exists") {
130
return nil, err
131
}
132
log.WithField("name", ndname).Info("deployment created")
133
134
renamedSpec := oldService.Spec.DeepCopy()
135
renamedSpec.ClusterIP = ""
136
renamedSpec.ClusterIPs = []string{}
137
renamedMeta := oldService.ObjectMeta.DeepCopy()
138
renamedMeta.Name = fmt.Sprintf(fmtOriginalService, oldService.Name)
139
renamedMeta.ResourceVersion = ""
140
renamedMeta.Labels = make(map[string]string)
141
for k, v := range oldService.ObjectMeta.Labels {
142
renamedKey := fmt.Sprintf("%s%s", renamedServiceLabelPrefix, k)
143
renamedMeta.Labels[renamedKey] = v
144
}
145
renamedService := &corev1.Service{
146
ObjectMeta: *renamedMeta,
147
Spec: *renamedSpec,
148
}
149
renamedService, err = client.CoreV1().Services(namespace).Create(context.Background(), renamedService, metav1.CreateOptions{})
150
if err != nil {
151
return nil, err
152
}
153
var (
154
deletionPolicy = metav1.DeletePropagationForeground
155
gracePeriod int64 = 30
156
)
157
err = client.CoreV1().Services(namespace).Delete(context.Background(), oldService.Name,
158
metav1.DeleteOptions{PropagationPolicy: &deletionPolicy, GracePeriodSeconds: &gracePeriod})
159
if err != nil {
160
return nil, err
161
}
162
log.Info("original service renamed")
163
164
newSpec := oldService.Spec.DeepCopy()
165
newSpec.ClusterIP = ""
166
newSpec.ClusterIPs = []string{}
167
newSpec.Selector = labels
168
newMeta := oldService.ObjectMeta.DeepCopy()
169
if newMeta.Labels == nil {
170
newMeta.Labels = make(map[string]string)
171
}
172
for k, v := range labels {
173
newMeta.Labels[k] = v
174
}
175
newMeta.ResourceVersion = ""
176
var additionalPorts []corev1.ServicePort
177
for tp, aps := range opts.AdditionalRoutes {
178
var op *corev1.ServicePort
179
for _, pspec := range newSpec.Ports {
180
if pspec.TargetPort.IntValue() == tp {
181
op = &pspec
182
break
183
}
184
}
185
if op == nil {
186
log.WithField("targetPort", tp).WithField("routes", aps).Warn("cannot find target port in service - not adding additional routes")
187
continue
188
}
189
190
for _, ap := range aps {
191
sp := op.DeepCopy()
192
sp.Name = fmt.Sprintf("%s-ar-%d", sp.Name, ap)
193
sp.Port = int32(ap)
194
sp.TargetPort = intstr.FromInt(ap)
195
additionalPorts = append(additionalPorts, *sp)
196
}
197
}
198
newSpec.Ports = append(newSpec.Ports, additionalPorts...)
199
newService := &corev1.Service{
200
ObjectMeta: *newMeta,
201
Spec: *newSpec,
202
}
203
for i := 0; i < 10; i++ {
204
cs, err := client.CoreV1().Services(namespace).Create(context.Background(), newService, metav1.CreateOptions{})
205
if err == nil {
206
newService = cs
207
break
208
}
209
210
if strings.Contains(err.Error(), "object is being deleted") {
211
log.WithField("name", newService.Name).Info("original service still exists - retrying in four seconds")
212
time.Sleep(4 * time.Second)
213
continue
214
}
215
return nil, err
216
}
217
log.WithField("name", newService.Name).Info("new service created")
218
219
err = wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
220
depl, err := client.AppsV1().Deployments(namespace).Get(context.Background(), newDeployment.Name, metav1.GetOptions{})
221
if err != nil {
222
return false, err
223
}
224
225
return depl.Status.ReadyReplicas >= 1, nil
226
227
})
228
if err != nil {
229
return nil, xerrors.Errorf("cannot wait for proxy pod: %w", err)
230
}
231
log.Info("proxy pod up and running")
232
233
proxyPods, err := client.CoreV1().Pods(namespace).List(
234
context.Background(),
235
metav1.ListOptions{
236
LabelSelector: metav1.FormatLabelSelector(metav1.SetAsLabelSelector(labels)),
237
},
238
)
239
if err != nil {
240
return nil, nil
241
}
242
if len(proxyPods.Items) == 0 {
243
return nil, xerrors.Errorf("no proxy pod found")
244
}
245
tppod := proxyPods.Items[0].Name
246
247
tpc, err := NewProxiedToxiproxy(cfg, namespace, tppod)
248
if err != nil {
249
return nil, xerrors.Errorf("cannot start proxy: %w", err)
250
}
251
252
for _, p := range oldService.Spec.Ports {
253
_, err := tpc.CreateProxy(targetService, fmt.Sprintf(":%d", p.TargetPort.IntVal), fmt.Sprintf("%s:%d", renamedService.Name, p.Port))
254
if err != nil {
255
return nil, xerrors.Errorf("cannot proxy port %d -> %d: %w", p.TargetPort.IntVal, p.Port, err)
256
}
257
log.WithField("port", p.Port).Infof("toxiproxy for port %d -> %d set up", p.TargetPort.IntVal, p.Port)
258
}
259
for tp, aps := range opts.AdditionalRoutes {
260
for _, ap := range aps {
261
_, err := tpc.CreateProxy(fmt.Sprintf("%s-%d", targetService, ap), fmt.Sprintf(":%d", ap), fmt.Sprintf("%s:%d", renamedService.Name, tp))
262
if err != nil {
263
log.WithField("targetPort", tp).WithField("additionalPort", ap).WithError(err).Warn("cannot proxy additional port")
264
}
265
log.WithField("port", ap).Infof("toxiproxy for port %d -> %d set up", ap, tp)
266
}
267
}
268
269
return tpc, nil
270
}
271
272
// Remove reverts the changes made by inject
273
func Remove(cfg *rest.Config, namespace, targetService string) error {
274
client, err := kubernetes.NewForConfig(cfg)
275
if err != nil {
276
return err
277
}
278
279
proxiedService, err := client.CoreV1().Services(namespace).Get(context.Background(), fmt.Sprintf(fmtOriginalService, targetService), metav1.GetOptions{})
280
if err != nil {
281
return err
282
}
283
log.WithField("name", proxiedService.Name).Info("original service found")
284
285
var (
286
deletionPolicy = metav1.DeletePropagationForeground
287
gracePeriod int64 = 30
288
)
289
err = client.CoreV1().Services(namespace).Delete(context.Background(), targetService, metav1.DeleteOptions{
290
GracePeriodSeconds: &gracePeriod,
291
PropagationPolicy: &deletionPolicy,
292
})
293
if err != nil {
294
return err
295
}
296
log.WithField("name", targetService).Info("proxy service deleted")
297
298
renamedSpec := proxiedService.Spec.DeepCopy()
299
renamedSpec.ClusterIP = ""
300
renamedMeta := proxiedService.ObjectMeta.DeepCopy()
301
renamedMeta.Name = targetService
302
renamedMeta.ResourceVersion = ""
303
renamedMeta.Labels = make(map[string]string)
304
for k, v := range proxiedService.ObjectMeta.Labels {
305
renamedKey := strings.TrimPrefix(k, renamedServiceLabelPrefix)
306
renamedMeta.Labels[renamedKey] = v
307
}
308
renamedService := &corev1.Service{
309
ObjectMeta: *renamedMeta,
310
Spec: *renamedSpec,
311
}
312
for i := 0; i < 10; i++ {
313
_, err = client.CoreV1().Services(namespace).Create(context.Background(), renamedService, metav1.CreateOptions{})
314
if err == nil {
315
break
316
}
317
318
if strings.Contains(err.Error(), "object is being deleted") {
319
log.WithField("name", renamedService.Name).Info("proxy service still exists - retrying in four seconds")
320
time.Sleep(4 * time.Second)
321
continue
322
}
323
return err
324
}
325
err = client.CoreV1().Services(namespace).Delete(context.Background(), proxiedService.Name,
326
metav1.DeleteOptions{PropagationPolicy: &deletionPolicy, GracePeriodSeconds: &gracePeriod})
327
if err != nil {
328
return err
329
}
330
log.WithField("name", proxiedService.Name).Info("original service renamed")
331
332
pdp := fmt.Sprintf(fmtProxyDeployment, targetService)
333
err = client.AppsV1().Deployments(namespace).Delete(context.Background(), pdp, metav1.DeleteOptions{
334
PropagationPolicy: &deletionPolicy,
335
GracePeriodSeconds: &gracePeriod,
336
})
337
if err != nil {
338
339
log.Info("tried to: proxy deployment deleted")
340
return err
341
}
342
log.WithField("name", pdp).Info("proxy deployment deleted")
343
344
return nil
345
}
346
347