Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/dev/loadgen/pkg/observer/success.go
2499 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package observer
6
7
import (
8
"context"
9
"fmt"
10
"strings"
11
"sync"
12
"time"
13
14
"github.com/gitpod-io/gitpod/common-go/log"
15
"github.com/gitpod-io/gitpod/loadgen/pkg/loadgen"
16
"github.com/gitpod-io/gitpod/ws-manager/api"
17
)
18
19
type SuccessObserver struct {
20
workspaces map[string]*workspaceSuccess
21
m sync.Mutex
22
successRate float32
23
}
24
25
type workspaceSuccess struct {
26
Phase api.WorkspacePhase
27
Failed bool
28
}
29
30
func NewSuccessObserver(successRate float32) *SuccessObserver {
31
return &SuccessObserver{
32
workspaces: make(map[string]*workspaceSuccess),
33
successRate: successRate,
34
}
35
}
36
37
func (o *SuccessObserver) Observe() chan<- *loadgen.SessionEvent {
38
res := make(chan *loadgen.SessionEvent, defaultCapacity)
39
40
go func() {
41
for evt := range res {
42
switch evt.Kind {
43
case loadgen.SessionWorkspaceStart:
44
o.m.Lock()
45
o.workspaces[evt.WorkspaceStart.Spec.Id] = &workspaceSuccess{}
46
o.m.Unlock()
47
48
case loadgen.SessionWorkspaceUpdate:
49
{
50
up := evt.WorkspaceUpdate.Update
51
o.m.Lock()
52
ws, ok := o.workspaces[up.InstanceID]
53
o.m.Unlock()
54
if !ok {
55
continue
56
}
57
58
ws.Phase = up.Phase
59
ws.Failed = up.Failed
60
}
61
}
62
}
63
}()
64
65
return res
66
}
67
68
func (o *SuccessObserver) Wait(ctx context.Context, expected int) error {
69
ticker := time.NewTicker(2 * time.Second)
70
defer ticker.Stop()
71
72
for {
73
select {
74
case <-ticker.C:
75
o.m.Lock()
76
running := 0
77
var stopped []string
78
for id, ws := range o.workspaces {
79
switch ws.Phase {
80
case api.WorkspacePhase_RUNNING:
81
running += 1
82
case api.WorkspacePhase_STOPPED:
83
stopped = append(stopped, id)
84
}
85
}
86
87
if float32(running) >= float32(len(o.workspaces))*o.successRate {
88
return nil
89
}
90
91
// Quit early if too many workspaces have stopped already. They'll never become ready.
92
maxRunning := len(o.workspaces) - len(stopped)
93
if float32(maxRunning) < float32(len(o.workspaces))*o.successRate {
94
return fmt.Errorf("too many workspaces in stopped state (%d), will never get enough ready workspaces. Stopped workspaces: %v", len(stopped), strings.Join(stopped, ", "))
95
}
96
97
o.m.Unlock()
98
case <-ctx.Done():
99
o.m.Lock()
100
log.Warnf("workspaces did not get ready in time. Expected %v workspaces, did see %v", expected, len(o.workspaces))
101
for id, ws := range o.workspaces {
102
if ws.Phase != api.WorkspacePhase_RUNNING {
103
log.Warnf("workspace %s is in phase %v", id, ws.Phase)
104
}
105
}
106
o.m.Unlock()
107
return fmt.Errorf("timeout waiting for workspace to get ready")
108
}
109
}
110
}
111
112