Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/test/pkg/integration/workspace.go
2498 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package integration
6
7
import (
8
"context"
9
"fmt"
10
"io"
11
"strings"
12
"sync"
13
"testing"
14
"time"
15
16
"github.com/google/uuid"
17
"golang.org/x/xerrors"
18
"google.golang.org/grpc/codes"
19
"google.golang.org/grpc/status"
20
"k8s.io/apimachinery/pkg/util/wait"
21
22
"github.com/gitpod-io/gitpod/common-go/namegen"
23
csapi "github.com/gitpod-io/gitpod/content-service/api"
24
protocol "github.com/gitpod-io/gitpod/gitpod-protocol"
25
ide "github.com/gitpod-io/gitpod/ide-service-api/config"
26
imgbldr "github.com/gitpod-io/gitpod/image-builder/api"
27
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
28
)
29
30
// gitpodBuiltinUserID is recorded as the owner of workspaces that
// LaunchWorkspaceDirectly starts through ws-manager.
const gitpodBuiltinUserID = "builtin-user-workspace-probe-0000000"

// perCallTimeout bounds the per-call contexts this file derives from
// context.Background().
const perCallTimeout = 5 * time.Minute

// ParallelLunchableWorkspaceLimit is the capacity of the channel used to cap
// how many workspaces may be launched at the same time.
// The spelling of the exported name is kept as is for compatibility.
const ParallelLunchableWorkspaceLimit = 10
35
36
var (
37
ErrWorkspaceInstanceStopping = fmt.Errorf("workspace instance is stopping")
38
ErrWorkspaceInstanceStopped = fmt.Errorf("workspace instance has stopped")
39
parallelLimiter = make(chan struct{}, ParallelLunchableWorkspaceLimit)
40
)
41
42
type launchWorkspaceDirectlyOptions struct {
43
BaseImage string
44
IdeImage string
45
Mods []func(*wsmanapi.StartWorkspaceRequest) error
46
WaitForOpts []WaitForWorkspaceOpt
47
}
48
49
// LaunchWorkspaceDirectlyOpt configures the behaviour of LaunchWorkspaceDirectly
50
type LaunchWorkspaceDirectlyOpt func(*launchWorkspaceDirectlyOptions) error
51
52
// WithoutWorkspaceImage prevents the image-builder based base image resolution and sets
53
// the workspace image to an empty string.
54
// Usually callers would then use WithRequestModifier to set the workspace image themselves.
55
func WithoutWorkspaceImage() LaunchWorkspaceDirectlyOpt {
56
return func(lwdo *launchWorkspaceDirectlyOptions) error {
57
lwdo.BaseImage = ""
58
return nil
59
}
60
}
61
62
// WithBaseImage configures the base image used to start the workspace. The base image
63
// will be resolved to a workspace image using the image builder. If the corresponding
64
// workspace image isn't built yet, it will NOT be built.
65
func WithBaseImage(baseImage string) LaunchWorkspaceDirectlyOpt {
66
return func(lwdo *launchWorkspaceDirectlyOptions) error {
67
lwdo.BaseImage = baseImage
68
return nil
69
}
70
}
71
72
// WithIDEImage configures the IDE image used to start the workspace. Using this option
73
// as compared to setting the image using a modifier prevents the image ref computation
74
// based on the server's configuration.
75
func WithIDEImage(ideImage string) LaunchWorkspaceDirectlyOpt {
76
return func(lwdo *launchWorkspaceDirectlyOptions) error {
77
lwdo.IdeImage = ideImage
78
return nil
79
}
80
}
81
82
// WithRequestModifier modifies the start workspace request before it's sent.
83
func WithRequestModifier(mod func(*wsmanapi.StartWorkspaceRequest) error) LaunchWorkspaceDirectlyOpt {
84
return func(lwdo *launchWorkspaceDirectlyOptions) error {
85
lwdo.Mods = append(lwdo.Mods, mod)
86
return nil
87
}
88
}
89
90
// WithWaitWorkspaceForOpts adds options to the WaitForWorkspace call that happens as part of LaunchWorkspaceDirectly
91
func WithWaitWorkspaceForOpts(opt ...WaitForWorkspaceOpt) LaunchWorkspaceDirectlyOpt {
92
return func(lwdo *launchWorkspaceDirectlyOptions) error {
93
lwdo.WaitForOpts = opt
94
return nil
95
}
96
}
97
98
// LaunchWorkspaceDirectlyResult is returned by LaunchWorkspaceDirectly
99
type LaunchWorkspaceDirectlyResult struct {
100
Req *wsmanapi.StartWorkspaceRequest
101
WorkspaceID string
102
IdeURL string
103
LastStatus *wsmanapi.WorkspaceStatus
104
}
105
106
// StopWorkspaceFunc stops a workspace that was launched by one of the Launch*
// functions. waitForStop selects whether the call blocks until the workspace
// has stopped; the returned status is only set in that case.
type StopWorkspaceFunc = func(waitForStop bool, api *ComponentAPI) (*wsmanapi.WorkspaceStatus, error)
107
108
// LaunchWorkspaceDirectly starts a workspace pod by talking directly to ws-manager.
109
// Whenever possible prefer this function over LaunchWorkspaceFromContextURL, because
110
// it has fewer prerequisites.
111
func LaunchWorkspaceDirectly(t *testing.T, ctx context.Context, api *ComponentAPI, opts ...LaunchWorkspaceDirectlyOpt) (*LaunchWorkspaceDirectlyResult, StopWorkspaceFunc, error) {
112
var stopWs StopWorkspaceFunc = nil
113
options := launchWorkspaceDirectlyOptions{
114
BaseImage: "docker.io/gitpod/workspace-full:latest",
115
}
116
for _, o := range opts {
117
err := o(&options)
118
if err != nil {
119
return nil, nil, err
120
}
121
}
122
123
instanceID, err := uuid.NewRandom()
124
if err != nil {
125
return nil, nil, err
126
127
}
128
workspaceID, err := namegen.GenerateWorkspaceID()
129
if err != nil {
130
return nil, nil, err
131
}
132
133
parallelLimiter <- struct{}{}
134
defer func() {
135
if err != nil && stopWs == nil {
136
t.Logf("unlock the parallelLimiter because of error during starting the workspace: %v", err)
137
<-parallelLimiter
138
}
139
}()
140
141
var workspaceImage string
142
if options.BaseImage != "" {
143
for i := 0; i < 3; i++ {
144
workspaceImage, err = resolveOrBuildImage(ctx, api, options.BaseImage)
145
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
146
api.ClearImageBuilderClientCache()
147
time.Sleep(5 * time.Second)
148
continue
149
} else if err != nil && strings.Contains(err.Error(), "the server is currently unable to handle the request") {
150
api.ClearImageBuilderClientCache()
151
time.Sleep(5 * time.Second)
152
continue
153
} else if err != nil && strings.Contains(err.Error(), "apiserver not ready") {
154
api.ClearImageBuilderClientCache()
155
time.Sleep(5 * time.Second)
156
continue
157
} else if err != nil {
158
time.Sleep(5 * time.Second)
159
continue
160
}
161
break
162
}
163
if err != nil {
164
return nil, nil, err
165
}
166
}
167
168
waitErr := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
169
workspaceImage, err = resolveOrBuildImage(ctx, api, options.BaseImage)
170
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
171
api.ClearImageBuilderClientCache()
172
return false, nil
173
} else if err != nil && strings.Contains(err.Error(), "the server is currently unable to handle the request") {
174
api.ClearImageBuilderClientCache()
175
return false, nil
176
} else if err != nil && strings.Contains(err.Error(), "apiserver not ready") {
177
api.ClearImageBuilderClientCache()
178
return false, nil
179
} else if err != nil {
180
return false, nil
181
}
182
return true, nil
183
})
184
185
if waitErr == wait.ErrWaitTimeout {
186
return nil, nil, fmt.Errorf("timeout waiting for resolving the build image: %w", waitErr)
187
} else if waitErr != nil {
188
return nil, nil, waitErr
189
} else if err != nil {
190
return nil, nil, err
191
} else if workspaceImage == "" {
192
err = xerrors.Errorf("cannot start workspaces without a workspace image (required by registry-facade resolver)")
193
return nil, nil, err
194
}
195
196
ideImage := options.IdeImage
197
ideImageLayers := make([]string, 0)
198
if ideImage == "" {
199
var cfg *ide.IDEConfig
200
for i := 0; i < 3; i++ {
201
cfg, err = GetIDEConfig(api.namespace, api.client)
202
if err != nil {
203
continue
204
}
205
}
206
if err != nil {
207
return nil, nil, xerrors.Errorf("cannot find server IDE config: %w", err)
208
}
209
ideImage = cfg.IdeOptions.Options["code"].Image
210
ideImageLayers = cfg.IdeOptions.Options["code"].ImageLayers
211
if ideImage == "" {
212
err = xerrors.Errorf("cannot start workspaces without an IDE image (required by registry-facade resolver)")
213
return nil, nil, err
214
}
215
}
216
217
req := &wsmanapi.StartWorkspaceRequest{
218
Id: instanceID.String(),
219
ServicePrefix: workspaceID,
220
Metadata: &wsmanapi.WorkspaceMetadata{
221
Owner: gitpodBuiltinUserID,
222
MetaId: workspaceID,
223
},
224
Type: wsmanapi.WorkspaceType_REGULAR,
225
Spec: &wsmanapi.StartWorkspaceSpec{
226
WorkspaceImage: workspaceImage,
227
IdeImage: &wsmanapi.IDEImage{
228
WebRef: ideImage,
229
},
230
IdeImageLayers: ideImageLayers,
231
WorkspaceLocation: "/",
232
Timeout: "30m",
233
Initializer: &csapi.WorkspaceInitializer{
234
Spec: &csapi.WorkspaceInitializer_Empty{
235
Empty: &csapi.EmptyInitializer{},
236
},
237
},
238
Git: &wsmanapi.GitSpec{
239
Username: "integration-test",
240
Email: "[email protected]",
241
},
242
Admission: wsmanapi.AdmissionLevel_ADMIT_OWNER_ONLY,
243
Envvars: []*wsmanapi.EnvironmentVariable{
244
// VSX_REGISTRY_URL is set by server, since we start the workspace directly
245
// from ws-manager in these tests we need to set it here ourselves.
246
{
247
Name: "VSX_REGISTRY_URL",
248
Value: "https://open-vsx.gitpod.io/",
249
},
250
},
251
},
252
}
253
for _, m := range options.Mods {
254
err := m(req)
255
if err != nil {
256
return nil, nil, err
257
}
258
}
259
260
t.Log("prepare for a connection with ws-manager")
261
wsm, err := api.WorkspaceManager()
262
if err != nil {
263
return nil, nil, xerrors.Errorf("cannot start workspace manager: %w", err)
264
}
265
t.Log("established a connection with ws-manager")
266
267
var sresp *wsmanapi.StartWorkspaceResponse
268
for i := 0; i < 3; i++ {
269
t.Logf("attempt to start up the workspace directly: %s, %s", instanceID, workspaceID)
270
sresp, err = wsm.StartWorkspace(ctx, req)
271
if err != nil {
272
scode := status.Code(err)
273
if scode == codes.NotFound || scode == codes.Unavailable {
274
t.Log("retry strarting a workspace because cannnot start workspace: %w", err)
275
time.Sleep(1 * time.Second)
276
277
api.ClearWorkspaceManagerClientCache()
278
wsm, err = api.WorkspaceManager()
279
if err != nil {
280
return nil, nil, xerrors.Errorf("cannot start workspace manager: %w", err)
281
}
282
continue
283
}
284
if strings.Contains(err.Error(), "too many requests") {
285
t.Log("hit too many requests so retry after some seconds")
286
time.Sleep(30 * time.Second)
287
continue
288
}
289
err = xerrors.Errorf("cannot start workspace: %w", err)
290
return nil, nil, err
291
}
292
break
293
}
294
t.Log("successfully sent workspace start request")
295
296
stopWs = stopWsF(t, req.Id, req.Metadata.MetaId, api, req.Type == wsmanapi.WorkspaceType_PREBUILD)
297
defer func() {
298
if err != nil {
299
_, _ = stopWs(true, api)
300
}
301
}()
302
303
t.Log("wait for workspace to be fully up and running")
304
lastStatus, err := WaitForWorkspaceStart(t, ctx, req.Id, req.Metadata.MetaId, api, options.WaitForOpts...)
305
if err != nil {
306
return nil, nil, xerrors.Errorf("cannot wait for workspace start: %w", err)
307
}
308
t.Log("successful launch of the workspace")
309
310
return &LaunchWorkspaceDirectlyResult{
311
Req: req,
312
WorkspaceID: workspaceID,
313
IdeURL: sresp.Url,
314
LastStatus: lastStatus,
315
}, stopWs, nil
316
}
317
318
// LaunchWorkspaceFromContextURL force-creates a new workspace using the Gitpod server API,
319
// and waits for the workspace to start. If any step along the way fails, this function will
320
// fail the test.
321
//
322
// When possible, prefer the less complex LaunchWorkspaceDirectly.
323
func LaunchWorkspaceFromContextURL(t *testing.T, ctx context.Context, contextURL string, username string, api *ComponentAPI, serverOpts ...GitpodServerOpt) (*protocol.WorkspaceInfo, StopWorkspaceFunc, error) {
324
return LaunchWorkspaceWithOptions(t, ctx, &LaunchWorkspaceOptions{
325
ContextURL: contextURL,
326
}, username, api, serverOpts...)
327
}
328
329
type LaunchWorkspaceOptions struct {
330
ContextURL string
331
ProjectID string
332
IDESettings *protocol.IDESettings
333
}
334
335
// LaunchWorkspaceWithOptions force-creates a new workspace using the Gitpod server API,
336
// and waits for the workspace to start. If any step along the way fails, this function will
337
// fail the test.
338
//
339
// When possible, prefer the less complex LaunchWorkspaceDirectly.
340
func LaunchWorkspaceWithOptions(t *testing.T, ctx context.Context, opts *LaunchWorkspaceOptions, username string, api *ComponentAPI, serverOpts ...GitpodServerOpt) (*protocol.WorkspaceInfo, StopWorkspaceFunc, error) {
341
var (
342
defaultServerOpts []GitpodServerOpt
343
stopWs StopWorkspaceFunc = nil
344
err error
345
)
346
347
if username != "" {
348
defaultServerOpts = []GitpodServerOpt{WithGitpodUser(username)}
349
}
350
351
parallelLimiter <- struct{}{}
352
defer func() {
353
if err != nil && stopWs == nil {
354
<-parallelLimiter
355
}
356
}()
357
358
server, err := api.GitpodServer(append(defaultServerOpts, serverOpts...)...)
359
if err != nil {
360
return nil, nil, xerrors.Errorf("cannot start server: %w", err)
361
}
362
363
cctx, ccancel := context.WithTimeout(context.Background(), perCallTimeout)
364
defer ccancel()
365
366
var resp *protocol.WorkspaceCreationResult
367
for i := 0; i < 3; i++ {
368
u, _ := api.GetUserId(username)
369
t.Logf("attempt to create the workspace as user %v, with context %v\n", u, opts.ContextURL)
370
371
teams, _ := server.GetTeams(cctx)
372
var orgId string
373
if len(teams) == 0 {
374
team, err := server.CreateTeam(cctx, "test-team")
375
if err != nil {
376
return nil, nil, xerrors.Errorf("cannot create team: %w", err)
377
}
378
orgId = team.ID
379
} else {
380
orgId = teams[0].ID
381
}
382
383
resp, err = server.CreateWorkspace(cctx, &protocol.CreateWorkspaceOptions{
384
ContextURL: opts.ContextURL,
385
ProjectId: opts.ProjectID,
386
OrganizationId: orgId,
387
IgnoreRunningWorkspaceOnSameCommit: true,
388
StartWorkspaceOptions: protocol.StartWorkspaceOptions{
389
IdeSettings: opts.IDESettings,
390
},
391
})
392
if err != nil {
393
scode := status.Code(err)
394
if scode == codes.NotFound || scode == codes.Unavailable {
395
t.Log("retry strarting a workspace because cannnot start workspace: %w", err)
396
time.Sleep(1 * time.Second)
397
api.ClearGitpodServerClientCache()
398
server, err = api.GitpodServer(append(defaultServerOpts, serverOpts...)...)
399
if err != nil {
400
return nil, nil, xerrors.Errorf("cannot start server: %w", err)
401
}
402
continue
403
}
404
if strings.Contains(err.Error(), "too many requests") {
405
t.Log("hit too many requests so retry after some seconds")
406
time.Sleep(30 * time.Second)
407
continue
408
}
409
return nil, nil, xerrors.Errorf("cannot start workspace: %w", err)
410
}
411
break
412
}
413
414
t.Logf("attempt to get the workspace information: %s", resp.CreatedWorkspaceID)
415
launchStart := time.Now()
416
var wi *protocol.WorkspaceInfo
417
for i := 0; i < 3; i++ {
418
launchDuration := time.Since(launchStart)
419
wi, err = server.GetWorkspace(cctx, resp.CreatedWorkspaceID)
420
if err != nil || wi.LatestInstance == nil {
421
time.Sleep(2 * time.Second)
422
t.Logf("error or nil instance since %s", launchDuration)
423
continue
424
}
425
if wi.LatestInstance.Status.Phase != "preparing" {
426
t.Logf("not preparing")
427
break
428
}
429
t.Logf("sleeping")
430
time.Sleep(5 * time.Second)
431
}
432
if wi == nil || wi.LatestInstance == nil {
433
return nil, nil, xerrors.Errorf("CreateWorkspace did not start the workspace")
434
}
435
t.Logf("got the workspace information: %s", wi.Workspace.ID)
436
437
// GetWorkspace might receive an instance before we seen the first event
438
// from ws-manager, in which case IdeURL is not set
439
if wi.LatestInstance.IdeURL == "" {
440
wi.LatestInstance.IdeURL = resp.WorkspaceURL
441
}
442
443
if wi.LatestInstance.Status.Conditions.NeededImageBuild {
444
for ctx.Err() == nil {
445
wi, err = server.GetWorkspace(cctx, resp.CreatedWorkspaceID)
446
if err != nil {
447
return nil, nil, xerrors.Errorf("cannot get workspace: %w", err)
448
}
449
if wi.LatestInstance.Status.Phase == "running" {
450
break
451
}
452
time.Sleep(10 * time.Second)
453
}
454
}
455
456
stopWs = stopWsF(t, wi.LatestInstance.ID, resp.CreatedWorkspaceID, api, false)
457
defer func() {
458
if err != nil {
459
_, _ = stopWs(true, api)
460
}
461
}()
462
463
t.Log("wait for workspace to be fully up and running")
464
wsState, err := WaitForWorkspaceStart(t, cctx, wi.LatestInstance.ID, resp.CreatedWorkspaceID, api)
465
if err != nil {
466
return nil, nil, xerrors.Errorf("failed to wait for the workspace to start up: %w", err)
467
}
468
if wi.LatestInstance.IdeURL == "" {
469
wi.LatestInstance.IdeURL = wsState.Spec.Url
470
}
471
t.Log("successful launch of the workspace")
472
473
return wi, stopWs, nil
474
}
475
476
func stopWsF(t *testing.T, instanceID string, workspaceID string, api *ComponentAPI, isPrebuild bool) StopWorkspaceFunc {
477
var already bool
478
var unlocked bool
479
return func(waitForStop bool, api *ComponentAPI) (s *wsmanapi.WorkspaceStatus, err error) {
480
if already {
481
t.Logf("already sent stop request: %s", instanceID)
482
return nil, nil
483
}
484
already = true
485
486
tryUnlockParallelLimiter := func() {
487
if !unlocked {
488
unlocked = true
489
<-parallelLimiter
490
}
491
}
492
493
defer func() {
494
if err == nil {
495
return
496
}
497
498
// Only unlock on error here, otherwise we'll unlock below
499
// after waiting for the workspace to stop.
500
tryUnlockParallelLimiter()
501
}()
502
503
sctx, scancel := context.WithTimeout(context.Background(), perCallTimeout)
504
defer scancel()
505
506
done := make(chan *wsmanapi.WorkspaceStatus)
507
errCh := make(chan error)
508
ready := make(chan struct{}, 1)
509
go func() {
510
var lastStatus *wsmanapi.WorkspaceStatus
511
defer func() {
512
done <- lastStatus
513
close(done)
514
}()
515
516
t.Logf("waiting for stopping the workspace: %s", instanceID)
517
lastStatus, err = WaitForWorkspaceStop(t, sctx, ready, api, instanceID, workspaceID)
518
if err != nil {
519
errCh <- err
520
}
521
}()
522
523
<-ready
524
525
for {
526
t.Logf("attempt to delete the workspace: %s", instanceID)
527
err := DeleteWorkspace(sctx, api, instanceID)
528
if err != nil {
529
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
530
api.ClearWorkspaceManagerClientCache()
531
t.Logf("got %v when deleting workspace", st)
532
time.Sleep(5 * time.Second)
533
continue
534
}
535
536
return nil, err
537
}
538
break
539
}
540
541
waitAndUnlock := func() (*wsmanapi.WorkspaceStatus, error) {
542
defer tryUnlockParallelLimiter()
543
select {
544
case err := <-errCh:
545
return nil, err
546
case s := <-done:
547
t.Logf("successfully terminated workspace")
548
return s, nil
549
}
550
}
551
552
if !waitForStop {
553
// Still wait for stop asynchroniously to unblock the parallelLimiter
554
go func() {
555
_, err = waitAndUnlock()
556
if err != nil {
557
t.Logf("error while waiting asynchronously for workspace to stop: %v", err)
558
}
559
}()
560
return nil, nil
561
}
562
563
return waitAndUnlock()
564
}
565
}
566
567
// WaitForWorkspaceOpt configures a WaitForWorkspace call
568
type WaitForWorkspaceOpt func(*waitForWorkspaceOpts)
569
570
// waitForWorkspaceOpts holds the settings the WaitForWorkspaceOpt functions
// can switch on.
type waitForWorkspaceOpts struct {
	// CanFail makes the wait tolerate a workspace with a failed condition.
	CanFail bool
	// WaitForStopped makes WaitForWorkspaceStart wait for the STOPPED phase
	// instead of RUNNING.
	WaitForStopped bool
}
574
575
// WorkspaceCanFail doesn't fail the test if the workspace fails to start
576
func WorkspaceCanFail(o *waitForWorkspaceOpts) {
577
o.CanFail = true
578
}
579
580
// WaitForStopped is a WaitForWorkspaceOpt: with it, WaitForWorkspaceStart
// waits for the workspace to reach the STOPPED phase rather than RUNNING.
func WaitForStopped(o *waitForWorkspaceOpts) {
	o.WaitForStopped = true
}
583
584
// WaitForWorkspace waits until a workspace is running. Fails the test if the workspace
585
// fails or does not become RUNNING before the context is canceled.
586
func WaitForWorkspaceStart(t *testing.T, ctx context.Context, instanceID string, workspaceID string, api *ComponentAPI, opts ...WaitForWorkspaceOpt) (lastStatus *wsmanapi.WorkspaceStatus, err error) {
587
var cfg waitForWorkspaceOpts
588
for _, o := range opts {
589
o(&cfg)
590
}
591
592
checkStatus := func(status *wsmanapi.WorkspaceStatus) (done bool, err error) {
593
if status == nil {
594
return false, nil
595
}
596
if !cfg.CanFail && status.Conditions != nil && status.Conditions.Failed != "" {
597
return true, xerrors.Errorf("workspace instance %s failed: %s", instanceID, status.Conditions.Failed)
598
}
599
600
switch status.Phase {
601
case wsmanapi.WorkspacePhase_RUNNING:
602
if !cfg.WaitForStopped {
603
// Done.
604
return true, nil
605
}
606
case wsmanapi.WorkspacePhase_STOPPING:
607
if !cfg.WaitForStopped {
608
return true, ErrWorkspaceInstanceStopping
609
}
610
case wsmanapi.WorkspacePhase_STOPPED:
611
if !cfg.WaitForStopped {
612
return true, ErrWorkspaceInstanceStopped
613
} else {
614
return true, nil
615
}
616
}
617
return false, nil
618
}
619
620
done := make(chan *wsmanapi.WorkspaceStatus)
621
errStatus := make(chan error)
622
reboot := make(chan struct{}, 1)
623
go func() {
624
t.Log("prepare for a connection with ws-manager")
625
wsman, err := api.WorkspaceManager()
626
if err != nil {
627
errStatus <- err
628
return
629
}
630
sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
631
MustMatch: &wsmanapi.MetadataFilter{
632
MetaId: workspaceID,
633
},
634
})
635
if err != nil {
636
errStatus <- err
637
return
638
}
639
640
defer func() {
641
if sub != nil {
642
_ = sub.CloseSend()
643
}
644
}()
645
t.Log("established for a connection with ws-manager")
646
647
var s *wsmanapi.WorkspaceStatus
648
defer func() {
649
done <- s
650
close(done)
651
}()
652
for {
653
waitForPhase := "running"
654
if cfg.WaitForStopped {
655
waitForPhase = "stopped"
656
}
657
t.Logf("check if the status of workspace is in the %s phase: %s", waitForPhase, instanceID)
658
resp, err := sub.Recv()
659
if err != nil {
660
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
661
sub.CloseSend()
662
api.ClearWorkspaceManagerClientCache()
663
wsman, err = api.WorkspaceManager()
664
if err != nil {
665
time.Sleep(5 * time.Second)
666
reboot <- struct{}{}
667
t.Logf("we can't get the worksapce manger client: %v", err)
668
continue
669
}
670
sub, err = wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
671
MustMatch: &wsmanapi.MetadataFilter{
672
MetaId: workspaceID,
673
},
674
})
675
if err != nil {
676
errStatus <- xerrors.Errorf("cannot listen for workspace updates: %w", err)
677
return
678
}
679
continue
680
}
681
errStatus <- xerrors.Errorf("workspace update error: %w", err)
682
return
683
}
684
685
s = resp.GetStatus()
686
if s == nil || s.Id != instanceID {
687
continue
688
}
689
690
t.Logf("subscribe status: %s, %s", s.Id, s.Phase)
691
692
done2, err := checkStatus(s)
693
if err != nil {
694
errStatus <- err
695
return
696
}
697
698
if done2 {
699
return
700
}
701
}
702
}()
703
704
handle := func() (*wsmanapi.WorkspaceStatus, bool, error) {
705
wsman, err := api.WorkspaceManager()
706
if err != nil {
707
api.ClearWorkspaceManagerClientCache()
708
return nil, false, err
709
}
710
desc, err := wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{
711
Id: instanceID,
712
})
713
if err != nil {
714
scode := status.Code(err)
715
if scode == codes.NotFound || strings.Contains(err.Error(), "not found") {
716
if cfg.WaitForStopped {
717
t.Logf("describe: workspace couldn't be found, but we're expecting it to stop, so wait for subscribe to give us the last status")
718
return nil, false, nil
719
}
720
if !cfg.CanFail {
721
return nil, true, xerrors.Errorf("the workspace %s couldn't be found", instanceID)
722
}
723
return nil, true, nil
724
}
725
return nil, false, err
726
}
727
728
if desc == nil || desc.Status == nil {
729
t.Logf("describe status is nil: %s", instanceID)
730
return nil, false, nil
731
}
732
733
t.Logf("describe status: %s, %s", desc.Status.Id, desc.Status.Phase)
734
735
done, err := checkStatus(desc.Status)
736
return desc.Status, done, err
737
}
738
739
ticker := time.NewTicker(1 * time.Second)
740
for {
741
select {
742
case <-ticker.C:
743
// For in case missed the status change
744
desc, done, err := handle()
745
if !done {
746
if err != nil {
747
t.Logf("error checking workspace status, trying again later: %v", err)
748
}
749
continue
750
} else if err != nil {
751
return nil, err
752
} else if desc != nil {
753
return desc, nil
754
}
755
case <-reboot:
756
// Consider workspace state changes during subscriber reboot
757
desc, done, err := handle()
758
if !done {
759
continue
760
} else if err != nil {
761
return nil, err
762
} else if desc != nil {
763
return desc, nil
764
}
765
case <-ctx.Done():
766
return nil, xerrors.Errorf("cannot wait for workspace: %w", ctx.Err())
767
case s := <-done:
768
return s, nil
769
case err := <-errStatus:
770
return nil, err
771
}
772
}
773
}
774
775
// WaitForWorkspaceStop waits until a workspace is stopped. Fails the test if the workspace
776
// fails or does not stop before the context is canceled.
777
func WaitForWorkspaceStop(t *testing.T, ctx context.Context, ready chan<- struct{}, api *ComponentAPI, instanceID string, workspaceID string, opts ...WaitForWorkspaceOpt) (lastStatus *wsmanapi.WorkspaceStatus, err error) {
778
var cfg waitForWorkspaceOpts
779
for _, o := range opts {
780
o(&cfg)
781
}
782
783
wsman, err := api.WorkspaceManager()
784
if err != nil {
785
return nil, err
786
}
787
sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
788
MustMatch: &wsmanapi.MetadataFilter{
789
MetaId: workspaceID,
790
},
791
})
792
if err != nil {
793
ready <- struct{}{}
794
return nil, err
795
}
796
797
defer func() {
798
if sub != nil {
799
_ = sub.CloseSend()
800
}
801
}()
802
803
var notFound bool
804
done := make(chan *wsmanapi.WorkspaceStatus)
805
errCh := make(chan error)
806
reboot := make(chan struct{}, 1)
807
go func() {
808
var wss *wsmanapi.WorkspaceStatus
809
defer func() {
810
done <- wss
811
close(done)
812
}()
813
814
ready <- struct{}{}
815
for {
816
resp, err := sub.Recv()
817
notFound = false
818
if err != nil {
819
if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
820
var serr error
821
sub.CloseSend()
822
api.ClearWorkspaceManagerClientCache()
823
wsman, err = api.WorkspaceManager()
824
if err != nil {
825
t.Logf("we can't get the worksapce manger client: %v", err)
826
time.Sleep(5 * time.Second)
827
reboot <- struct{}{}
828
continue
829
}
830
sub, err = wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
831
MustMatch: &wsmanapi.MetadataFilter{
832
MetaId: workspaceID,
833
},
834
})
835
if serr == nil {
836
continue
837
}
838
}
839
errCh <- xerrors.Errorf("workspace update error: %v", err)
840
return
841
}
842
843
wss = resp.GetStatus()
844
if wss.Conditions.Failed != "" && !cfg.CanFail {
845
errCh <- xerrors.Errorf("workspace instance %s failed: %s", instanceID, wss.Conditions.Failed)
846
return
847
}
848
if wss.Phase == wsmanapi.WorkspacePhase_STOPPED {
849
t.Logf("confirmed the worksapce is stopped: %s, %s", wss.Id, wss.Phase)
850
return
851
}
852
continue
853
}
854
}()
855
856
desc, err := wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{
857
Id: instanceID,
858
})
859
if err != nil {
860
scode := status.Code(err)
861
if scode == codes.NotFound || strings.Contains(err.Error(), "not found") {
862
t.Log("for some reason, ws-manager subscriber doesn't get updated. But the workspace is gone")
863
return nil, nil
864
}
865
}
866
if desc != nil && desc.Status != nil {
867
if desc.Status.Phase == wsmanapi.WorkspacePhase_STOPPED {
868
return desc.Status, nil
869
}
870
}
871
872
for {
873
select {
874
// Consider workspace state changes during subscriber reboot
875
case <-reboot:
876
wsman, err := api.WorkspaceManager()
877
if err != nil {
878
api.ClearWorkspaceManagerClientCache()
879
continue
880
}
881
desc, err := wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{
882
Id: instanceID,
883
})
884
if err != nil {
885
scode := status.Code(err)
886
if scode == codes.NotFound || strings.Contains(err.Error(), "not found") {
887
if notFound {
888
t.Log("for some reason, ws-manager subscriber doesn't get updated. But the workspace is gone")
889
return nil, nil
890
}
891
notFound = true
892
continue
893
}
894
}
895
notFound = false
896
if desc != nil && desc.Status != nil {
897
if desc.Status.Phase == wsmanapi.WorkspacePhase_STOPPED {
898
return desc.Status, nil
899
}
900
}
901
case err := <-errCh:
902
return nil, err
903
case <-ctx.Done():
904
return nil, xerrors.Errorf("cannot wait for workspace: %w", ctx.Err())
905
case s := <-done:
906
return s, nil
907
}
908
}
909
}
910
911
// WaitForWorkspace waits until the condition function returns true. Fails the test if the condition does
912
// not become true before the context is canceled.
913
func WaitForWorkspace(ctx context.Context, api *ComponentAPI, instanceID string, condition func(status *wsmanapi.WorkspaceStatus) bool) (lastStatus *wsmanapi.WorkspaceStatus, err error) {
914
wsman, err := api.WorkspaceManager()
915
if err != nil {
916
return
917
}
918
919
sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{})
920
if err != nil {
921
return nil, xerrors.Errorf("cannot listen for workspace updates: %q", err)
922
}
923
924
done := make(chan *wsmanapi.WorkspaceStatus, 1)
925
errCh := make(chan error)
926
927
var once sync.Once
928
go func() {
929
var status *wsmanapi.WorkspaceStatus
930
defer func() {
931
once.Do(func() {
932
done <- status
933
close(done)
934
})
935
_ = sub.CloseSend()
936
}()
937
for {
938
resp, err := sub.Recv()
939
if err == io.EOF {
940
return
941
}
942
if err != nil {
943
errCh <- xerrors.Errorf("workspace update error: %q", err)
944
return
945
}
946
status = resp.GetStatus()
947
if status == nil {
948
continue
949
}
950
if status.Id != instanceID {
951
continue
952
}
953
954
if condition(status) {
955
return
956
}
957
}
958
}()
959
960
// maybe the workspace has started in the meantime and we've missed the update
961
desc, err := wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{Id: instanceID})
962
if err != nil {
963
return nil, xerrors.Errorf("cannot get workspace: %q", err)
964
}
965
if condition(desc.Status) {
966
once.Do(func() { close(done) })
967
return desc.Status, nil
968
}
969
970
select {
971
case err := <-errCh:
972
return nil, err
973
case <-ctx.Done():
974
return nil, xerrors.Errorf("cannot wait for workspace: %q", ctx.Err())
975
case s := <-done:
976
return s, nil
977
}
978
}
979
980
func resolveOrBuildImage(ctx context.Context, api *ComponentAPI, baseRef string) (absref string, err error) {
981
cl, err := api.ImageBuilder()
982
if err != nil {
983
return
984
}
985
986
reslv, err := cl.ResolveWorkspaceImage(ctx, &imgbldr.ResolveWorkspaceImageRequest{
987
Source: &imgbldr.BuildSource{
988
From: &imgbldr.BuildSource_Ref{
989
Ref: &imgbldr.BuildSourceReference{
990
Ref: baseRef,
991
},
992
},
993
},
994
Auth: &imgbldr.BuildRegistryAuth{
995
Mode: &imgbldr.BuildRegistryAuth_Total{
996
Total: &imgbldr.BuildRegistryAuthTotal{
997
AllowAll: true,
998
},
999
},
1000
},
1001
})
1002
if err != nil {
1003
return
1004
}
1005
1006
if reslv.Status == imgbldr.BuildStatus_done_success {
1007
return reslv.Ref, nil
1008
}
1009
1010
bld, err := cl.Build(ctx, &imgbldr.BuildRequest{
1011
TriggeredBy: "integration-test",
1012
Source: &imgbldr.BuildSource{
1013
From: &imgbldr.BuildSource_Ref{
1014
Ref: &imgbldr.BuildSourceReference{
1015
Ref: baseRef,
1016
},
1017
},
1018
},
1019
Auth: &imgbldr.BuildRegistryAuth{
1020
Mode: &imgbldr.BuildRegistryAuth_Total{
1021
Total: &imgbldr.BuildRegistryAuthTotal{
1022
AllowAll: true,
1023
},
1024
},
1025
},
1026
})
1027
if err != nil {
1028
return
1029
}
1030
1031
for {
1032
resp, err := bld.Recv()
1033
if err != nil {
1034
return "", err
1035
}
1036
1037
if resp.Status == imgbldr.BuildStatus_done_success {
1038
break
1039
} else if resp.Status == imgbldr.BuildStatus_done_failure {
1040
return "", xerrors.Errorf("cannot build workspace image: %s", resp.Message)
1041
}
1042
}
1043
1044
return reslv.Ref, nil
1045
}
1046
1047
// DeleteWorkspace cleans up a workspace started during an integration test
1048
func DeleteWorkspace(ctx context.Context, api *ComponentAPI, instanceID string) error {
1049
wm, err := api.WorkspaceManager()
1050
if err != nil {
1051
return err
1052
}
1053
1054
_, err = wm.StopWorkspace(ctx, &wsmanapi.StopWorkspaceRequest{
1055
Id: instanceID,
1056
})
1057
if err != nil {
1058
s, ok := status.FromError(err)
1059
if ok && s.Code() == codes.NotFound {
1060
// Workspace is already gone.
1061
return nil
1062
}
1063
return err
1064
}
1065
return nil
1066
}
1067
1068