package integration
import (
"context"
"fmt"
"io"
"strings"
"sync"
"testing"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/gitpod-io/gitpod/common-go/namegen"
csapi "github.com/gitpod-io/gitpod/content-service/api"
protocol "github.com/gitpod-io/gitpod/gitpod-protocol"
ide "github.com/gitpod-io/gitpod/ide-service-api/config"
imgbldr "github.com/gitpod-io/gitpod/image-builder/api"
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
)
// Settings shared by the workspace launch and stop helpers in this file.
const (
	// gitpodBuiltinUserID is recorded as the owner in the metadata of
	// workspaces that are started directly through ws-manager.
	gitpodBuiltinUserID = "builtin-user-workspace-probe-0000000"

	// perCallTimeout is the deadline applied to the contexts these helpers
	// create themselves (they are derived from context.Background()).
	perCallTimeout = 5 * time.Minute

	// ParallelLunchableWorkspaceLimit is the capacity of parallelLimiter,
	// i.e. how many launched workspaces may hold a slot at the same time.
	ParallelLunchableWorkspaceLimit = 10
)
var (
	// ErrWorkspaceInstanceStopping is returned by WaitForWorkspaceStart when
	// the instance reaches the STOPPING phase while the caller is not waiting
	// for it to stop.
	ErrWorkspaceInstanceStopping = fmt.Errorf("workspace instance is stopping")

	// ErrWorkspaceInstanceStopped is returned by WaitForWorkspaceStart when
	// the instance reaches the STOPPED phase while the caller is not waiting
	// for it to stop.
	ErrWorkspaceInstanceStopped = fmt.Errorf("workspace instance has stopped")

	// parallelLimiter is a buffered channel used as a counting semaphore: the
	// launch helpers send into it before starting a workspace, and the slot is
	// received back when the launch fails or the stop function has run.
	parallelLimiter = make(chan struct{}, ParallelLunchableWorkspaceLimit)
)
type (
	// launchWorkspaceDirectlyOptions collects the settings that
	// LaunchWorkspaceDirectlyOpt functions apply before LaunchWorkspaceDirectly
	// builds its start request.
	launchWorkspaceDirectlyOptions struct {
		// BaseImage is the image reference that gets resolved (or built) into
		// the workspace image.
		BaseImage string
		// IdeImage, when non-empty, is used instead of the IDE image taken
		// from the server IDE config.
		IdeImage string
		// Mods are applied, in order, to the start request before it is sent.
		Mods []func(*wsmanapi.StartWorkspaceRequest) error
		// WaitForOpts are forwarded to WaitForWorkspaceStart.
		WaitForOpts []WaitForWorkspaceOpt
	}

	// LaunchWorkspaceDirectlyOpt mutates the options of a direct workspace
	// launch; a non-nil error aborts the launch.
	LaunchWorkspaceDirectlyOpt func(*launchWorkspaceDirectlyOptions) error
)
// WithoutWorkspaceImage returns an option that clears the base image so that
// no image reference is configured for the launch.
func WithoutWorkspaceImage() LaunchWorkspaceDirectlyOpt {
	clearBaseImage := func(o *launchWorkspaceDirectlyOptions) error {
		o.BaseImage = ""
		return nil
	}
	return clearBaseImage
}
// WithBaseImage returns an option that sets the base image reference the
// workspace image is resolved from.
func WithBaseImage(baseImage string) LaunchWorkspaceDirectlyOpt {
	setBaseImage := func(o *launchWorkspaceDirectlyOptions) error {
		o.BaseImage = baseImage
		return nil
	}
	return setBaseImage
}
// WithIDEImage returns an option that sets the IDE image explicitly instead of
// having it looked up from the server IDE config.
func WithIDEImage(ideImage string) LaunchWorkspaceDirectlyOpt {
	setIDEImage := func(o *launchWorkspaceDirectlyOptions) error {
		o.IdeImage = ideImage
		return nil
	}
	return setIDEImage
}
// WithRequestModifier returns an option that registers mod to be run against
// the start request. Modifiers accumulate: each use of this option appends
// another one.
func WithRequestModifier(mod func(*wsmanapi.StartWorkspaceRequest) error) LaunchWorkspaceDirectlyOpt {
	addModifier := func(o *launchWorkspaceDirectlyOptions) error {
		o.Mods = append(o.Mods, mod)
		return nil
	}
	return addModifier
}
// WithWaitWorkspaceForOpts returns an option that sets the options forwarded
// to WaitForWorkspaceStart. It replaces any previously configured wait options
// rather than appending to them.
func WithWaitWorkspaceForOpts(opt ...WaitForWorkspaceOpt) LaunchWorkspaceDirectlyOpt {
	setWaitOpts := func(o *launchWorkspaceDirectlyOptions) error {
		o.WaitForOpts = opt
		return nil
	}
	return setWaitOpts
}
type (
	// LaunchWorkspaceDirectlyResult describes a workspace started by
	// LaunchWorkspaceDirectly.
	LaunchWorkspaceDirectlyResult struct {
		// Req is the start request that was sent to ws-manager.
		Req *wsmanapi.StartWorkspaceRequest
		// WorkspaceID is the generated workspace (meta) ID.
		WorkspaceID string
		// IdeURL is the URL returned in the start response.
		IdeURL string
		// LastStatus is the status returned by WaitForWorkspaceStart.
		LastStatus *wsmanapi.WorkspaceStatus
	}

	// StopWorkspaceFunc stops a launched workspace. When waitForStop is true
	// it blocks until the stop has been observed and returns the last status;
	// otherwise the wait happens in the background and it returns (nil, nil).
	StopWorkspaceFunc = func(waitForStop bool, api *ComponentAPI) (*wsmanapi.WorkspaceStatus, error)
)
// LaunchWorkspaceDirectly starts a workspace by talking to ws-manager directly
// and waits until WaitForWorkspaceStart reports it as started.
//
// It occupies one slot of parallelLimiter for the lifetime of the workspace.
// If the launch fails before a stop function exists, the slot is released
// here; afterwards the returned StopWorkspaceFunc is responsible for it. When
// waiting for the start fails, the workspace is stopped again before returning.
//
// On success it returns the launch result and the stop function; on failure
// both are nil and err describes the problem.
func LaunchWorkspaceDirectly(t *testing.T, ctx context.Context, api *ComponentAPI, opts ...LaunchWorkspaceDirectlyOpt) (_ *LaunchWorkspaceDirectlyResult, _ StopWorkspaceFunc, err error) {
	// err is a named result so that the deferred cleanups below always see the
	// error that is actually returned, even when an inner scope produced it.
	var stopWs StopWorkspaceFunc

	options := launchWorkspaceDirectlyOptions{
		BaseImage: "docker.io/gitpod/workspace-full:latest",
	}
	for _, o := range opts {
		if err = o(&options); err != nil {
			return nil, nil, err
		}
	}

	instanceID, err := uuid.NewRandom()
	if err != nil {
		return nil, nil, err
	}
	workspaceID, err := namegen.GenerateWorkspaceID()
	if err != nil {
		return nil, nil, err
	}

	parallelLimiter <- struct{}{}
	defer func() {
		if err != nil && stopWs == nil {
			t.Logf("unlock the parallelLimiter because of error during starting the workspace: %v", err)
			<-parallelLimiter
		}
	}()

	// staleClient reports whether e suggests that the cached image-builder
	// client should be discarded before the next attempt.
	staleClient := func(e error) bool {
		if st, ok := status.FromError(e); ok && st.Code() == codes.Unavailable {
			return true
		}
		msg := e.Error()
		return strings.Contains(msg, "the server is currently unable to handle the request") ||
			strings.Contains(msg, "apiserver not ready")
	}

	// Resolve (or build) the workspace image exactly once, and only when a
	// base image is configured.
	var workspaceImage string
	if options.BaseImage != "" {
		var resolveErr error
		waitErr := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
			if ctx.Err() != nil {
				// no point in retrying once the caller gave up
				return false, ctx.Err()
			}
			workspaceImage, resolveErr = resolveOrBuildImage(ctx, api, options.BaseImage)
			if resolveErr == nil {
				return true, nil
			}
			if staleClient(resolveErr) {
				api.ClearImageBuilderClientCache()
			}
			return false, nil
		})
		if waitErr == wait.ErrWaitTimeout {
			return nil, nil, fmt.Errorf("timeout waiting for resolving the build image: %w (last error: %v)", waitErr, resolveErr)
		} else if waitErr != nil {
			return nil, nil, waitErr
		}
	}
	if workspaceImage == "" {
		return nil, nil, xerrors.Errorf("cannot start workspaces without a workspace image (required by registry-facade resolver)")
	}

	ideImage := options.IdeImage
	ideImageLayers := make([]string, 0)
	if ideImage == "" {
		var cfg *ide.IDEConfig
		for i := 0; i < 3; i++ {
			cfg, err = GetIDEConfig(api.namespace, api.client)
			if err == nil {
				// stop retrying as soon as one attempt succeeded
				break
			}
		}
		if err != nil {
			return nil, nil, xerrors.Errorf("cannot find server IDE config: %w", err)
		}
		ideImage = cfg.IdeOptions.Options["code"].Image
		ideImageLayers = cfg.IdeOptions.Options["code"].ImageLayers
		if ideImage == "" {
			return nil, nil, xerrors.Errorf("cannot start workspaces without an IDE image (required by registry-facade resolver)")
		}
	}

	req := &wsmanapi.StartWorkspaceRequest{
		Id:            instanceID.String(),
		ServicePrefix: workspaceID,
		Metadata: &wsmanapi.WorkspaceMetadata{
			Owner:  gitpodBuiltinUserID,
			MetaId: workspaceID,
		},
		Type: wsmanapi.WorkspaceType_REGULAR,
		Spec: &wsmanapi.StartWorkspaceSpec{
			WorkspaceImage: workspaceImage,
			IdeImage: &wsmanapi.IDEImage{
				WebRef: ideImage,
			},
			IdeImageLayers:    ideImageLayers,
			WorkspaceLocation: "/",
			Timeout:           "30m",
			Initializer: &csapi.WorkspaceInitializer{
				Spec: &csapi.WorkspaceInitializer_Empty{
					Empty: &csapi.EmptyInitializer{},
				},
			},
			Git: &wsmanapi.GitSpec{
				Username: "integration-test",
				Email:    "[email protected]",
			},
			Admission: wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY,
			Envvars: []*wsmanapi.EnvironmentVariable{
				{
					Name:  "VSX_REGISTRY_URL",
					Value: "https://open-vsx.gitpod.io/",
				},
			},
		},
	}
	for _, m := range options.Mods {
		if err = m(req); err != nil {
			return nil, nil, err
		}
	}

	t.Log("prepare for a connection with ws-manager")
	wsm, err := api.WorkspaceManager()
	if err != nil {
		return nil, nil, xerrors.Errorf("cannot start workspace manager: %w", err)
	}
	t.Log("established a connection with ws-manager")

	var (
		sresp    *wsmanapi.StartWorkspaceResponse
		startErr error
	)
	for i := 0; i < 3; i++ {
		t.Logf("attempt to start up the workspace directly: %s, %s", instanceID, workspaceID)
		sresp, startErr = wsm.StartWorkspace(ctx, req)
		if startErr == nil {
			break
		}

		scode := status.Code(startErr)
		if scode == codes.NotFound || scode == codes.Unavailable {
			t.Logf("retry strarting a workspace because cannnot start workspace: %v", startErr)
			time.Sleep(1 * time.Second)

			api.ClearWorkspaceManagerClientCache()
			var clientErr error
			wsm, clientErr = api.WorkspaceManager()
			if clientErr != nil {
				return nil, nil, xerrors.Errorf("cannot start workspace manager: %w", clientErr)
			}
			continue
		}
		if strings.Contains(startErr.Error(), "too many requests") {
			t.Log("hit too many requests so retry after some seconds")
			time.Sleep(30 * time.Second)
			continue
		}
		return nil, nil, xerrors.Errorf("cannot start workspace: %w", startErr)
	}
	if startErr != nil {
		// all retries are used up; do not continue without a start response
		return nil, nil, xerrors.Errorf("cannot start workspace: %w", startErr)
	}
	t.Log("successfully sent workspace start request")

	stopWs = stopWsF(t, req.Id, req.Metadata.MetaId, api, req.Type == wsmanapi.WorkspaceType_PREBUILD)
	defer func() {
		if err != nil {
			_, _ = stopWs(true, api)
		}
	}()

	t.Log("wait for workspace to be fully up and running")
	lastStatus, err := WaitForWorkspaceStart(t, ctx, req.Id, req.Metadata.MetaId, api, options.WaitForOpts...)
	if err != nil {
		return nil, nil, xerrors.Errorf("cannot wait for workspace start: %w", err)
	}
	t.Log("successful launch of the workspace")

	return &LaunchWorkspaceDirectlyResult{
		Req:         req,
		WorkspaceID: workspaceID,
		IdeURL:      sresp.Url,
		LastStatus:  lastStatus,
	}, stopWs, nil
}
// LaunchWorkspaceFromContextURL is a convenience wrapper around
// LaunchWorkspaceWithOptions for callers that only need to specify the
// context URL; all other launch options keep their zero value.
func LaunchWorkspaceFromContextURL(t *testing.T, ctx context.Context, contextURL string, username string, api *ComponentAPI, serverOpts ...GitpodServerOpt) (*protocol.WorkspaceInfo, StopWorkspaceFunc, error) {
	launchOpts := &LaunchWorkspaceOptions{ContextURL: contextURL}
	return LaunchWorkspaceWithOptions(t, ctx, launchOpts, username, api, serverOpts...)
}
// LaunchWorkspaceOptions describes the workspace that
// LaunchWorkspaceWithOptions asks the Gitpod server to create.
type LaunchWorkspaceOptions struct {
	// ContextURL is passed as the context URL of the create request.
	ContextURL string
	// ProjectID is passed as the project ID of the create request.
	ProjectID string
	// IDESettings is passed as the IDE settings of the start options.
	IDESettings *protocol.IDESettings
}
// LaunchWorkspaceWithOptions creates a workspace through the Gitpod server on
// behalf of username (if non-empty) and waits until WaitForWorkspaceStart
// reports it as started.
//
// It occupies one slot of parallelLimiter for the lifetime of the workspace.
// If the launch fails before a stop function exists, the slot is released
// here; afterwards the returned StopWorkspaceFunc is responsible for it. When
// waiting for the start fails, the workspace is stopped again before returning.
//
// The server calls use a context derived from context.Background() with
// perCallTimeout, not from ctx.
func LaunchWorkspaceWithOptions(t *testing.T, ctx context.Context, opts *LaunchWorkspaceOptions, username string, api *ComponentAPI, serverOpts ...GitpodServerOpt) (_ *protocol.WorkspaceInfo, _ StopWorkspaceFunc, err error) {
	// err is a named result so that the deferred cleanups below always see the
	// error that is actually returned, even when an inner scope produced it.
	var (
		defaultServerOpts []GitpodServerOpt
		stopWs            StopWorkspaceFunc
	)
	if username != "" {
		defaultServerOpts = []GitpodServerOpt{WithGitpodUser(username)}
	}

	parallelLimiter <- struct{}{}
	defer func() {
		if err != nil && stopWs == nil {
			<-parallelLimiter
		}
	}()

	server, err := api.GitpodServer(append(defaultServerOpts, serverOpts...)...)
	if err != nil {
		return nil, nil, xerrors.Errorf("cannot start server: %w", err)
	}

	cctx, ccancel := context.WithTimeout(context.Background(), perCallTimeout)
	defer ccancel()

	var (
		resp      *protocol.WorkspaceCreationResult
		createErr error
	)
	for i := 0; i < 3; i++ {
		u, _ := api.GetUserId(username)
		t.Logf("attempt to create the workspace as user %v, with context %v\n", u, opts.ContextURL)

		// NOTE(review): a GetTeams error is ignored and handled like "no teams
		// yet" — confirm that creating a team in that case is intended.
		teams, _ := server.GetTeams(cctx)
		var orgID string
		if len(teams) == 0 {
			team, teamErr := server.CreateTeam(cctx, "test-team")
			if teamErr != nil {
				return nil, nil, xerrors.Errorf("cannot create team: %w", teamErr)
			}
			orgID = team.ID
		} else {
			orgID = teams[0].ID
		}

		resp, createErr = server.CreateWorkspace(cctx, &protocol.CreateWorkspaceOptions{
			ContextURL:                         opts.ContextURL,
			ProjectId:                          opts.ProjectID,
			OrganizationId:                     orgID,
			IgnoreRunningWorkspaceOnSameCommit: true,
			StartWorkspaceOptions: protocol.StartWorkspaceOptions{
				IdeSettings: opts.IDESettings,
			},
		})
		if createErr == nil {
			break
		}

		scode := status.Code(createErr)
		if scode == codes.NotFound || scode == codes.Unavailable {
			t.Logf("retry strarting a workspace because cannnot start workspace: %v", createErr)
			time.Sleep(1 * time.Second)

			api.ClearGitpodServerClientCache()
			var serverErr error
			server, serverErr = api.GitpodServer(append(defaultServerOpts, serverOpts...)...)
			if serverErr != nil {
				return nil, nil, xerrors.Errorf("cannot start server: %w", serverErr)
			}
			continue
		}
		if strings.Contains(createErr.Error(), "too many requests") {
			t.Log("hit too many requests so retry after some seconds")
			time.Sleep(30 * time.Second)
			continue
		}
		return nil, nil, xerrors.Errorf("cannot start workspace: %w", createErr)
	}
	if createErr != nil {
		// all retries are used up; do not continue without a creation result
		return nil, nil, xerrors.Errorf("cannot start workspace: %w", createErr)
	}

	t.Logf("attempt to get the workspace information: %s", resp.CreatedWorkspaceID)
	launchStart := time.Now()
	var wi *protocol.WorkspaceInfo
	for i := 0; i < 3; i++ {
		launchDuration := time.Since(launchStart)
		wi, err = server.GetWorkspace(cctx, resp.CreatedWorkspaceID)
		if err != nil || wi == nil || wi.LatestInstance == nil {
			time.Sleep(2 * time.Second)
			t.Logf("error or nil instance since %s", launchDuration)
			continue
		}
		if wi.LatestInstance.Status.Phase != "preparing" {
			t.Logf("not preparing")
			break
		}
		t.Logf("sleeping")
		time.Sleep(5 * time.Second)
	}
	if wi == nil || wi.LatestInstance == nil {
		return nil, nil, xerrors.Errorf("CreateWorkspace did not start the workspace")
	}
	t.Logf("got the workspace information: %s", wi.Workspace.ID)

	if wi.LatestInstance.IdeURL == "" {
		wi.LatestInstance.IdeURL = resp.WorkspaceURL
	}

	if wi.LatestInstance.Status.Conditions.NeededImageBuild {
		// poll until the instance is running or the caller's context ends
		for ctx.Err() == nil {
			wi, err = server.GetWorkspace(cctx, resp.CreatedWorkspaceID)
			if err != nil {
				return nil, nil, xerrors.Errorf("cannot get workspace: %w", err)
			}
			if wi.LatestInstance.Status.Phase == "running" {
				break
			}
			time.Sleep(10 * time.Second)
		}
	}

	stopWs = stopWsF(t, wi.LatestInstance.ID, resp.CreatedWorkspaceID, api, false)
	defer func() {
		if err != nil {
			_, _ = stopWs(true, api)
		}
	}()

	t.Log("wait for workspace to be fully up and running")
	wsState, err := WaitForWorkspaceStart(t, cctx, wi.LatestInstance.ID, resp.CreatedWorkspaceID, api)
	if err != nil {
		return nil, nil, xerrors.Errorf("failed to wait for the workspace to start up: %w", err)
	}
	if wi.LatestInstance.IdeURL == "" {
		wi.LatestInstance.IdeURL = wsState.Spec.Url
	}
	t.Log("successful launch of the workspace")

	return wi, stopWs, nil
}
// stopWsF builds the StopWorkspaceFunc for the given workspace instance.
//
// The returned function acts only on its first invocation; later calls log and
// return (nil, nil). It starts WaitForWorkspaceStop in the background, asks
// ws-manager to delete the instance (retrying while ws-manager is unavailable)
// and then either waits for the stop (waitForStop == true) or leaves the wait
// to a background goroutine. The parallelLimiter slot held by the workspace is
// released exactly once: when the function fails or when the wait finishes.
//
// NOTE(review): the context used for the wait is cancelled when the returned
// function returns, so with waitForStop == false the background wait is
// cancelled right away.
func stopWsF(t *testing.T, instanceID string, workspaceID string, api *ComponentAPI, isPrebuild bool) StopWorkspaceFunc {
	var already bool
	var unlocked bool

	// stopResult carries the outcome of WaitForWorkspaceStop over one channel,
	// so status and error can never be observed in the wrong order.
	type stopResult struct {
		status *wsmanapi.WorkspaceStatus
		err    error
	}

	return func(waitForStop bool, api *ComponentAPI) (s *wsmanapi.WorkspaceStatus, err error) {
		if already {
			t.Logf("already sent stop request: %s", instanceID)
			return nil, nil
		}
		already = true

		tryUnlockParallelLimiter := func() {
			if !unlocked {
				unlocked = true
				<-parallelLimiter
			}
		}
		defer func() {
			if err != nil {
				tryUnlockParallelLimiter()
			}
		}()

		sctx, scancel := context.WithTimeout(context.Background(), perCallTimeout)
		defer scancel()

		// Both channels are buffered so the waiter goroutine can always finish,
		// even when this function has already returned.
		result := make(chan stopResult, 1)
		ready := make(chan struct{}, 1)
		go func() {
			t.Logf("waiting for stopping the workspace: %s", instanceID)
			lastStatus, waitErr := WaitForWorkspaceStop(t, sctx, ready, api, instanceID, workspaceID)
			// Guarantee a readiness signal even if the wait returned before
			// sending one; otherwise the receive below would block forever.
			select {
			case ready <- struct{}{}:
			default:
			}
			result <- stopResult{status: lastStatus, err: waitErr}
		}()

		<-ready

		for {
			t.Logf("attempt to delete the workspace: %s", instanceID)
			delErr := DeleteWorkspace(sctx, api, instanceID)
			if delErr == nil {
				break
			}
			if st, ok := status.FromError(delErr); ok && st.Code() == codes.Unavailable {
				api.ClearWorkspaceManagerClientCache()
				t.Logf("got %v when deleting workspace", st)
				time.Sleep(5 * time.Second)
				continue
			}
			return nil, delErr
		}

		waitAndUnlock := func() (*wsmanapi.WorkspaceStatus, error) {
			defer tryUnlockParallelLimiter()

			res := <-result
			if res.err != nil {
				return nil, res.err
			}
			t.Logf("successfully terminated workspace")
			return res.status, nil
		}

		if !waitForStop {
			go func() {
				// use a local error: the named result belongs to the caller
				if _, waitErr := waitAndUnlock(); waitErr != nil {
					t.Logf("error while waiting asynchronously for workspace to stop: %v", waitErr)
				}
			}()
			return nil, nil
		}

		return waitAndUnlock()
	}
}
type (
	// WaitForWorkspaceOpt adjusts how the wait helpers treat workspace status
	// updates.
	WaitForWorkspaceOpt func(*waitForWorkspaceOpts)

	// waitForWorkspaceOpts holds the switches set by WaitForWorkspaceOpt
	// functions.
	waitForWorkspaceOpts struct {
		// CanFail makes a reported failure condition non-fatal for the wait.
		CanFail bool
		// WaitForStopped makes the wait finish on the STOPPED phase instead of
		// RUNNING.
		WaitForStopped bool
	}
)
// WorkspaceCanFail is a WaitForWorkspaceOpt that marks a failed workspace as
// acceptable, so a failure condition does not abort the wait with an error.
func WorkspaceCanFail(o *waitForWorkspaceOpts) {
	(*o).CanFail = true
}
// WaitForStopped is a WaitForWorkspaceOpt that makes the wait target the
// STOPPED phase rather than the RUNNING phase.
func WaitForStopped(o *waitForWorkspaceOpts) {
	(*o).WaitForStopped = true
}
// WaitForWorkspaceStart waits until the workspace instance reaches the phase
// the caller is interested in: RUNNING by default, STOPPED when the
// WaitForStopped option is given.
//
// Two sources are watched concurrently: a ws-manager status subscription
// (filtered by workspaceID, matched on instanceID) and a once-per-second
// DescribeWorkspace poll. Whichever yields a final answer first wins.
//
// It returns the last observed status, or an error when the workspace failed
// (unless WorkspaceCanFail is set), entered STOPPING/STOPPED while waiting for
// RUNNING, could not be found, the subscription broke, or ctx ended.
func WaitForWorkspaceStart(t *testing.T, ctx context.Context, instanceID string, workspaceID string, api *ComponentAPI, opts ...WaitForWorkspaceOpt) (lastStatus *wsmanapi.WorkspaceStatus, err error) {
	var cfg waitForWorkspaceOpts
	for _, o := range opts {
		o(&cfg)
	}

	// checkStatus decides whether a status ends the wait, and with which result.
	checkStatus := func(st *wsmanapi.WorkspaceStatus) (done bool, err error) {
		if st == nil {
			return false, nil
		}
		if !cfg.CanFail && st.Conditions != nil && st.Conditions.Failed != "" {
			return true, xerrors.Errorf("workspace instance %s failed: %s", instanceID, st.Conditions.Failed)
		}

		switch st.Phase {
		case wsmanapi.WorkspacePhase_RUNNING:
			if !cfg.WaitForStopped {
				return true, nil
			}
		case wsmanapi.WorkspacePhase_STOPPING:
			if !cfg.WaitForStopped {
				return true, ErrWorkspaceInstanceStopping
			}
		case wsmanapi.WorkspacePhase_STOPPED:
			if !cfg.WaitForStopped {
				return true, ErrWorkspaceInstanceStopped
			}
			return true, nil
		}
		return false, nil
	}

	// All channels are buffered so the subscriber goroutine can deliver its
	// outcome and exit even when this function already returned via the poll.
	done := make(chan *wsmanapi.WorkspaceStatus, 1)
	errStatus := make(chan error, 1)
	reboot := make(chan struct{}, 1)

	// watch follows the status subscription until checkStatus gives a verdict.
	watch := func() (*wsmanapi.WorkspaceStatus, error) {
		t.Log("prepare for a connection with ws-manager")
		wsman, err := api.WorkspaceManager()
		if err != nil {
			return nil, err
		}
		sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
			MustMatch: &wsmanapi.MetadataFilter{
				MetaId: workspaceID,
			},
		})
		if err != nil {
			return nil, err
		}
		defer func() {
			if sub != nil {
				_ = sub.CloseSend()
			}
		}()
		t.Log("established for a connection with ws-manager")

		waitForPhase := "running"
		if cfg.WaitForStopped {
			waitForPhase = "stopped"
		}

		for {
			t.Logf("check if the status of workspace is in the %s phase: %s", waitForPhase, instanceID)
			resp, err := sub.Recv()
			if err != nil {
				st, ok := status.FromError(err)
				if !ok || st.Code() != codes.Unavailable {
					return nil, xerrors.Errorf("workspace update error: %w", err)
				}

				// ws-manager went away: drop the cached client and re-subscribe
				_ = sub.CloseSend()
				api.ClearWorkspaceManagerClientCache()
				wsman, err = api.WorkspaceManager()
				if err != nil {
					time.Sleep(5 * time.Second)
					// Non-blocking: a pending signal is enough, and nobody may
					// be listening anymore.
					select {
					case reboot <- struct{}{}:
					default:
					}
					t.Logf("we can't get the worksapce manger client: %v", err)
					continue
				}
				sub, err = wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
					MustMatch: &wsmanapi.MetadataFilter{
						MetaId: workspaceID,
					},
				})
				if err != nil {
					return nil, xerrors.Errorf("cannot listen for workspace updates: %w", err)
				}
				continue
			}

			s := resp.GetStatus()
			if s == nil || s.Id != instanceID {
				continue
			}
			t.Logf("subscribe status: %s, %s", s.Id, s.Phase)

			finished, err := checkStatus(s)
			if err != nil {
				return nil, err
			}
			if finished {
				return s, nil
			}
		}
	}

	go func() {
		s, watchErr := watch()
		if watchErr != nil {
			errStatus <- watchErr
			return
		}
		done <- s
	}()

	// handle polls the workspace once via DescribeWorkspace.
	handle := func() (*wsmanapi.WorkspaceStatus, bool, error) {
		wsman, err := api.WorkspaceManager()
		if err != nil {
			api.ClearWorkspaceManagerClientCache()
			return nil, false, err
		}
		desc, err := wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{
			Id: instanceID,
		})
		if err != nil {
			scode := status.Code(err)
			if scode == codes.NotFound || strings.Contains(err.Error(), "not found") {
				if cfg.WaitForStopped {
					t.Logf("describe: workspace couldn't be found, but we're expecting it to stop, so wait for subscribe to give us the last status")
					return nil, false, nil
				}
				if !cfg.CanFail {
					return nil, true, xerrors.Errorf("the workspace %s couldn't be found", instanceID)
				}
				return nil, true, nil
			}
			return nil, false, err
		}
		if desc == nil || desc.Status == nil {
			t.Logf("describe status is nil: %s", instanceID)
			return nil, false, nil
		}
		t.Logf("describe status: %s, %s", desc.Status.Id, desc.Status.Phase)

		finished, err := checkStatus(desc.Status)
		return desc.Status, finished, err
	}

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			desc, finished, err := handle()
			if !finished {
				if err != nil {
					t.Logf("error checking workspace status, trying again later: %v", err)
				}
				continue
			} else if err != nil {
				return nil, err
			} else if desc != nil {
				return desc, nil
			}
		case <-reboot:
			desc, finished, err := handle()
			if !finished {
				continue
			} else if err != nil {
				return nil, err
			} else if desc != nil {
				return desc, nil
			}
		case <-ctx.Done():
			return nil, xerrors.Errorf("cannot wait for workspace: %w", ctx.Err())
		case s := <-done:
			return s, nil
		case err := <-errStatus:
			return nil, err
		}
	}
}
// WaitForWorkspaceStop waits until the workspace instance has stopped.
//
// It subscribes to ws-manager status updates (filtered by workspaceID) and
// additionally asks DescribeWorkspace once up front and again whenever the
// subscription had to be re-established. A send on ready tells the caller that
// watching has begun — or that it could not be set up, in which case the error
// is returned — so the caller can then trigger the stop.
//
// It returns the last status when the STOPPED phase was observed, (nil, nil)
// when ws-manager no longer knows the instance, and an error when the
// workspace failed (unless WorkspaceCanFail is set), the subscription broke or
// ctx ended.
func WaitForWorkspaceStop(t *testing.T, ctx context.Context, ready chan<- struct{}, api *ComponentAPI, instanceID string, workspaceID string, opts ...WaitForWorkspaceOpt) (lastStatus *wsmanapi.WorkspaceStatus, err error) {
	var cfg waitForWorkspaceOpts
	for _, o := range opts {
		o(&cfg)
	}

	wsman, err := api.WorkspaceManager()
	if err != nil {
		// signal here too, otherwise a caller blocked on ready never wakes up
		ready <- struct{}{}
		return nil, err
	}
	sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
		MustMatch: &wsmanapi.MetadataFilter{
			MetaId: workspaceID,
		},
	})
	if err != nil {
		ready <- struct{}{}
		return nil, err
	}
	defer func() {
		if sub != nil {
			_ = sub.CloseSend()
		}
	}()

	// notFound is touched by the subscriber goroutine and by the loop below,
	// hence the mutex. swapNotFound stores v and returns the previous value.
	var (
		mu       sync.Mutex
		notFound bool
	)
	swapNotFound := func(v bool) bool {
		mu.Lock()
		defer mu.Unlock()
		prev := notFound
		notFound = v
		return prev
	}
	isNotFound := func(e error) bool {
		return status.Code(e) == codes.NotFound || strings.Contains(e.Error(), "not found")
	}

	// All channels are buffered so the subscriber goroutine can deliver its
	// outcome and exit even when this function already returned.
	done := make(chan *wsmanapi.WorkspaceStatus, 1)
	errCh := make(chan error, 1)
	reboot := make(chan struct{}, 1)

	// watch follows the subscription until the workspace stopped or failed. It
	// keeps its own stream and client so it shares no variables with the
	// caller's goroutine.
	watch := func() (*wsmanapi.WorkspaceStatus, error) {
		stream := sub
		resubscribed := false
		defer func() {
			// the initial stream is closed by the enclosing function
			if resubscribed {
				_ = stream.CloseSend()
			}
		}()

		for {
			resp, recvErr := stream.Recv()
			swapNotFound(false)
			if recvErr != nil {
				st, ok := status.FromError(recvErr)
				if !ok || st.Code() != codes.Unavailable {
					return nil, xerrors.Errorf("workspace update error: %w", recvErr)
				}

				// ws-manager went away: drop the cached client and re-subscribe
				_ = stream.CloseSend()
				api.ClearWorkspaceManagerClientCache()
				client, clientErr := api.WorkspaceManager()
				if clientErr != nil {
					t.Logf("we can't get the worksapce manger client: %v", clientErr)
					time.Sleep(5 * time.Second)
					// Non-blocking: a pending signal is enough, and nobody may
					// be listening anymore.
					select {
					case reboot <- struct{}{}:
					default:
					}
					continue
				}
				next, subErr := client.Subscribe(ctx, &wsmanapi.SubscribeRequest{
					MustMatch: &wsmanapi.MetadataFilter{
						MetaId: workspaceID,
					},
				})
				if subErr != nil {
					return nil, xerrors.Errorf("cannot listen for workspace updates: %w", subErr)
				}
				stream = next
				resubscribed = true
				continue
			}

			wss := resp.GetStatus()
			if wss == nil {
				continue
			}
			if !cfg.CanFail && wss.Conditions != nil && wss.Conditions.Failed != "" {
				return nil, xerrors.Errorf("workspace instance %s failed: %s", instanceID, wss.Conditions.Failed)
			}
			if wss.Phase == wsmanapi.WorkspacePhase_STOPPED {
				t.Logf("confirmed the worksapce is stopped: %s, %s", wss.Id, wss.Phase)
				return wss, nil
			}
		}
	}

	go func() {
		ready <- struct{}{}
		wss, watchErr := watch()
		if watchErr != nil {
			errCh <- watchErr
			return
		}
		done <- wss
	}()

	// The workspace may already be gone or stopped before any update arrives.
	desc, err := wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{
		Id: instanceID,
	})
	if err != nil && isNotFound(err) {
		t.Log("for some reason, ws-manager subscriber doesn't get updated. But the workspace is gone")
		return nil, nil
	}
	if desc != nil && desc.Status != nil && desc.Status.Phase == wsmanapi.WorkspacePhase_STOPPED {
		return desc.Status, nil
	}

	for {
		select {
		case <-reboot:
			client, err := api.WorkspaceManager()
			if err != nil {
				api.ClearWorkspaceManagerClientCache()
				continue
			}
			desc, err := client.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{
				Id: instanceID,
			})
			if err != nil && isNotFound(err) {
				// only trust "not found" when it is seen twice in a row
				if swapNotFound(true) {
					t.Log("for some reason, ws-manager subscriber doesn't get updated. But the workspace is gone")
					return nil, nil
				}
				continue
			}
			swapNotFound(false)
			if desc != nil && desc.Status != nil && desc.Status.Phase == wsmanapi.WorkspacePhase_STOPPED {
				return desc.Status, nil
			}
		case err := <-errCh:
			return nil, err
		case <-ctx.Done():
			return nil, xerrors.Errorf("cannot wait for workspace: %w", ctx.Err())
		case s := <-done:
			return s, nil
		}
	}
}
// WaitForWorkspace waits until condition returns true for a status of the
// given workspace instance.
//
// The current status is checked once via DescribeWorkspace; after that the
// result comes from an unfiltered ws-manager subscription whose updates are
// matched on instanceID. It returns the status that satisfied condition. If
// the subscription ends with io.EOF first, the last status seen for the
// instance (possibly nil) is returned without an error. Subscription errors
// and the end of ctx are returned as wrapped errors.
func WaitForWorkspace(ctx context.Context, api *ComponentAPI, instanceID string, condition func(status *wsmanapi.WorkspaceStatus) bool) (lastStatus *wsmanapi.WorkspaceStatus, err error) {
	wsman, err := api.WorkspaceManager()
	if err != nil {
		return nil, err
	}

	sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{})
	if err != nil {
		return nil, xerrors.Errorf("cannot listen for workspace updates: %w", err)
	}

	// Both channels are buffered so the subscriber goroutine can always finish,
	// even when this function returned early.
	done := make(chan *wsmanapi.WorkspaceStatus, 1)
	errCh := make(chan error, 1)

	// once guards done: it is either filled by the subscriber or closed by the
	// early-success path below, never both.
	var once sync.Once
	go func() {
		defer func() {
			_ = sub.CloseSend()
		}()

		var last *wsmanapi.WorkspaceStatus
		publish := func() {
			once.Do(func() {
				done <- last
				close(done)
			})
		}

		for {
			resp, recvErr := sub.Recv()
			if recvErr == io.EOF {
				publish()
				return
			}
			if recvErr != nil {
				// report the error only; sending a status too would let the
				// receiver pick either one
				errCh <- xerrors.Errorf("workspace update error: %w", recvErr)
				return
			}

			st := resp.GetStatus()
			if st == nil || st.Id != instanceID {
				continue
			}
			last = st
			if condition(st) {
				publish()
				return
			}
		}
	}()

	desc, err := wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{Id: instanceID})
	if err != nil {
		return nil, xerrors.Errorf("cannot get workspace: %w", err)
	}
	if condition(desc.Status) {
		once.Do(func() { close(done) })
		return desc.Status, nil
	}

	select {
	case err := <-errCh:
		return nil, err
	case <-ctx.Done():
		return nil, xerrors.Errorf("cannot wait for workspace: %w", ctx.Err())
	case s := <-done:
		return s, nil
	}
}
// resolveOrBuildImage asks the image builder to resolve baseRef into a
// workspace image reference. If the resolve response does not report a
// finished, successful build, a build is triggered and its update stream is
// followed until it reports success or failure.
//
// It returns the reference from the resolve response, or an error when a
// client call fails or the build reports failure.
func resolveOrBuildImage(ctx context.Context, api *ComponentAPI, baseRef string) (absref string, err error) {
	client, err := api.ImageBuilder()
	if err != nil {
		return "", err
	}

	// The resolve and the build request describe the same source and auth.
	newSource := func() *imgbldr.BuildSource {
		return &imgbldr.BuildSource{
			From: &imgbldr.BuildSource_Ref{
				Ref: &imgbldr.BuildSourceReference{
					Ref: baseRef,
				},
			},
		}
	}
	newAuth := func() *imgbldr.BuildRegistryAuth {
		return &imgbldr.BuildRegistryAuth{
			Mode: &imgbldr.BuildRegistryAuth_Total{
				Total: &imgbldr.BuildRegistryAuthTotal{
					AllowAll: true,
				},
			},
		}
	}

	resolved, err := client.ResolveWorkspaceImage(ctx, &imgbldr.ResolveWorkspaceImageRequest{
		Source: newSource(),
		Auth:   newAuth(),
	})
	if err != nil {
		return "", err
	}
	if resolved.Status == imgbldr.BuildStatus_done_success {
		return resolved.Ref, nil
	}

	build, err := client.Build(ctx, &imgbldr.BuildRequest{
		TriggeredBy: "integration-test",
		Source:      newSource(),
		Auth:        newAuth(),
	})
	if err != nil {
		return "", err
	}

	// follow the build until it reaches a final state
	for {
		update, recvErr := build.Recv()
		if recvErr != nil {
			return "", recvErr
		}
		switch update.Status {
		case imgbldr.BuildStatus_done_success:
			return resolved.Ref, nil
		case imgbldr.BuildStatus_done_failure:
			return "", xerrors.Errorf("cannot build workspace image: %s", update.Message)
		}
	}
}
// DeleteWorkspace asks ws-manager to stop the given workspace instance. A
// NotFound response counts as success, since there is nothing left to stop;
// any other error is returned unchanged.
func DeleteWorkspace(ctx context.Context, api *ComponentAPI, instanceID string) error {
	manager, err := api.WorkspaceManager()
	if err != nil {
		return err
	}

	stopReq := &wsmanapi.StopWorkspaceRequest{Id: instanceID}
	if _, err = manager.StopWorkspace(ctx, stopReq); err == nil {
		return nil
	}

	if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
		return nil
	}
	return err
}