Path: blob/main/test/pkg/integration/common/port_forward.go
2499 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package common56import (7"bytes"8"context"9"errors"10"fmt"11"io"12"net"13"os/exec"14"strings"15"time"1617"k8s.io/apimachinery/pkg/util/wait"18)1920const (21errorDialingBackend = "error: error upgrading connection: error dialing backend: EOF"22)2324// ForwardPortOfPod establishes a TCP port forwarding to a Kubernetes pod25func ForwardPortOfPod(ctx context.Context, kubeconfig string, namespace, name, port string) (readychan chan struct{}, errchan chan error) {26return forwardPort(ctx, kubeconfig, namespace, "pod", name, port)27}2829// ForwardPortOfSvc establishes a TCP port forwarding to a Kubernetes service30func ForwardPortOfSvc(ctx context.Context, kubeconfig string, namespace, name, port string) (readychan chan struct{}, errchan chan error) {31return forwardPort(ctx, kubeconfig, namespace, "service", name, port)32}3334// forwardPort establishes a TCP port forwarding to a Kubernetes resource - pod or service35// Uses kubectl instead of Go to use a local process that can reproduce the same behavior outside the tests36// Since we are using kubectl directly we need to pass kubeconfig explicitly37func forwardPort(ctx context.Context, kubeconfig string, namespace, resourceType, name, port string) (readychan chan struct{}, errchan chan error) {38errchan = make(chan error, 1)39readychan = make(chan struct{}, 1)4041go func() {42args := []string{43"port-forward",44"--address=0.0.0.0",45fmt.Sprintf("%s/%v", resourceType, name),46fmt.Sprintf("--namespace=%v", namespace),47fmt.Sprintf("--kubeconfig=%v", kubeconfig),48port,49}5051command := exec.CommandContext(ctx, "kubectl", args...)52var serr, sout bytes.Buffer53command.Stdout = &sout54command.Stderr = &serr55err := command.Start()56defer func() {57if command.Process != nil {58_ = command.Process.Kill()59}60}()61if err != nil {62if strings.TrimSuffix(serr.String(), "\n") == errorDialingBackend {63errchan <- io.EOF64} else {65errchan <- fmt.Errorf("unexpected error string port-forward: %w", errors.New(serr.String()))66}67}6869err = command.Wait()70if err != nil {71if strings.TrimSuffix(serr.String(), "\n") == errorDialingBackend {72errchan <- io.EOF73} else {74errchan <- fmt.Errorf("unexpected error running port-forward: %w", errors.New(serr.String()))75}76}77}()7879// wait until we can reach the local port before signaling we are ready80go func() {81localPort := strings.Split(port, ":")[0]82waitErr := wait.PollImmediate(500*time.Millisecond, 1*time.Minute, func() (bool, error) {83conn, err := net.DialTimeout("tcp", net.JoinHostPort("localhost", localPort), 1*time.Second)84if err != nil {85return false, nil86}87if conn == nil {88return false, nil89}9091conn.Close()92return true, nil93})9495if waitErr == wait.ErrWaitTimeout {96errchan <- fmt.Errorf("timeout waiting for port-foward: %w", waitErr)97return98} else if waitErr != nil {99errchan <- waitErr100return101} else {102readychan <- struct{}{}103104}105}()106107return readychan, errchan108}109110111