GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-daemon/pkg/controller/workspace_operations.go
// Copyright (c) 2023 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package controller

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"time"

	"github.com/gitpod-io/gitpod/common-go/log"
	glog "github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/common-go/tracing"
	csapi "github.com/gitpod-io/gitpod/content-service/api"
	"github.com/gitpod-io/gitpod/content-service/pkg/archive"
	wsinit "github.com/gitpod-io/gitpod/content-service/pkg/initializer"
	"github.com/gitpod-io/gitpod/content-service/pkg/logs"
	"github.com/gitpod-io/gitpod/content-service/pkg/storage"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/content"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/internal/session"
	workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/xerrors"
)

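// Metrics collects the Prometheus metrics ws-daemon records for workspace content
// operations: backup waiting time, backup waiting timeouts, and initializer runtime.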
type Metrics struct {
	BackupWaitingTimeHist       prometheus.Histogram
	BackupWaitingTimeoutCounter prometheus.Counter
	InitializerHistogram        *prometheus.HistogramVec
}

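// registerConcurrentBackupMetrics registers the backup waiting-time histogram and the
// backup waiting-timeout counter with the given Prometheus registerer. The suffix is
// appended to the metric names.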
func registerConcurrentBackupMetrics(reg prometheus.Registerer, suffix string) (prometheus.Histogram, prometheus.Counter, error) {
	backupWaitingTime := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "concurrent_backup_waiting_seconds" + suffix,
		Help:    "waiting time for concurrent backups to finish",
		Buckets: []float64{5, 10, 30, 60, 120, 180, 300, 600, 1800},
	})

	err := reg.Register(backupWaitingTime)
	if err != nil {
		return nil, nil, xerrors.Errorf("cannot register Prometheus histogram for backup waiting time: %w", err)
	}

	waitingTimeoutCounter := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "concurrent_backup_waiting_timeout_total" + suffix,
		Help: "total count of backup rate limiting timeouts",
	})
	err = reg.Register(waitingTimeoutCounter)
	if err != nil {
		return nil, nil, xerrors.Errorf("cannot register Prometheus counter for backup waiting timeouts: %w", err)
	}

	return backupWaitingTime, waitingTimeoutCounter, nil
}

//go:generate sh -c "go install github.com/golang/mock/mockgen@v1.6.0 && mockgen -destination=mock.go -package=controller . WorkspaceOperations"
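// WorkspaceOperations is the interface the workspace controller uses to drive the
// workspace content lifecycle: initialization, backup, snapshotting, deletion, and wiping.
// The go:generate directive above produces a gomock-based fake in mock.go. As a rough
// sketch (assuming the standard mockgen naming of MockWorkspaceOperations and
// NewMockWorkspaceOperations), a test could stub it like:
//
//	ctrl := gomock.NewController(t)
//	ops := NewMockWorkspaceOperations(ctrl)
//	ops.EXPECT().DeleteWorkspace(gomock.Any(), "instance-id").Return(nil)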
type WorkspaceOperations interface {
	// InitWorkspace initializes the workspace content
	InitWorkspace(ctx context.Context, options InitOptions) (*csapi.InitializerMetrics, string, error)
	// BackupWorkspace backs up the content of the workspace
	BackupWorkspace(ctx context.Context, opts BackupOptions) (*csapi.GitStatus, error)
	// DeleteWorkspace deletes the content of the workspace from disk
	DeleteWorkspace(ctx context.Context, instanceID string) error
	// WipeWorkspace deletes all references to the workspace. It does not fail if parts are already gone or the state is inconsistent.
	WipeWorkspace(ctx context.Context, instanceID string) error
	// SnapshotIDs generates the name and URL for a snapshot
	SnapshotIDs(ctx context.Context, instanceID string) (snapshotUrl, snapshotName string, err error)
	// Snapshot takes a snapshot of the workspace
	Snapshot(ctx context.Context, instanceID, snapshotName string) (err error)
	// SetupWorkspace ensures that the workspace has been set up
	SetupWorkspace(ctx context.Context, instanceID string, imageInfo *workspacev1.WorkspaceImageInfo) error
}

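// DefaultWorkspaceOperations is the production implementation of WorkspaceOperations.
// It operates on sessions obtained from the WorkspaceProvider and limits concurrent
// backups through backupWorkspaceLimiter, a buffered channel used as a counting semaphore.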
type DefaultWorkspaceOperations struct {
	config                 content.Config
	provider               *WorkspaceProvider
	backupWorkspaceLimiter chan struct{}
	metrics                *Metrics
	dispatch               *dispatch.Dispatch
}

var _ WorkspaceOperations = (*DefaultWorkspaceOperations)(nil)

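// WorkspaceMeta identifies a workspace by its owner, workspace ID, and instance ID.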
type WorkspaceMeta struct {
	Owner       string
	WorkspaceID string
	InstanceID  string
}

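// InitOptions configure workspace content initialization.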
type InitOptions struct {
	Meta         WorkspaceMeta
	Initializer  *csapi.WorkspaceInitializer
	Headless     bool
	StorageQuota int
}

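// BackupOptions configure a workspace backup, e.g. whether prebuild logs are uploaded
// and whether the git status is refreshed before the backup result is returned.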
type BackupOptions struct {
	Meta              WorkspaceMeta
	BackupLogs        bool
	UpdateGitStatus   bool
	SnapshotName      string
	SkipBackupContent bool
}

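// NewWorkspaceOperations creates the default WorkspaceOperations implementation and
// registers its backup metrics with reg.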
func NewWorkspaceOperations(config content.Config, provider *WorkspaceProvider, reg prometheus.Registerer, dispatch *dispatch.Dispatch) (WorkspaceOperations, error) {
	waitingTimeHist, waitingTimeoutCounter, err := registerConcurrentBackupMetrics(reg, "_mk2")
	if err != nil {
		return nil, err
	}

	return &DefaultWorkspaceOperations{
		config:   config,
		provider: provider,
		metrics: &Metrics{
			BackupWaitingTimeHist:       waitingTimeHist,
			BackupWaitingTimeoutCounter: waitingTimeoutCounter,
		},
		// we permit five concurrent backups at any given time, hence the five in the channel
		backupWorkspaceLimiter: make(chan struct{}, 5),
		dispatch:               dispatch,
	}, nil
}

func (wso *DefaultWorkspaceOperations) InitWorkspace(ctx context.Context, options InitOptions) (*csapi.InitializerMetrics, string, error) {
	ws, err := wso.provider.NewWorkspace(ctx, options.Meta.InstanceID, filepath.Join(wso.provider.Location, options.Meta.InstanceID),
		wso.creator(options.Meta.Owner, options.Meta.WorkspaceID, options.Meta.InstanceID, options.Initializer, false, options.StorageQuota))

	if err != nil {
		return nil, "bug: cannot add workspace to store", xerrors.Errorf("cannot add workspace to store: %w", err)
	}

	rs, ok := ws.NonPersistentAttrs[session.AttrRemoteStorage].(storage.DirectAccess)
	if rs == nil || !ok {
		return nil, "bug: workspace has no remote storage", xerrors.Errorf("workspace has no remote storage")
	}
	ps, err := storage.NewPresignedAccess(&wso.config.Storage)
	if err != nil {
		return nil, "bug: no presigned storage available", xerrors.Errorf("no presigned storage available: %w", err)
	}

	remoteContent, err := content.CollectRemoteContent(ctx, rs, ps, options.Meta.Owner, options.Initializer)
	if err != nil {
		return nil, "remote content error", xerrors.Errorf("remote content error: %w", err)
	}

	// Initialize workspace.
	// FWB workspaces initialize without the help of ws-daemon, but using their supervisor or the registry-facade.
	opts := content.RunInitializerOpts{
		Command: wso.config.Initializer.Command,
		Args:    wso.config.Initializer.Args,
		// This is a bit of a hack as it makes hard assumptions about the nature of the UID mapping.
		// Also, we cannot do this in wsinit because we're dropping all the privileges that would be
		// required for this operation.
		//
		// With FWB this bit becomes unnecessary.
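		//
		// The arithmetic below follows the IdMappings entry {ContainerID: 1, HostID: 100000}:
		// in-container IDs starting at 1 map to host IDs starting at 100000, so the gitpod
		// user's host ID works out to GitpodUID + 100000 - 1 (and likewise for the GID).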
		UID: (wsinit.GitpodUID + 100000 - 1),
		GID: (wsinit.GitpodGID + 100000 - 1),
		IdMappings: []archive.IDMapping{
			{ContainerID: 0, HostID: wsinit.GitpodUID, Size: 1},
			{ContainerID: 1, HostID: 100000, Size: 65534},
		},
		OWI: content.OWI{
			Owner:       options.Meta.Owner,
			WorkspaceID: options.Meta.WorkspaceID,
			InstanceID:  options.Meta.InstanceID,
		},
	}

	err = ensureCleanSlate(ws.Location)
	if err != nil {
		glog.WithFields(ws.OWI()).Warnf("cannot ensure clean slate for workspace %s (this might break content init): %v", ws.InstanceID, err)
	}

	stats, err := content.RunInitializer(ctx, ws.Location, options.Initializer, remoteContent, opts)
	if err != nil {
		glog.WithFields(ws.OWI()).Infof("error running initializer %v", err)
		return nil, err.Error(), err
	}

	err = ws.Persist()
	if err != nil {
		return nil, "cannot persist workspace", err
	}

	glog.WithFields(ws.OWI()).Debug("content init done")

	return stats, "", nil
}

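// creator returns a WorkspaceFactory that builds the session.Workspace record for a new
// workspace: its checkout location is taken from the initializer, and the per-instance
// daemon service directories are derived from the configured working areas.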
func (wso *DefaultWorkspaceOperations) creator(owner, workspaceID, instanceID string, init *csapi.WorkspaceInitializer, storageDisabled bool, storageQuota int) WorkspaceFactory {
	var checkoutLocation string
	allLocations := csapi.GetCheckoutLocationsFromInitializer(init)
	if len(allLocations) > 0 {
		checkoutLocation = allLocations[0]
	}

	serviceDirName := instanceID + "-daemon"
	return func(ctx context.Context, location string) (res *session.Workspace, err error) {
		return &session.Workspace{
			Location:              location,
			CheckoutLocation:      checkoutLocation,
			CreatedAt:             time.Now(),
			Owner:                 owner,
			WorkspaceID:           workspaceID,
			InstanceID:            instanceID,
			RemoteStorageDisabled: storageDisabled,
			StorageQuota:          storageQuota,

			ServiceLocDaemon: filepath.Join(wso.config.WorkingArea, serviceDirName),
			ServiceLocNode:   filepath.Join(wso.config.WorkingAreaNode, serviceDirName),
		}, nil
	}
}

func (wso *DefaultWorkspaceOperations) SetupWorkspace(ctx context.Context, instanceID string, imageInfo *workspacev1.WorkspaceImageInfo) error {
	ws, err := wso.provider.GetAndConnect(ctx, instanceID)
	if err != nil {
		return fmt.Errorf("cannot setup workspace %s: %w", instanceID, err)
	}
	err = wso.writeImageInfo(ctx, ws, imageInfo)
	if err != nil {
		glog.WithError(err).WithFields(ws.OWI()).Error("cannot write image info")
	}
	return nil
}

func (wso *DefaultWorkspaceOperations) BackupWorkspace(ctx context.Context, opts BackupOptions) (*csapi.GitStatus, error) {
	ws, err := wso.provider.GetAndConnect(ctx, opts.Meta.InstanceID)
	if err != nil {
		return nil, fmt.Errorf("cannot find workspace %s during BackupWorkspace: %w", opts.Meta.InstanceID, err)
	}

	if ws.RemoteStorageDisabled {
		return nil, fmt.Errorf("workspace has no remote storage")
	}

	if opts.BackupLogs {
		err := wso.uploadWorkspaceLogs(ctx, opts, ws.Location)
		if err != nil {
			// we do not fail the workspace yet because we still might succeed with its content!
			glog.WithError(err).WithFields(ws.OWI()).Error("log backup failed")
		}
	}

	if opts.SkipBackupContent {
		return nil, nil
	}

	err = wso.uploadWorkspaceContent(ctx, ws, opts.SnapshotName)
	if err != nil {
		glog.WithError(err).WithFields(ws.OWI()).Error("final backup failed for workspace")
		return nil, fmt.Errorf("final backup failed for workspace %s", opts.Meta.InstanceID)
	}

	var repo *csapi.GitStatus
	if opts.UpdateGitStatus {
		// Update the git status prior to deleting the workspace
		repo, err = ws.UpdateGitStatus(ctx)
		if err != nil {
			// Do not fail the workspace because we were unable to get the git status,
			// which can happen for various reasons, including the user corrupting their .git folder somehow.
			// Instead we log the error and continue cleaning up the workspace.
			// todo(pavel): it would be great if we could somehow bubble this up to the user without failing the workspace
			glog.WithError(err).WithFields(ws.OWI()).Warn("cannot get git status")
		}
	}

	return repo, nil
}

func (wso *DefaultWorkspaceOperations) DeleteWorkspace(ctx context.Context, instanceID string) error {
	ws, err := wso.provider.GetAndConnect(ctx, instanceID)
	if err != nil {
		return fmt.Errorf("cannot find workspace %s during DeleteWorkspace: %w", instanceID, err)
	}

	if err = ws.Dispose(ctx, wso.provider.hooks[session.WorkspaceDisposed]); err != nil {
		glog.WithError(err).WithFields(ws.OWI()).Error("cannot dispose session")
		return err
	}

	// remove the workspace daemon directory on the node
	if err := os.RemoveAll(ws.ServiceLocDaemon); err != nil {
		glog.WithError(err).WithFields(ws.OWI()).Error("cannot delete workspace daemon directory")
		return err
	}
	wso.provider.Remove(ctx, instanceID)

	return nil
}

func (wso *DefaultWorkspaceOperations) WipeWorkspace(ctx context.Context, instanceID string) error {
	log := log.New().WithContext(ctx)

	ws, err := wso.provider.GetAndConnect(ctx, instanceID)
	if err != nil {
		// we have to assume everything is fine, and this workspace has already been completely wiped
		return nil
	}
	log = log.WithFields(ws.OWI())

	// mark this session as being wiped
	ws.DoWipe = true

	if err = ws.Dispose(ctx, wso.provider.hooks[session.WorkspaceDisposed]); err != nil {
		log.WithError(err).Error("cannot dispose session")
		return err
	}

	// dispose all running "dispatch handlers", i.e. all code running on the "pod informer"-triggered part of ws-daemon
	wso.dispatch.DisposeWorkspace(ctx, instanceID)

	// remove the workspace daemon directory on the node
	removedChan := make(chan struct{}, 1)
	go func() {
		defer close(removedChan)

		if err := os.RemoveAll(ws.ServiceLocDaemon); err != nil {
			log.WithError(err).Warn("cannot delete workspace daemon directory, leaving it dangling...")
		}
	}()

	// We never want the RemoveAll to block the workspace from being deleted, so we make this a
	// best-effort operation and time out after 10s.
	timeout := time.NewTicker(10 * time.Second)
	defer timeout.Stop()
	select {
	case <-timeout.C:
	case <-removedChan:
		log.Debug("successfully removed workspace daemon directory")
	}

	// remove the reference from the WorkspaceProvider, i.e. the "workspace controller" part of ws-daemon
	wso.provider.Remove(ctx, instanceID)

	return nil
}

func (wso *DefaultWorkspaceOperations) SnapshotIDs(ctx context.Context, instanceID string) (snapshotUrl, snapshotName string, err error) {
	sess, err := wso.provider.GetAndConnect(ctx, instanceID)
	if err != nil {
		return "", "", fmt.Errorf("cannot find workspace %s during SnapshotIDs: %w", instanceID, err)
	}

	baseName := fmt.Sprintf("snapshot-%d", time.Now().UnixNano())
	snapshotName = baseName + ".tar"

	rs, ok := sess.NonPersistentAttrs[session.AttrRemoteStorage].(storage.DirectAccess)
	if rs == nil || !ok {
		return "", "", fmt.Errorf("no remote storage configured")
	}

	return rs.Qualify(snapshotName), snapshotName, nil
}

func (wso *DefaultWorkspaceOperations) Snapshot(ctx context.Context, workspaceID, snapshotName string) (err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "TakeSnapshot")
	span.SetTag("workspace", workspaceID)
	defer tracing.FinishSpan(span, &err)

	if workspaceID == "" {
		return fmt.Errorf("workspaceID is required")
	}

	ws, err := wso.provider.GetAndConnect(ctx, workspaceID)
	if err != nil {
		return fmt.Errorf("cannot find workspace %s during Snapshot: %w", workspaceID, err)
	}

	if ws.RemoteStorageDisabled {
		return fmt.Errorf("workspace has no remote storage")
	}

	err = wso.uploadWorkspaceContent(ctx, ws, snapshotName)
	if err != nil {
		glog.WithError(err).WithFields(ws.OWI()).Error("snapshot failed for workspace")
		return fmt.Errorf("snapshot failed for workspace %s", workspaceID)
	}

	return nil
}

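// ensureCleanSlate empties the given location by removing all of its children while
// leaving the directory itself in place.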
func ensureCleanSlate(location string) error {
	// do not remove the location itself but only
	// the children
	files, err := os.ReadDir(location)
	if err != nil {
		return err
	}

	for _, f := range files {
		path := filepath.Join(location, f.Name())
		err = os.RemoveAll(path)
		if err != nil {
			return err
		}
	}

	return nil
}

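// uploadWorkspaceLogs uploads the prebuild (headless task) log files found in the
// workspace location to the configured remote storage.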
func (wso *DefaultWorkspaceOperations) uploadWorkspaceLogs(ctx context.Context, opts BackupOptions, location string) (err error) {
	// currently we're only uploading prebuild log files
	logFiles, err := logs.ListPrebuildLogFiles(ctx, location)
	if err != nil {
		return err
	}

	rs, err := storage.NewDirectAccess(&wso.config.Storage)
	if err != nil {
		return xerrors.Errorf("cannot use configured storage: %w", err)
	}

	err = rs.Init(ctx, opts.Meta.Owner, opts.Meta.WorkspaceID, opts.Meta.InstanceID)
	if err != nil {
		return xerrors.Errorf("cannot use configured storage: %w", err)
	}

	err = rs.EnsureExists(ctx)
	if err != nil {
		return err
	}

	for _, absLogPath := range logFiles {
		taskID, parseErr := logs.ParseTaskIDFromPrebuildLogFilePath(absLogPath)
		owi := glog.OWI(opts.Meta.Owner, opts.Meta.WorkspaceID, opts.Meta.InstanceID)
		if parseErr != nil {
			glog.WithError(parseErr).WithFields(owi).Warn("cannot parse headless workspace log file name")
			continue
		}

		err = retryIfErr(ctx, 5, glog.WithField("op", "upload log").WithFields(owi), func(ctx context.Context) (err error) {
			_, _, err = rs.UploadInstance(ctx, absLogPath, logs.UploadedHeadlessLogPath(taskID))
			if err != nil {
				return
			}

			return
		})
		if err != nil {
			return xerrors.Errorf("cannot upload workspace logs: %w", err)
		}
	}
	return err
}

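// uploadWorkspaceContent tars the workspace content and uploads it to remote storage
// under backupName. backupWorkspaceLimiter acts as a semaphore so that at most five
// backups run concurrently; after 15 minutes of waiting the upload proceeds anyway and
// the timeout is recorded in the metrics.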
func (wso *DefaultWorkspaceOperations) uploadWorkspaceContent(ctx context.Context, sess *session.Workspace, backupName string) error {
	// Avoid too many simultaneous backups in order to avoid excessive memory utilization.
	var timedOut bool
	waitStart := time.Now()
	select {
	case wso.backupWorkspaceLimiter <- struct{}{}:
	case <-time.After(15 * time.Minute):
		// we timed out on the rate limit - let's upload anyway, because we don't want to actually block
		// an upload. If we reach this point, chances are other things are broken. No upload should ever
		// take this long.
		timedOut = true
		wso.metrics.BackupWaitingTimeoutCounter.Inc()
	}

	waitTime := time.Since(waitStart)
	wso.metrics.BackupWaitingTimeHist.Observe(waitTime.Seconds())

	defer func() {
		// timeout -> we did not add to the limiter
		if timedOut {
			return
		}

		<-wso.backupWorkspaceLimiter
	}()

	var (
		loc  = sess.Location
		opts []storage.UploadOption
	)

	err := os.Remove(filepath.Join(sess.Location, wsinit.WorkspaceReadyFile))
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		// We'll still upload the backup, well aware that the UX during restart will be broken.
		// But it's better to have a backup with all files (albeit one too many), than having no backup at all.
		glog.WithError(err).WithFields(sess.OWI()).Warn("cannot remove workspace ready file")
	}

	rs, ok := sess.NonPersistentAttrs[session.AttrRemoteStorage].(storage.DirectAccess)
	if rs == nil || !ok {
		return xerrors.Errorf("no remote storage configured")
	}

	var (
		tmpf     *os.File
		tmpfSize int64
	)

	defer func() {
		if tmpf != nil {
			os.Remove(tmpf.Name())
		}
	}()

	err = retryIfErr(ctx, wso.config.Backup.Attempts, glog.WithFields(sess.OWI()).WithField("op", "create archive"), func(ctx context.Context) (err error) {
		tmpf, err = os.CreateTemp(wso.config.TmpDir, fmt.Sprintf("wsbkp-%s-*.tar", sess.InstanceID))
		if err != nil {
			return
		}

		defer func() {
			tmpf.Close()
			if err != nil {
				os.Remove(tmpf.Name())
			}
		}()

		var opts []archive.TarOption
		mappings := []archive.IDMapping{
			{ContainerID: 0, HostID: wsinit.GitpodUID, Size: 1},
			{ContainerID: 1, HostID: 100000, Size: 65534},
		}
		opts = append(opts,
			archive.WithUIDMapping(mappings),
			archive.WithGIDMapping(mappings),
		)

		err = content.BuildTarbal(ctx, loc, tmpf.Name(), opts...)
		if err != nil {
			return
		}
		err = tmpf.Sync()
		if err != nil {
			return
		}
		_, err = tmpf.Seek(0, 0)
		if err != nil {
			return
		}

		stat, err := tmpf.Stat()
		if err != nil {
			return
		}
		tmpfSize = stat.Size()
		glog.WithField("size", tmpfSize).WithField("location", tmpf.Name()).WithFields(sess.OWI()).Debug("created temp file for workspace backup upload")

		return
	})
	if err != nil {
		return xerrors.Errorf("cannot create archive: %w", err)
	}

	err = retryIfErr(ctx, wso.config.Backup.Attempts, glog.WithFields(sess.OWI()).WithField("op", "upload layer"), func(ctx context.Context) (err error) {
		_, _, err = rs.Upload(ctx, tmpf.Name(), backupName, opts...)
		if err != nil {
			return
		}

		return
	})
	if err != nil {
		return xerrors.Errorf("cannot upload workspace content: %w", err)
	}

	return nil
}

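// writeImageInfo persists the workspace image info as JSON to .gitpod/image inside the
// workspace and chowns the file to the gitpod user's host UID/GID.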
func (wso *DefaultWorkspaceOperations) writeImageInfo(_ context.Context, ws *session.Workspace, imageInfo *workspacev1.WorkspaceImageInfo) error {
	if imageInfo == nil {
		return nil
	}

	b, err := json.Marshal(imageInfo)
	if err != nil {
		return fmt.Errorf("cannot marshal image info: %w", err)
	}
	uid := (wsinit.GitpodUID + 100000 - 1)
	gid := (wsinit.GitpodGID + 100000 - 1)
	fp := filepath.Join(ws.Location, ".gitpod/image")
	err = os.WriteFile(fp, b, 0644)
	if err != nil {
		return fmt.Errorf("cannot write image info: %w", err)
	}
	os.Chown(fp, uid, gid)
	return nil
}

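// retryIfErr runs op up to attempts times (at least once), sleeping between attempts
// with an exponential backoff that starts at one second. It returns early when ctx is
// cancelled and returns the last error if all attempts fail.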
func retryIfErr(ctx context.Context, attempts int, log *logrus.Entry, op func(ctx context.Context) error) (err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "retryIfErr")
	defer tracing.FinishSpan(span, &err)
	for k, v := range log.Data {
		span.LogKV(k, v)
	}

	if attempts == 0 {
		attempts = 1
	}

	backoff := 1 * time.Second
	for i := 0; i < attempts; i++ {
		span.LogKV("attempt", i)

		if cerr := ctx.Err(); cerr != nil {
			return cerr
		}

		bctx, cancel := context.WithCancel(ctx)
		err = op(bctx)
		cancel()
		if err == nil {
			break
		}

		log.WithError(err).Error("op failed")
		span.LogKV("error", err.Error())
		if i < attempts-1 {
			log.WithField("backoff", backoff.String()).Debug("retrying op after backoff")
			if cerr := ctx.Err(); cerr != nil {
				return cerr
			}
			time.Sleep(backoff)
			backoff = 2 * backoff
		}
	}
	return
}