Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/dev/blowtorch/pkg/dart/toxiproxy.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package dart
6
7
import (
8
"bytes"
9
"context"
10
"fmt"
11
"math/rand"
12
"net/http"
13
"net/url"
14
"strings"
15
"sync"
16
17
toxiproxy "github.com/Shopify/toxiproxy/client"
18
log "github.com/sirupsen/logrus"
19
"golang.org/x/xerrors"
20
"k8s.io/client-go/rest"
21
"k8s.io/client-go/tools/portforward"
22
"k8s.io/client-go/transport/spdy"
23
)
24
25
// toxiproxyPort is the port of toxiproxy's control plane API.
const toxiproxyPort = 8474
29
30
// ProxiedToxiproxy provides a connection to a toxiproxy pod
31
type ProxiedToxiproxy struct {
32
*toxiproxy.Client
33
closer func()
34
}
35
36
// NewProxiedToxiproxy connects to a Toxiproxy pod
37
func NewProxiedToxiproxy(cfg *rest.Config, namespace, pod string) (*ProxiedToxiproxy, error) {
38
localPort := rand.Intn(2000) + 31000
39
40
ctx, cancel := context.WithCancel(context.Background())
41
defer cancel()
42
readychan, errchan := forwardPort(ctx, cfg, namespace, pod, fmt.Sprintf("%d:%d", localPort, toxiproxyPort))
43
select {
44
case <-readychan:
45
case err := <-errchan:
46
return nil, err
47
}
48
log.WithField("port", localPort).Info("forwarding control API of toxiproxy")
49
50
tpc := toxiproxy.NewClient(fmt.Sprintf("localhost:%d", localPort))
51
return &ProxiedToxiproxy{
52
Client: tpc,
53
closer: cancel,
54
}, nil
55
}
56
57
// Close shuts down the connection to the toxiproxy pod
58
func (p *ProxiedToxiproxy) Close() error {
59
p.closer()
60
return nil
61
}
62
63
// ForwardPort establishes a TCP port forwarding to a Kubernetes pod
64
func forwardPort(ctx context.Context, config *rest.Config, namespace, pod, port string) (readychan chan struct{}, errchan chan error) {
65
errchan = make(chan error, 1)
66
readychan = make(chan struct{}, 1)
67
68
roundTripper, upgrader, err := spdy.RoundTripperFor(config)
69
if err != nil {
70
errchan <- err
71
return
72
}
73
74
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, pod)
75
hostIP := strings.TrimPrefix(config.Host, "https://")
76
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
77
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL)
78
79
stopChan := make(chan struct{}, 1)
80
fwdReadyChan := make(chan struct{}, 1)
81
out, errOut := new(bytes.Buffer), new(bytes.Buffer)
82
forwarder, err := portforward.New(dialer, []string{port}, stopChan, fwdReadyChan, out, errOut)
83
if err != nil {
84
panic(err)
85
}
86
87
var once sync.Once
88
go func() {
89
err := forwarder.ForwardPorts()
90
if err != nil {
91
errchan <- err
92
}
93
once.Do(func() { close(readychan) })
94
}()
95
96
go func() {
97
select {
98
case <-readychan:
99
// we're out of here
100
case <-ctx.Done():
101
close(stopChan)
102
}
103
}()
104
105
go func() {
106
for range fwdReadyChan {
107
}
108
109
if errOut.Len() != 0 {
110
errchan <- xerrors.Errorf(errOut.String())
111
return
112
}
113
114
once.Do(func() { close(readychan) })
115
}()
116
117
return
118
}
119
120