GitHub Repository: gitpod-io/gitpod
Path: blob/main/test/pkg/integration/integration.go
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package integration

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/rpc"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"golang.org/x/xerrors"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/cli-runtime/pkg/genericclioptions"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/kubectl/pkg/cmd/cp"
	kubectlcp "k8s.io/kubectl/pkg/cmd/cp"
	kubectlexec "k8s.io/kubectl/pkg/cmd/exec"
	"k8s.io/kubectl/pkg/cmd/util"
	"sigs.k8s.io/e2e-framework/klient"

	"github.com/gitpod-io/gitpod/common-go/log"
	ide "github.com/gitpod-io/gitpod/ide-service-api/config"
	"github.com/gitpod-io/gitpod/test/pkg/integration/common"
)

const (
	connectFailureMaxTries = 5
	errorDialingBackendEOF = "error dialing backend: EOF"
)

var (
	errorNoPods = fmt.Errorf("no pods found")
)

// PodExec bundles a REST config and a Kubernetes clientset so that files can be
// copied to pods and commands can be executed inside them.
type PodExec struct {
	RestConfig *rest.Config
	*kubernetes.Clientset
}

// NewPodExec prepares the given REST config for core-API requests and returns a PodExec.
func NewPodExec(config rest.Config, clientset *kubernetes.Clientset) *PodExec {
	config.APIPath = "/api"                                   // Make sure we target /api and not just /
	config.GroupVersion = &schema.GroupVersion{Version: "v1"} // this targets the core api groups so the url path will be /api/v1
	config.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: scheme.Codecs}
	return &PodExec{
		RestConfig: &config,
		Clientset:  clientset,
	}
}
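
// Typical construction, mirroring how Instrument uses it further below
// (creating the clientset is the caller's responsibility):
//
//	clientset, err := kubernetes.NewForConfig(client.RESTConfig())
//	// handle err ...
//	podExec := NewPodExec(*client.RESTConfig(), clientset)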

// PodCopyFile copies a file between the local filesystem and a pod using kubectl-cp
// semantics, retrying transient "error dialing backend: EOF" failures.
// It returns the in, out and error streams of the copy operation.
func (p *PodExec) PodCopyFile(src string, dst string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) {
	var in, out, errOut *bytes.Buffer
	var ioStreams genericclioptions.IOStreams
	for count := 0; ; count++ {
		ioStreams, in, out, errOut = genericclioptions.NewTestIOStreams()
		copyOptions := kubectlcp.NewCopyOptions(ioStreams)
		copyOptions.ClientConfig = p.RestConfig
		copyOptions.Container = containername
		configFlags := genericclioptions.NewConfigFlags(false)
		f := util.NewFactory(configFlags)
		cmd := cp.NewCmdCp(f, ioStreams)
		err := copyOptions.Complete(f, cmd, []string{src, dst})
		if err != nil {
			return nil, nil, nil, err
		}

		c := rest.CopyConfig(p.RestConfig)
		cs, err := kubernetes.NewForConfig(c)
		if err != nil {
			return nil, nil, nil, err
		}

		copyOptions.ClientConfig = c
		copyOptions.Clientset = cs

		err = copyOptions.Run()
		if err != nil {
			if !shouldRetry(count, err) {
				return nil, nil, nil, fmt.Errorf("could not run copy operation: %v. Stdout: %v, Stderr: %v", err, out.String(), errOut.String())
			}
			time.Sleep(10 * time.Second)
			continue
		}
		break
	}
	return in, out, errOut, nil
}
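
// Example (hypothetical pod coordinates, mirroring how Instrument uses PodCopyFile below):
//
//	_, out, errOut, err := podExec.PodCopyFile("/tmp/agent", "default/ws-pod:/home/gitpod/agent", "workspace")
//
// The destination follows kubectl cp notation, i.e. "<namespace>/<pod>:<path>".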

// shouldRetry reports whether a failed copy/exec attempt should be retried:
// only "error dialing backend: EOF" failures are retried, and only for up to
// connectFailureMaxTries attempts.
func shouldRetry(count int, err error) bool {
	if count < connectFailureMaxTries {
		return err.Error() == errorDialingBackendEOF
	}
	return false
}

// ExecCmd runs a command in the given container of a pod, retrying transient
// "error dialing backend: EOF" failures. It returns the in, out and error streams.
func (p *PodExec) ExecCmd(command []string, podname string, namespace string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) {
	var (
		in, out, errOut *bytes.Buffer
		ioStreams       genericclioptions.IOStreams
	)
	for count := 0; ; count++ {
		ioStreams, in, out, errOut = genericclioptions.NewTestIOStreams()
		execOptions := &kubectlexec.ExecOptions{
			StreamOptions: kubectlexec.StreamOptions{
				IOStreams:     ioStreams,
				Namespace:     namespace,
				PodName:       podname,
				ContainerName: containername,
			},

			Command:   command,
			Executor:  &kubectlexec.DefaultRemoteExecutor{},
			PodClient: p.Clientset.CoreV1(),
			Config:    p.RestConfig,
		}
		err := execOptions.Run()
		if err != nil {
			if !shouldRetry(count, err) {
				return nil, nil, nil, fmt.Errorf("could not run exec operation: %v", err)
			}
			time.Sleep(30 * time.Second)
			continue
		}
		break
	}

	return in, out, errOut, nil
}
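
// Example (hypothetical pod coordinates): run a command in a workspace container
// and inspect its output streams:
//
//	_, stdout, stderr, err := podExec.ExecCmd([]string{"ls", "-l", "/workspace"}, "ws-pod", "default", "workspace")
//	if err != nil {
//		log.WithError(err).Error(stderr.String())
//	}
//	log.Info(stdout.String())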

// InstrumentOption configures an Instrument call.
type InstrumentOption func(*instrumentOptions) error

type instrumentOptions struct {
	SPO              selectPodOptions
	WorkspacekitLift bool
}

type selectPodOptions struct {
	InstanceID string

	Container string
}

// WithInstanceID provides a hint during pod selection for Instrument.
// When instrumenting ws-daemon, we try to select the daemon on the node where the workspace is located.
// When instrumenting the workspace, we select the workspace based on the instance ID.
// For all other component types, this hint is ignored.
func WithInstanceID(instanceID string) InstrumentOption {
	return func(io *instrumentOptions) error {
		io.SPO.InstanceID = instanceID
		return nil
	}
}

// WithContainer provides a hint during pod selection for Instrument to target a particular container.
func WithContainer(container string) InstrumentOption {
	return func(io *instrumentOptions) error {
		io.SPO.Container = container
		return nil
	}
}

// WithWorkspacekitLift executes the agent using `workspacekit lift`, thereby lifting it into ring1.
// Only relevant for ComponentWorkspace and ignored for all other components.
// Defaults to true.
func WithWorkspacekitLift(lift bool) InstrumentOption {
	return func(io *instrumentOptions) error {
		io.WorkspacekitLift = lift
		return nil
	}
}
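
// Hypothetical sketch of how these options combine with Instrument (agent name and
// variables are illustrative):
//
//	rsa, closer, err := Instrument(ComponentWorkspace, "workspace", namespace, kubeconfig, client,
//		WithInstanceID(instanceID),
//		WithContainer("workspace"),
//		WithWorkspacekitLift(true),
//	)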

// RpcClient is a retrying wrapper around *rpc.Client. The RPC connection is brittle
// because it relies on an agent installed in the workspace and a port-forward to send
// instructions, so failed calls re-instrument the component and retry.
type RpcClient struct {
	client     *rpc.Client
	component  ComponentType
	agentName  string
	namespace  string
	kubeconfig string
	kclient    klient.Client
	opts       []InstrumentOption
}

// Call invokes the named RPC method, re-instrumenting the component and retrying
// up to connectFailureMaxTries times on failure.
func (r *RpcClient) Call(serviceMethod string, args any, reply any) error {
	var err error
	cl := r
	for i := 0; i < connectFailureMaxTries; i++ {
		if cl == nil {
			cl, _, err = Instrument(r.component, r.agentName, r.namespace, r.kubeconfig, r.kclient, r.opts...)
			if err != nil {
				log.Warnf("failed to re-instrument (attempt %d): %v", i, err)
				time.Sleep(10 * time.Second)
				continue
			}
		}

		err = cl.client.Call(serviceMethod, args, reply)
		if err == nil {
			return nil
		}

		log.Warnf("rpc call %s failed (attempt %d): %v", serviceMethod, i, err)
		if i == connectFailureMaxTries-1 {
			return err
		}

		time.Sleep(10 * time.Second)
		cl.Close()
		cl = nil // Try to Instrument again next attempt
	}
	return err
}

// Close closes the underlying RPC connection.
func (r *RpcClient) Close() error {
	return r.client.Close()
}
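
// A minimal, hypothetical usage sketch; the service/method names and request/response
// types are placeholders for whatever the instrumented agent registers:
//
//	var resp SomeResponse
//	if err := rsa.Call("WorkspaceAgent.MethodX", &SomeRequest{}, &resp); err != nil {
//		t.Fatal(err)
//	}
//	defer rsa.Close()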

// Instrument builds and uploads an agent to a pod, then connects to its RPC service.
// We first check if there's an executable in the path named `gitpod-integration-test-<agentName>-agent`.
// If there isn't, we attempt to build `<agentName>_agent/main.go`.
// The binary is copied to the destination pod, started and port-forwarded. Then we
// create an RPC client.
func Instrument(component ComponentType, agentName string, namespace string, kubeconfig string, client klient.Client, opts ...InstrumentOption) (*RpcClient, []func() error, error) {
	var closer []func() error

	options := instrumentOptions{
		WorkspacekitLift: true,
	}
	for _, o := range opts {
		err := o(&options)
		if err != nil {
			return nil, closer, err
		}
	}

	var (
		res           *rpc.Client
		clientConfig  *kubernetes.Clientset
		cl            []func() error
		podName       string
		containerName string
		err           error
	)
	for i := 0; i < connectFailureMaxTries; i++ {
		expectedBinaryName := fmt.Sprintf("gitpod-integration-test-%d-%s-agent", i, agentName)
		agentLoc, _ := exec.LookPath(expectedBinaryName)
		if agentLoc == "" {
			var err error
			agentLoc, err = buildAgent(agentName)
			if err != nil {
				return nil, closer, fmt.Errorf("failed to build agent: %w", err)
			}
		}

		podName, containerName, err = selectPod(component, options.SPO, namespace, client)
		if err != nil {
			if errors.Is(err, errorNoPods) {
				// When there are no pods, assume that the component has already
				// stopped, so return the error and don't retry.
				return nil, closer, err
			}
			time.Sleep(10 * time.Second)
			continue
		}

		clientConfig, err = kubernetes.NewForConfig(client.RESTConfig())
		if err != nil {
			return nil, closer, err
		}
		podExec := NewPodExec(*client.RESTConfig(), clientConfig)

		tgtFN := filepath.Base(agentLoc)
		_, _, _, err = podExec.PodCopyFile(agentLoc, fmt.Sprintf("%s/%s:/home/gitpod/%s", namespace, podName, tgtFN), containerName)
		if err != nil {
			log.WithError(err).Warnf("failed to copy agent to pod (attempt %d)", i)
			time.Sleep(10 * time.Second)
			continue
		}

		res, cl, err = portfw(podExec, kubeconfig, podName, namespace, containerName, tgtFN, options)
		if err != nil {
			var serror error
			waitErr := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
				serror = shutdownAgent(podExec, kubeconfig, podName, namespace, containerName)
				if serror != nil {
					if strings.Contains(serror.Error(), "exit code 7") {
						serror = nil
						return true, nil
					}
					return false, nil
				}
				return true, nil
			})
			if waitErr == wait.ErrWaitTimeout {
				return nil, closer, xerrors.Errorf("timed out attempting to shutdown agent: %v", serror)
			} else if waitErr != nil {
				return nil, closer, waitErr
			}

			if serror != nil {
				return nil, closer, serror
			}
			for _, c := range cl {
				_ = c()
			}

			continue
		}
		break
	}
	if err != nil {
		for _, c := range cl {
			_ = c()
		}
		return nil, closer, err
	}

	closer = append(closer, cl...)
	closer = append(closer, func() error {
		if res != nil {
			err := res.Call(MethodTestAgentShutdown, new(TestAgentShutdownRequest), new(TestAgentShutdownResponse))
			if err != nil && strings.Contains(err.Error(), "connection is shut down") {
				return nil
			}

			if err != nil {
				return xerrors.Errorf("cannot shutdown agent: %w", err)
			}
		}
		return nil
	})

	return &RpcClient{
		client:     res,
		component:  component,
		agentName:  agentName,
		namespace:  namespace,
		kubeconfig: kubeconfig,
		kclient:    client,
		opts:       opts,
	}, closer, nil
}
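
// A minimal sketch of the intended call pattern (the agent name and surrounding
// variables are illustrative); the returned closers must be invoked to clean up
// the port-forward and shut the agent down:
//
//	rsa, closer, err := Instrument(ComponentWorkspaceDaemon, "daemon", namespace, kubeconfig, client, WithInstanceID(instanceID))
//	if err != nil {
//		t.Fatal(err)
//	}
//	defer func() {
//		for _, c := range closer {
//			_ = c()
//		}
//	}()
//	defer rsa.Close()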

// portfw launches the agent binary inside the target container, port-forwards its RPC
// port to localhost and dials it, returning the RPC client plus closers for cleanup.
func portfw(podExec *PodExec, kubeconfig string, podName string, namespace string, containerName string, tgtFN string, options instrumentOptions) (*rpc.Client, []func() error, error) {
	var closer []func() error

	localAgentPort, err := getFreePort()
	if err != nil {
		return nil, closer, err
	}

	cmd := []string{filepath.Join("/home/gitpod/", tgtFN), "-rpc-port", strconv.Itoa(localAgentPort)}
	if options.WorkspacekitLift {
		cmd = append([]string{"/.supervisor/workspacekit", "lift"}, cmd...)
	}

	execErrs := make(chan error, 1)
	go func() {
		defer close(execErrs)
		_, _, _, execErr := podExec.ExecCmd(cmd, podName, namespace, containerName)
		if execErr != nil {
			execErrs <- execErr
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer func() {
		if err == nil {
			closer = append(closer, func() error {
				cancel()
				return nil
			})
		} else {
			cancel()
		}
	}()
L:
	for {
		fwdReady, fwdErr := common.ForwardPortOfPod(ctx, kubeconfig, namespace, podName, strconv.Itoa(localAgentPort))

		select {
		case <-time.After(2 * time.Minute):
			cancel()
			return nil, closer, xerrors.New("timeout")
		case <-fwdReady:
			break L
		case err = <-execErrs:
			return nil, closer, xerrors.Errorf("failure of port-forwarding: %w", err)
		case err = <-fwdErr:
			var eno syscall.Errno
			if errors.Is(err, io.EOF) || (errors.As(err, &eno) && eno == syscall.ECONNREFUSED) {
				time.Sleep(5 * time.Second)
			} else if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
				time.Sleep(5 * time.Second)
			} else if err != nil {
				return nil, closer, err
			}
		}
	}

	var res *rpc.Client
	var lastError error
	waitErr := wait.PollImmediate(500*time.Millisecond, 1*time.Minute, func() (bool, error) {
		res, lastError = rpc.DialHTTP("tcp", net.JoinHostPort("localhost", strconv.Itoa(localAgentPort)))
		if lastError != nil {
			return false, nil
		}
		return true, nil
	})
	if waitErr == wait.ErrWaitTimeout {
		cancel()
		return nil, closer, xerrors.Errorf("timed out attempting to connect agent: %v", lastError)
	} else if waitErr != nil {
		cancel()
		return nil, closer, waitErr
	}

	return res, closer, nil
}

// shutdownAgent asks a previously started agent to exit by curling its shutdown
// endpoint from within the pod.
func shutdownAgent(podExec *PodExec, kubeconfig string, podName string, namespace string, containerName string) error {
	cmd := []string{"curl", "localhost:8080/shutdown"}
	_, _, _, err := podExec.ExecCmd(cmd, podName, namespace, containerName)
	if err != nil {
		return fmt.Errorf("curl failed: %v", err)
	}
	return nil
}

// getFreePort asks the kernel for a free local TCP port.
func getFreePort() (int, error) {
	l, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()

	result, err := net.ResolveTCPAddr("tcp", l.Addr().String())
	if err != nil {
		return 0, err
	}

	return result.Port, nil
}

type agentBuildResult struct {
	agentLoc string
	err      error
}

var (
	buildOnce   = make(map[string]*sync.Once)
	buildMu     sync.Mutex
	builtAgents = make(map[string]agentBuildResult)
)

// buildAgent builds the named agent at most once per process and caches the result,
// so repeated and concurrent instrumentations reuse the same binary.
func buildAgent(name string) (loc string, err error) {
	buildMu.Lock()
	once, ok := buildOnce[name]
	if !ok {
		once = &sync.Once{}
		buildOnce[name] = once
	}
	buildMu.Unlock()

	once.Do(func() {
		loc, err = doBuildAgent(name)
		// guard the shared result map: builds of different agents may run concurrently
		buildMu.Lock()
		builtAgents[name] = agentBuildResult{
			agentLoc: loc,
			err:      err,
		}
		buildMu.Unlock()
	})

	buildMu.Lock()
	res, ok := builtAgents[name]
	buildMu.Unlock()
	if !ok {
		return "", xerrors.Errorf("expected agent build result but got none: %w", err)
	}
	return res.agentLoc, res.err
}

// doBuildAgent compiles the named agent's main.go into a temporary, statically
// linked binary and returns its path.
func doBuildAgent(name string) (loc string, err error) {
	log.Infof("building agent %s", name)
	defer func() {
		if err != nil {
			err = xerrors.Errorf("cannot build agent: %w", err)
		}
	}()

	_, filename, _, _ := runtime.Caller(0)
	src := path.Join(path.Dir(filename), "..", "agent", name, "main.go")
	if _, err := os.Stat(src); err != nil {
		return "", err
	}

	f, err := os.CreateTemp("", fmt.Sprintf("gitpod-integration-test-%s-*", name))
	if err != nil {
		return "", err
	}
	f.Close()

	cmd := exec.Command("go", "build", "-trimpath", "-ldflags", "-buildid= -w -s", "-o", f.Name(), src)
	cmd.Env = append(os.Environ(),
		"CGO_ENABLED=0",
	)
	out, err := cmd.CombinedOutput()
	if err != nil {
		return "", xerrors.Errorf("%w: %s", err, string(out))
	}

	return f.Name(), nil
}

// selectPod picks the pod to instrument for the given component. For workspaces the
// instance ID selects the pod directly; for ws-daemon it selects the daemon on the node
// hosting that workspace. It returns the pod name and, if requested, the container name.
func selectPod(component ComponentType, options selectPodOptions, namespace string, client klient.Client) (string, string, error) {
	clientSet, err := kubernetes.NewForConfig(client.RESTConfig())
	if err != nil {
		return "", "", err
	}

	listOptions := metav1.ListOptions{
		LabelSelector: "component=" + string(component),
	}

	if component == ComponentWorkspace && options.InstanceID != "" {
		listOptions.LabelSelector = "component=workspace,workspaceID=" + options.InstanceID
	}

	if component == ComponentWorkspaceDaemon && options.InstanceID != "" {
		pods, err := clientSet.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
			LabelSelector: "component=workspace,workspaceID=" + options.InstanceID,
		})
		if err != nil {
			return "", "", xerrors.Errorf("cannot list pods: %w", err)
		}

		if len(pods.Items) == 0 {
			return "", "", xerrors.Errorf("no workspace pod for instance %s", options.InstanceID)
		}

		listOptions.FieldSelector = "spec.nodeName=" + pods.Items[0].Spec.NodeName
	}

	pods, err := clientSet.CoreV1().Pods(namespace).List(context.Background(), listOptions)
	if err != nil {
		return "", "", xerrors.Errorf("cannot list pods: %w", err)
	}

	if len(pods.Items) == 0 {
		return "", "", xerrors.Errorf("no pods for %s: %w", component, errorNoPods)
	}

	p := pods.Items[0]
	err = waitForPodRunningReady(clientSet, p.Name, namespace, 10*time.Second)
	if err != nil {
		return "", "", xerrors.Errorf("pod for component %s is not running: %w", component, err)
	}

	var container string
	if options.Container != "" {
		var found bool
		for _, c := range p.Spec.Containers {
			if c.Name == options.Container {
				found = true
				break
			}
		}

		if !found {
			return "", "", xerrors.Errorf("no container named %s found", options.Container)
		}

		container = options.Container
	}
	return p.Name, container, nil
}

// ServerConfigPartial is the subset of the server config we use for integration tests.
// Ideally this would be derived from the config interface, someday...
// NOTE: keep in sync with chart/templates/server-configmap.yaml
type ServerConfigPartial struct {
	HostURL           string `json:"hostUrl"`
	WorkspaceDefaults struct {
		WorkspaceImage string `json:"workspaceImage"`
	} `json:"workspaceDefaults"`
	Session struct {
		Secret string `json:"secret"`
	} `json:"session"`
}

// GetServerConfig reads and parses the server-config ConfigMap from the given namespace.
func GetServerConfig(namespace string, client klient.Client) (*ServerConfigPartial, error) {
	var cm corev1.ConfigMap
	err := client.Resources().Get(context.Background(), "server-config", namespace, &cm)
	if err != nil {
		return nil, err
	}

	key := "config.json"
	configJson, ok := cm.Data[key]
	if !ok {
		return nil, fmt.Errorf("key %s not found", key)
	}

	var config ServerConfigPartial
	err = json.Unmarshal([]byte(configJson), &config)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling server config: %v", err)
	}
	return &config, nil
}
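
// Hypothetical usage sketch in a test (variables are illustrative):
//
//	cfg, err := GetServerConfig(namespace, client)
//	if err != nil {
//		t.Fatal(err)
//	}
//	t.Logf("Gitpod host: %s", cfg.HostURL)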

// GetIDEConfig reads and parses the ide-config ConfigMap from the given namespace.
func GetIDEConfig(namespace string, client klient.Client) (*ide.IDEConfig, error) {
	var cm corev1.ConfigMap
	err := client.Resources().Get(context.Background(), "ide-config", namespace, &cm)
	if err != nil {
		return nil, err
	}

	key := "config.json"
	configJson, ok := cm.Data[key]
	if !ok {
		return nil, fmt.Errorf("key %s not found", key)
	}

	var config ide.IDEConfig
	err = json.Unmarshal([]byte(configJson), &config)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling IDE config: %v", err)
	}
	return &config, nil
}

// ComponentType denotes a Gitpod component
type ComponentType string

const (
	// ComponentWorkspaceDaemon points to the workspace daemon
	ComponentWorkspaceDaemon ComponentType = "ws-daemon"
	// ComponentWorkspaceManager points to the workspace manager
	ComponentWorkspaceManager ComponentType = "ws-manager"
	// ComponentWorkspaceManagerMK2 points to the MK2 workspace manager
	ComponentWorkspaceManagerMK2 ComponentType = "ws-manager-mk2"
	// ComponentContentService points to the content service
	ComponentContentService ComponentType = "content-service"
	// ComponentWorkspace points to a workspace
	ComponentWorkspace ComponentType = "workspace"
	// ComponentImageBuilderMK3 points to the image-builder-mk3
	ComponentImageBuilderMK3 ComponentType = "image-builder-mk3"
)

// waitForPodRunningReady waits until the pod is in phase Running and reports Ready,
// or the timeout expires.
func waitForPodRunningReady(c kubernetes.Interface, podName string, namespace string, timeout time.Duration) error {
	return wait.PollImmediate(time.Second, timeout, isPodRunningReady(c, podName, namespace))
}

func isPodRunningReady(c kubernetes.Interface, podName string, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}

		if pod.Status.Phase != corev1.PodRunning {
			return false, nil
		}

		return isPodReady(&pod.Status), nil
	}
}

func isPodReady(s *corev1.PodStatus) bool {
	for i := range s.Conditions {
		if s.Conditions[i].Type == corev1.PodReady {
			return s.Conditions[i].Status == corev1.ConditionTrue
		}
	}

	return false
}