Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/image-builder-mk3/pkg/orchestrator/monitor.go
2500 views
1
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package orchestrator
6
7
import (
8
"context"
9
"encoding/json"
10
"errors"
11
"fmt"
12
"io"
13
"io/ioutil"
14
"net/http"
15
"strings"
16
"sync"
17
"time"
18
19
wsmanapi "github.com/gitpod-io/gitpod/ws-manager/api"
20
"github.com/hashicorp/go-retryablehttp"
21
"golang.org/x/xerrors"
22
23
"github.com/gitpod-io/gitpod/common-go/log"
24
"github.com/gitpod-io/gitpod/common-go/tracing"
25
"github.com/gitpod-io/gitpod/image-builder/api"
26
)
27
28
// Keys of the workspace annotations the build monitor reads from ws-manager
// workspace metadata.
const (
	// annotationRef is the annotation key whose value becomes BuildInfo.Ref.
	annotationRef = "ref"
	// annotationBaseRef is the annotation key whose value becomes BuildInfo.BaseRef.
	annotationBaseRef = "baseref"
	// annotationManagedBy is the annotation key used to filter for workspaces
	// that belong to this image builder.
	annotationManagedBy = "managed-by"
)
33
34
type orchestrator interface {
35
PublishStatus(buildID string, resp *api.BuildResponse)
36
PublishLog(buildID string, message string)
37
}
38
39
func newBuildMonitor(o orchestrator, wsman wsmanapi.WorkspaceManagerClient) *buildMonitor {
40
return &buildMonitor{
41
O: o,
42
wsman: wsman,
43
runningBuilds: make(map[string]*runningBuild),
44
logs: map[string]context.CancelFunc{},
45
}
46
}
47
48
type buildMonitor struct {
49
O orchestrator
50
51
wsman wsmanapi.WorkspaceManagerClient
52
runningBuilds map[string]*runningBuild
53
runningBuildsMu sync.RWMutex
54
55
logs map[string]context.CancelFunc
56
}
57
58
type runningBuild struct {
59
Info api.BuildInfo
60
Logs buildLogs
61
}
62
63
// buildLogs carries the coordinates needed to listen to a build's logs.
type buildLogs struct {
	// IdeURL is the URL of the build workspace; supervisor endpoints are
	// resolved relative to it.
	IdeURL string
	// OwnerToken is sent as the x-gitpod-owner-token header when talking to
	// the build workspace.
	OwnerToken string
}
67
68
// Run subscribes to the ws-manager, listens for build updates and distributes them internally
69
func (m *buildMonitor) Run() {
70
ctx := context.Background()
71
for {
72
wss, err := m.wsman.GetWorkspaces(ctx, &wsmanapi.GetWorkspacesRequest{
73
MustMatch: &wsmanapi.MetadataFilter{
74
Annotations: map[string]string{
75
annotationManagedBy: buildWorkspaceManagerID,
76
},
77
},
78
})
79
if err != nil {
80
log.WithError(err).Info("cannot get running builds from ws-manager - retrying")
81
time.Sleep(1 * time.Second)
82
continue
83
}
84
85
m.runningBuildsMu.Lock()
86
m.runningBuilds = make(map[string]*runningBuild, len(wss.Status))
87
m.runningBuildsMu.Unlock()
88
for _, ws := range wss.Status {
89
m.handleStatusUpdate(ws)
90
}
91
92
sub, err := m.wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{
93
MustMatch: &wsmanapi.MetadataFilter{
94
Annotations: map[string]string{
95
annotationManagedBy: buildWorkspaceManagerID,
96
},
97
},
98
})
99
if err != nil {
100
log.WithError(err).Info("connection to ws-manager lost - retrying")
101
time.Sleep(5 * time.Second)
102
continue
103
}
104
105
for {
106
msg, err := sub.Recv()
107
if err != nil {
108
log.WithError(err).Info("connection to ws-manager lost - retrying")
109
time.Sleep(1 * time.Second)
110
break
111
}
112
113
status := msg.GetStatus()
114
if status == nil {
115
continue
116
}
117
118
m.handleStatusUpdate(status)
119
}
120
}
121
}
122
123
func (m *buildMonitor) handleStatusUpdate(status *wsmanapi.WorkspaceStatus) {
124
var (
125
bld = extractRunningBuild(status)
126
resp = extractBuildResponse(status)
127
)
128
m.runningBuildsMu.Lock()
129
if resp.Status != api.BuildStatus_running {
130
delete(m.runningBuilds, status.Id)
131
} else {
132
m.runningBuilds[status.Id] = bld
133
}
134
m.runningBuildsMu.Unlock()
135
136
m.O.PublishStatus(status.Id, resp)
137
138
// handleStatusUpdate is called from a single go-routine, hence there's no need to synchronize
139
// access to m.logs
140
if bld.Info.Status == api.BuildStatus_running {
141
if _, ok := m.logs[status.Id]; !ok {
142
// we don't have a headless log listener yet, but need one
143
ctx, cancel := context.WithCancel(context.Background())
144
go listenToHeadlessLogs(ctx, bld.Logs.IdeURL, bld.Logs.OwnerToken, m.handleHeadlessLogs(status.Id))
145
m.logs[status.Id] = cancel
146
}
147
} else {
148
if cancel, ok := m.logs[status.Id]; ok {
149
// we have a headless log listener, and need to stop it
150
cancel()
151
delete(m.logs, status.Id)
152
}
153
}
154
}
155
156
// handleHeadlessLogs returns the callback a headless log listener of build
// buildID reports to. Errors other than context cancellation are logged and
// announced to the user via PublishLog; non-empty content is published as log
// output. Cancellation errors and empty content are dropped silently.
func (m *buildMonitor) handleHeadlessLogs(buildID string) listenToHeadlessLogsCallback {
	return func(content []byte, err error) {
		switch {
		case err != nil && !errors.Is(err, context.Canceled):
			log.WithError(err).WithField("buildID", buildID).Warn("headless log listener failed")
			m.O.PublishLog(buildID, "Build log listener failed. The image build is still running, but you won't see any log output.")
		case len(content) > 0:
			m.O.PublishLog(buildID, string(content))
		}
	}
}
169
170
var errOutOfRetries = xerrors.Errorf("out of retries")
171
172
// retry makes multiple attempts to execute op if op returns an UNAVAILABLE gRPC status code
173
func retry(ctx context.Context, op func(ctx context.Context) error, retry func(err error) bool, initialBackoff time.Duration, retries int) (err error) {
174
span, ctx := tracing.FromContext(ctx, "retryIfUnavailable")
175
defer tracing.FinishSpan(span, &err)
176
177
for i := 0; i < retries; i++ {
178
err := op(ctx)
179
span.LogKV("attempt", i)
180
181
if retry(err) {
182
time.Sleep(initialBackoff * time.Duration(1+i))
183
continue
184
}
185
if err != nil {
186
return err
187
}
188
return nil
189
}
190
191
// we've maxed out our retry attempts
192
return errOutOfRetries
193
}
194
195
func extractBuildStatus(status *wsmanapi.WorkspaceStatus) *api.BuildInfo {
196
s := api.BuildStatus_running
197
if status.Phase == wsmanapi.WorkspacePhase_STOPPING || status.Phase == wsmanapi.WorkspacePhase_STOPPED {
198
if status.Conditions.Failed == "" && status.Conditions.HeadlessTaskFailed == "" {
199
s = api.BuildStatus_done_success
200
} else {
201
s = api.BuildStatus_done_failure
202
}
203
}
204
205
return &api.BuildInfo{
206
BuildId: status.Metadata.MetaId,
207
Ref: status.Metadata.Annotations[annotationRef],
208
BaseRef: status.Metadata.Annotations[annotationBaseRef],
209
Status: s,
210
StartedAt: status.Metadata.StartedAt.Seconds,
211
LogInfo: &api.LogInfo{
212
Url: status.Spec.Url,
213
Headers: map[string]string{
214
"x-gitpod-owner-token": status.Auth.OwnerToken,
215
},
216
},
217
}
218
}
219
220
func extractRunningBuild(status *wsmanapi.WorkspaceStatus) *runningBuild {
221
return &runningBuild{
222
Info: *extractBuildStatus(status),
223
Logs: buildLogs{
224
IdeURL: status.Spec.Url,
225
OwnerToken: status.Auth.OwnerToken,
226
},
227
}
228
}
229
230
func extractBuildResponse(status *wsmanapi.WorkspaceStatus) *api.BuildResponse {
231
var (
232
info = extractBuildStatus(status)
233
msg = status.Message
234
)
235
if status.Phase == wsmanapi.WorkspacePhase_STOPPING || status.Phase == wsmanapi.WorkspacePhase_STOPPED {
236
if status.Conditions.Failed != "" {
237
msg = status.Conditions.Failed
238
} else if status.Conditions.HeadlessTaskFailed != "" {
239
msg = status.Conditions.HeadlessTaskFailed
240
}
241
}
242
243
return &api.BuildResponse{
244
Ref: info.Ref, // set for backwards compatibilty - new clients should consume Info
245
BaseRef: info.BaseRef, // set for backwards compatibilty - new clients should consume Info
246
Status: info.Status,
247
Message: msg,
248
Info: info,
249
}
250
}
251
252
// GetAllRunningBuilds returns the builds currently tracked as running, in no
// particular order. The returned slice is freshly allocated but its elements
// point at the monitor's own records. ctx is not used and the returned error
// is always nil.
func (m *buildMonitor) GetAllRunningBuilds(ctx context.Context) (res []*runningBuild, err error) {
	m.runningBuildsMu.RLock()
	defer m.runningBuildsMu.RUnlock()

	res = make([]*runningBuild, 0, len(m.runningBuilds))
	for id := range m.runningBuilds {
		res = append(res, m.runningBuilds[id])
	}
	return res, nil
}
263
264
func (m *buildMonitor) RegisterNewBuild(buildID string, ref, baseRef, url, ownerToken string) {
265
m.runningBuildsMu.Lock()
266
defer m.runningBuildsMu.Unlock()
267
268
bld := &runningBuild{
269
Info: api.BuildInfo{
270
BuildId: buildID,
271
Ref: ref,
272
BaseRef: baseRef,
273
Status: api.BuildStatus_running,
274
StartedAt: time.Now().Unix(),
275
},
276
Logs: buildLogs{
277
IdeURL: url,
278
OwnerToken: ownerToken,
279
},
280
}
281
m.runningBuilds[buildID] = bld
282
log.WithField("build", bld).WithField("buildID", buildID).Debug("new build registered")
283
}
284
285
// listenToHeadlessLogsCallback receives the output of a headless log
// listener: either a chunk of log content with a nil error, or nil content
// together with the error that ended the listener.
type listenToHeadlessLogsCallback func(content []byte, err error)
286
287
func listenToHeadlessLogs(ctx context.Context, url, authToken string, callback listenToHeadlessLogsCallback) {
288
var err error
289
defer func() {
290
if err != nil {
291
callback(nil, err)
292
}
293
}()
294
295
var (
296
logURL string
297
noTerminalErr = fmt.Errorf("no terminal")
298
)
299
err = retry(ctx, func(ctx context.Context) error {
300
logURL, err = findTaskLogURL(ctx, url, authToken)
301
if err != nil {
302
return err
303
}
304
if strings.HasSuffix(logURL, "/listen/") {
305
return noTerminalErr
306
}
307
return nil
308
}, func(err error) bool {
309
return err == noTerminalErr
310
}, 1*time.Second, 10)
311
if err != nil {
312
return
313
}
314
log.WithField("logURL", logURL).Debug("found log URL")
315
callback([]byte("Connecting to log output ...\n"), nil)
316
317
req, err := http.NewRequestWithContext(ctx, "GET", logURL, nil)
318
if err != nil {
319
return
320
}
321
req.Header.Set("x-gitpod-owner-token", authToken)
322
req.Header.Set("Cache", "no-cache")
323
324
client := retryablehttp.NewClient()
325
client.HTTPClient = &http.Client{
326
Timeout: 2 * time.Second,
327
}
328
329
resp, err := client.StandardClient().Do(req)
330
if err != nil {
331
return
332
}
333
log.WithField("logURL", logURL).Debug("terminal log response received")
334
callback([]byte("Connected to log output ...\n"), nil)
335
_ = resp.Body.Close()
336
337
resp, err = http.DefaultClient.Do(req)
338
if err != nil {
339
return
340
}
341
defer resp.Body.Close()
342
343
var line struct {
344
Result struct {
345
Data []byte `json:"data"`
346
} `json:"result"`
347
}
348
349
dec := json.NewDecoder(resp.Body)
350
for err == nil {
351
err = dec.Decode(&line)
352
if errors.Is(err, io.EOF) {
353
// EOF is not an error in this case
354
err = nil
355
break
356
}
357
if err != nil {
358
break
359
}
360
361
callback(line.Result.Data, nil)
362
}
363
}
364
365
func findTaskLogURL(ctx context.Context, ideURL, authToken string) (taskLogURL string, err error) {
366
ideURL = strings.TrimSuffix(ideURL, "/")
367
tasksURL := ideURL + "/_supervisor/v1/status/tasks"
368
req, err := http.NewRequestWithContext(ctx, "GET", tasksURL, nil)
369
if err != nil {
370
return "", err
371
}
372
req.Header.Set("x-gitpod-owner-token", authToken)
373
req.Header.Set("Cache", "no-cache")
374
375
client := retryablehttp.NewClient()
376
client.RetryMax = 10
377
client.Logger = nil
378
client.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
379
if errors.Is(err, io.EOF) {
380
// the network is not reliable
381
return true, nil
382
}
383
if err != nil && strings.Contains(err.Error(), "received non-200 status") {
384
// gRPC-web race in supervisor?
385
return true, nil
386
}
387
if resp != nil && resp.StatusCode == http.StatusNotFound {
388
// We're too quick to connect - ws-proxy doesn't keep up
389
return true, nil
390
}
391
return retryablehttp.DefaultRetryPolicy(ctx, resp, err)
392
}
393
394
resp, err := client.StandardClient().Do(req)
395
if err != nil {
396
return "", xerrors.Errorf("cannot connect to supervisor: %w", err)
397
}
398
defer resp.Body.Close()
399
400
if resp.StatusCode != http.StatusOK {
401
return "", xerrors.Errorf("received non-200 status from %s: %v", tasksURL, resp.StatusCode)
402
}
403
404
msg, err := ioutil.ReadAll(resp.Body)
405
if err != nil {
406
return "", err
407
}
408
409
var respb struct {
410
Result struct {
411
Tasks []struct {
412
Terminal string `json:"terminal"`
413
} `json:"tasks"`
414
} `json:"result"`
415
}
416
err = json.Unmarshal(msg, &respb)
417
if err != nil {
418
return "", xerrors.Errorf("cannot decode supervisor status response: %w", err)
419
}
420
421
if len(respb.Result.Tasks) == 0 {
422
return "", xerrors.Errorf("build workspace has no tasks")
423
}
424
return fmt.Sprintf("%s/_supervisor/v1/terminal/listen/%s", ideURL, respb.Result.Tasks[0].Terminal), nil
425
}
426
427