GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/content-service/pkg/storage/gcloud.go
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package storage

import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	gcpstorage "cloud.google.com/go/storage"
	validation "github.com/go-ozzo/ozzo-validation"
	"github.com/opentracing/opentracing-go"
	"golang.org/x/oauth2/google"
	"golang.org/x/xerrors"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"

	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/common-go/tracing"
	config "github.com/gitpod-io/gitpod/content-service/api/config"
	"github.com/gitpod-io/gitpod/content-service/pkg/archive"
)

var _ DirectAccess = &DirectGCPStorage{}

var validateExistsInFilesystem = validation.By(func(o interface{}) error {
	s, ok := o.(string)
	if !ok {
		return xerrors.Errorf("field should be string")
	}

	if s == "" {
		// don't make this field required
		return nil
	}

	_, err := os.Stat(s)
	return err
})

// ValidateGCPConfig checks if the GCP storage config is valid
func ValidateGCPConfig(c *config.GCPConfig) error {
	return validation.ValidateStruct(c,
		validation.Field(&c.CredentialsFile, validateExistsInFilesystem),
		validation.Field(&c.Region, validation.Required),
		validation.Field(&c.Project, validation.Required),
	)
}

// newDirectGCPAccess provides direct access to the remote storage system
func newDirectGCPAccess(cfg config.GCPConfig, stage config.Stage) (*DirectGCPStorage, error) {
	if err := ValidateGCPConfig(&cfg); err != nil {
		return nil, err
	}

	return &DirectGCPStorage{
		Stage:     stage,
		GCPConfig: cfg,
	}, nil
}

// DirectGCPStorage stores data in Google Cloud buckets, following a particular naming scheme
type DirectGCPStorage struct {
	Username      string
	WorkspaceName string
	InstanceID    string
	GCPConfig     config.GCPConfig
	Stage         config.Stage

	client *gcpstorage.Client

	// ObjectAccess just exists so that we can swap out the stream access during testing
	ObjectAccess func(ctx context.Context, bkt, obj string) (io.ReadCloser, bool, error)
}

// Validate checks if the GCloud storage is configured properly
func (rs *DirectGCPStorage) Validate() error {
	err := ValidateGCPConfig(&rs.GCPConfig)
	if err != nil {
		return err
	}

	return validation.ValidateStruct(rs,
		validation.Field(&rs.Username, validation.Required),
		validation.Field(&rs.WorkspaceName, validation.Required),
		validation.Field(&rs.Stage, validation.Required),
	)
}

// Init initializes the remote storage - call this before calling anything else on the interface
func (rs *DirectGCPStorage) Init(ctx context.Context, owner, workspace, instance string) (err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "GCloudBucketRemotegcpStorage.Init")
	defer tracing.FinishSpan(span, &err)

	rs.Username = owner
	rs.WorkspaceName = workspace
	rs.InstanceID = instance

	// now that we have all the information complete, validate if we're good to go
	err = rs.Validate()
	if err != nil {
		return xerrors.Errorf("invalid GCloud remote storage config: %w", err)
	}

	client, err := newGCPClient(ctx, rs.GCPConfig)
	if err != nil {
		return err
	}
	rs.client = client

	if rs.ObjectAccess == nil {
		rs.ObjectAccess = rs.defaultObjectAccess
	}

	return nil
}

// EnsureExists makes sure that the remote storage location exists and can be uploaded to or downloaded from
func (rs *DirectGCPStorage) EnsureExists(ctx context.Context) (err error) {
	return gcpEnsureExists(ctx, rs.client, rs.bucketName(), rs.GCPConfig)
}

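// gcpEnsureExists creates the bucket if it does not exist yet; losing a
// bucket-creation race ("you already own this bucket") counts as success.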
func gcpEnsureExists(ctx context.Context, client *gcpstorage.Client, bucketName string, gcpConfig config.GCPConfig) (err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "GCloudBucketRemotegcpStorage.EnsureExists")
	defer tracing.FinishSpan(span, &err)

	if client == nil {
		return xerrors.Errorf("no gcloud client available - did you call Init()?")
	}

	hdl := client.Bucket(bucketName)
	_, err = hdl.Attrs(ctx)
	if err == nil {
		// bucket exists and everything is fine - we're done here
		return
	}
	if err != nil && err != gcpstorage.ErrBucketNotExist {
		return xerrors.Errorf("cannot ensure storage exists: %w", err)
	}

	log.WithField("bucketName", bucketName).Debug("Creating bucket")
	err = hdl.Create(ctx, gcpConfig.Project, &gcpstorage.BucketAttrs{
		Location: gcpConfig.Region,
	})
	if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusConflict && strings.Contains(strings.ToLower(e.Message), "you already own") {
		// Looks like we had a bucket creation race and lost.
		// That's ok - at least the bucket exists now and is still owned by us.
	} else if err != nil {
		return xerrors.Errorf("cannot create bucket: %w", err)
	}

	return nil
}

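// defaultObjectAccess opens a reader on the given object; it is the default
// implementation behind the swappable ObjectAccess hook.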
func (rs *DirectGCPStorage) defaultObjectAccess(ctx context.Context, bkt, obj string) (io.ReadCloser, bool, error) {
	if rs.client == nil {
		return nil, false, xerrors.Errorf("no gcloud client available - did you call Init()?")
	}

	objHandle := rs.client.Bucket(bkt).Object(obj)
	rc, err := objHandle.NewReader(ctx)
	if err != nil {
		return nil, false, err
	}

	return rc, false, nil
}

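// download fetches a tar archive from GCS using gsutil, extracts it into the
// destination directory, and fixes up legacy filenames afterwards.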
func (rs *DirectGCPStorage) download(ctx context.Context, destination string, bkt string, obj string, mappings []archive.IDMapping) (found bool, err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "download")
	span.SetTag("gcsBkt", bkt)
	span.SetTag("gcsObj", obj)
	defer tracing.FinishSpan(span, &err)

	backupDir, err := os.MkdirTemp("", "backup-")
	if err != nil {
		return true, err
	}
	defer os.RemoveAll(backupDir)

	var wg sync.WaitGroup

	wg.Add(1)
	backupSpan := opentracing.StartSpan("downloadBackup", opentracing.ChildOf(span.Context()))

	go func() {
		defer wg.Done()

		sa := ""
		if rs.GCPConfig.CredentialsFile != "" {
			sa = fmt.Sprintf(`-o "Credentials:gs_service_key_file=%v"`, rs.GCPConfig.CredentialsFile)
		}

		args := fmt.Sprintf(`gsutil -q -m %v\
		-o "GSUtil:sliced_object_download_max_components=8" \
		-o "GSUtil:parallel_thread_count=1" \
		cp gs://%s %s`, sa, filepath.Join(bkt, obj), backupDir)

		log.WithField("flags", args).Debug("gsutil flags")

		cmd := exec.Command("/bin/bash", []string{"-c", args}...)
		var out []byte
		out, err = cmd.CombinedOutput()
		if err != nil {
			log.WithError(err).WithField("out", string(out)).Error("unexpected error downloading file from GCS using gsutil")
			err = xerrors.Errorf("unexpected error downloading backup")
			return
		}
	}()

	wg.Wait()
	tracing.FinishSpan(backupSpan, &err)

	rc, err := os.Open(filepath.Join(backupDir, obj))
	if err != nil {
		return true, err
	}
	defer rc.Close()

	err = extractTarbal(ctx, destination, rc, mappings)
	if err != nil {
		return true, err
	}

	if err := rs.fixLegacyFilenames(ctx, destination); err != nil {
		return true, err
	}

	return true, nil
}

/* tar files produced by the previous sync process contain their workspace ID in the filenames.
 * This behavior is difficult for snapshot backups, thus ws-daemon does not do that. However,
 * we need to be able to handle the "old" tar files, hence this legacy mode. See #1559.
 */
func (rs *DirectGCPStorage) fixLegacyFilenames(ctx context.Context, destination string) (err error) {
	//nolint:staticcheck,ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "fixLegacyFilenames")
	defer tracing.FinishSpan(span, &err)

	legacyPath := filepath.Join(destination, rs.WorkspaceName)
	if fi, err := os.Stat(legacyPath); errors.Is(err, fs.ErrNotExist) {
		// legacy path does not exist, nothing to do here
		return nil
	} else if err != nil {
		// any other stat error: fi would be nil, so bail out instead of dereferencing it
		return err
	} else if fi.IsDir() {
		log.WithField("destination", destination).WithField("legacyPath", legacyPath).Info("Handling legacy backup")
		/* legacy path exists and is a directory - move its content and remove the legacy path.
		 *
		 * Using mv here is difficult as the wildcard expansion is done by the shell and not mv,
		 * thus we'd need to wrap the mv call in a sh call -> too many dependencies to the outside world.
		 */
		fis, err := os.ReadDir(legacyPath)
		if err != nil {
			return err
		}
		for _, fi := range fis {
			src := filepath.Join(legacyPath, fi.Name())
			dst := filepath.Join(destination, fi.Name())
			log.WithField("src", src).WithField("dst", dst).Debug("moving file")
			if err := os.Rename(src, dst); err != nil {
				return xerrors.Errorf("mv %s %s: %s", src, dst, err)
			}
		}

		if err := os.Remove(legacyPath); err != nil {
			return err
		}
	}

	return nil
}

// Download takes the latest state from the remote storage and downloads it to a local path
func (rs *DirectGCPStorage) Download(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (bool, error) {
	return rs.download(ctx, destination, rs.bucketName(), rs.objectName(name), mappings)
}

// DownloadSnapshot downloads a snapshot. The snapshot name is expected to be one produced by Qualify
func (rs *DirectGCPStorage) DownloadSnapshot(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (bool, error) {
	bkt, obj, err := ParseSnapshotName(name)
	if err != nil {
		return false, err
	}

	return rs.download(ctx, destination, bkt, obj, mappings)
}

// ParseSnapshotName parses the name of a snapshot into bucket and object
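// A qualified name has the form "<object>@<bucket>" (the inverse of Qualify),
// e.g. the hypothetical "workspaces/ws-123/snapshot.tar@gitpod-prod-user-456".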
func ParseSnapshotName(name string) (bkt, obj string, err error) {
	segments := strings.Split(name, "@")
	if len(segments) != 2 {
		err = xerrors.Errorf("%s is not a valid GCloud remote storage FQN", name)
		return
	}

	obj = segments[0]
	bkt = segments[1]
	return
}

// ListObjects returns all objects found with the given prefix. Returns an empty list if the bucket does not exist (yet).
func (rs *DirectGCPStorage) ListObjects(ctx context.Context, prefix string) (objects []string, err error) {
	bkt := rs.client.Bucket(rs.bucketName())
	_, err = bkt.Attrs(ctx)
	if errors.Is(err, gcpstorage.ErrBucketNotExist) {
		// bucket does not exist: nothing to list
		return nil, nil
	}
	if err != nil {
		return nil, xerrors.Errorf("cannot list objects: %w", err)
	}

	iter := bkt.Objects(ctx, &gcpstorage.Query{Prefix: prefix})
	var obj *gcpstorage.ObjectAttrs
	for obj, err = iter.Next(); obj != nil; obj, err = iter.Next() {
		objects = append(objects, obj.Name)
	}
	if err != iterator.Done && err != nil {
		return nil, xerrors.Errorf("cannot iterate list objects: %w", err)
	}

	return objects, nil
}

// Qualify fully qualifies a snapshot name so that it can be downloaded using DownloadSnapshot
func (rs *DirectGCPStorage) Qualify(name string) string {
	return fmt.Sprintf("%s@%s", rs.objectName(name), rs.bucketName())
}

// UploadInstance takes all files from a local location and uploads them to the per-instance remote storage
func (rs *DirectGCPStorage) UploadInstance(ctx context.Context, source string, name string, opts ...UploadOption) (bucket, object string, err error) {
	if rs.InstanceID == "" {
		return "", "", xerrors.Errorf("instanceID is required to compute object name")
	}
	return rs.Upload(ctx, source, InstanceObjectName(rs.InstanceID, name), opts...)
}

// Upload takes all files from a local location and uploads them to the remote storage
func (rs *DirectGCPStorage) Upload(ctx context.Context, source string, name string, opts ...UploadOption) (bucket, object string, err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "GCloudBucketRemotegcpStorage.Upload")
	defer tracing.FinishSpan(span, &err)
	log := log.WithFields(log.OWI(rs.Username, rs.WorkspaceName, ""))

	if rs.client == nil {
		err = xerrors.Errorf("no gcloud client available - did you call Init()?")
		return
	}

	sfn, err := os.Open(source)
	if err != nil {
		err = xerrors.Errorf("cannot open file for uploading: %w", err)
		return
	}
	defer sfn.Close()

	stat, err := sfn.Stat()
	if err != nil {
		return
	}

	totalSize := stat.Size()
	span.SetTag("totalSize", totalSize)

	bucket = rs.bucketName()
	object = rs.objectName(name)

	uploadSpan := opentracing.StartSpan("remote-upload", opentracing.ChildOf(span.Context()))
	uploadSpan.SetTag("bucket", bucket)
	uploadSpan.SetTag("obj", object)

	err = gcpEnsureExists(ctx, rs.client, bucket, rs.GCPConfig)
	if err != nil {
		err = xerrors.Errorf("unexpected error: %w", err)
		return
	}

	var wg sync.WaitGroup

	wg.Add(1)

	go func() {
		defer wg.Done()

		sa := ""
		if rs.GCPConfig.CredentialsFile != "" {
			sa = fmt.Sprintf(`-o "Credentials:gs_service_key_file=%v"`, rs.GCPConfig.CredentialsFile)
		}

		args := fmt.Sprintf(`gsutil -q -m %v\
		-o "GSUtil:parallel_composite_upload_threshold=150M" \
		-o "GSUtil:parallel_process_count=3" \
		-o "GSUtil:parallel_thread_count=6" \
		cp %s gs://%s`, sa, source, filepath.Join(bucket, object))

		log.WithField("flags", args).Debug("gsutil flags")

		cmd := exec.Command("/bin/bash", []string{"-c", args}...)
		var out []byte
		out, err = cmd.CombinedOutput()
		if err != nil {
			log.WithError(err).WithField("out", string(out)).Error("unexpected error uploading file to GCS using gsutil")
			err = xerrors.Errorf("unexpected error uploading backup")
			return
		}
	}()

	wg.Wait()

	uploadSpan.Finish()

	// do not reset err here: it carries the upload error (if any) from the goroutine
	return
}

func (rs *DirectGCPStorage) bucketName() string {
	return gcpBucketName(rs.Stage, rs.Username)
}

// Bucket provides the bucket name for a particular user
func (rs *DirectGCPStorage) Bucket(ownerID string) string {
	return gcpBucketName(rs.Stage, ownerID)
}

// BackupObject returns a backup's object name that a direct downloader would download
func (rs *DirectGCPStorage) BackupObject(name string) string {
	return rs.objectName(name)
}

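// gcpBucketName computes the per-user bucket name, following the
// "gitpod-<stage>-user-<ownerID>" naming scheme.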
func gcpBucketName(stage config.Stage, ownerID string) string {
	return fmt.Sprintf("gitpod-%s-user-%s", stage, ownerID)
}

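// gcpWorkspaceBackupObjectName prefixes a backup name with its workspace ID.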
func gcpWorkspaceBackupObjectName(workspaceID string, name string) string {
	return fmt.Sprintf("%s/%s", workspaceID, name)
}

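// workspacePrefix returns the object-name prefix under which all backups of
// this workspace live.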
func (rs *DirectGCPStorage) workspacePrefix() string {
	return fmt.Sprintf("workspaces/%s", rs.WorkspaceName)
}

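// objectName produces the fully prefixed object name of a backup, i.e.
// "workspaces/<workspaceName>/<name>".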
func (rs *DirectGCPStorage) objectName(name string) string {
	return gcpWorkspaceBackupObjectName(rs.workspacePrefix(), name)
}

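// newGCPClient creates a GCP storage client from the configured credentials
// file, resolving the path under TELEPRESENCE_ROOT if that env var is set.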
func newGCPClient(ctx context.Context, cfg config.GCPConfig) (*gcpstorage.Client, error) {
	credfile := cfg.CredentialsFile
	if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {
		credfile = filepath.Join(tproot, credfile)
	}

	client, err := gcpstorage.NewClient(ctx, option.WithCredentialsFile(credfile))
	if err != nil {
		return nil, xerrors.Errorf("cannot create GCP storage client: %w", err)
	}
	return client, nil
}

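// newPresignedGCPAccess validates the config, loads the service-account key,
// and derives the access ID and private key used for URL signing.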
func newPresignedGCPAccess(config config.GCPConfig, stage config.Stage) (*PresignedGCPStorage, error) {
	err := ValidateGCPConfig(&config)
	if err != nil {
		return nil, xerrors.Errorf("invalid config: %w", err)
	}

	credfile := config.CredentialsFile
	if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {
		credfile = filepath.Join(tproot, credfile)
	}

	jsonKey, err := os.ReadFile(credfile)
	if err != nil {
		return nil, xerrors.Errorf("cannot read private key: %w", err)
	}
	privateKey, err := google.JWTConfigFromJSON(jsonKey)
	if err != nil {
		return nil, xerrors.Errorf("cannot get private key: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// We create a client here just to make sure that the credentials actually work
	client, err := gcpstorage.NewClient(ctx, option.WithCredentialsFile(credfile))
	if err != nil {
		return nil, xerrors.Errorf("cannot create GCP storage client: %w", err)
	}
	client.Close()

	return &PresignedGCPStorage{
		config:     config,
		stage:      stage,
		privateKey: privateKey.PrivateKey,
		accessID:   privateKey.Email,
	}, nil
}

// PresignedGCPStorage provides presigned URLs to access GCP storage objects
type PresignedGCPStorage struct {
	config     config.GCPConfig
	stage      config.Stage
	privateKey []byte
	accessID   string
}

// Bucket provides the bucket name for a particular user
func (p *PresignedGCPStorage) Bucket(owner string) string {
	return gcpBucketName(p.stage, owner)
}

// BlobObject returns a blob's object name
func (p *PresignedGCPStorage) BlobObject(userID, name string) (string, error) {
	return blobObjectName(name)
}

// EnsureExists makes sure that the remote storage location exists and can be uploaded to or downloaded from
func (p *PresignedGCPStorage) EnsureExists(ctx context.Context, bucket string) (err error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return err
	}
	//nolint:staticcheck
	defer client.Close()

	return gcpEnsureExists(ctx, client, bucket, p.config)
}

// DiskUsage gives the total size of all objects that have the given prefix
func (p *PresignedGCPStorage) DiskUsage(ctx context.Context, bucket string, prefix string) (size int64, err error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return
	}
	//nolint:staticcheck
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	if !strings.HasSuffix(prefix, "/") {
		prefix = prefix + "/"
	}

	var total int64
	it := client.Bucket(bucket).Objects(ctx, &gcpstorage.Query{
		Prefix: prefix,
	})
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return 0, err
		}
		total += attrs.Size
	}

	return total, nil
}

// SignDownload provides presigned URLs to access remote storage objects
func (p *PresignedGCPStorage) SignDownload(ctx context.Context, bucket, object string, options *SignedURLOptions) (*DownloadInfo, error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return nil, err
	}
	//nolint:staticcheck
	defer client.Close()

	bkt := client.Bucket(bucket)
	_, err = bkt.Attrs(ctx)
	if errors.Is(err, gcpstorage.ErrBucketNotExist) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, err
	}

	obj := bkt.Object(object)
	attrs, err := obj.Attrs(ctx)
	if errors.Is(err, gcpstorage.ErrObjectNotExist) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, err
	}
	res, err := p.downloadInfo(ctx, client, attrs, options)
	if err != nil {
		return nil, err
	}

	return res, nil
}

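// downloadInfo assembles object metadata and a GET URL signed for one hour.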
func (p *PresignedGCPStorage) downloadInfo(ctx context.Context, client *gcpstorage.Client, obj *gcpstorage.ObjectAttrs, options *SignedURLOptions) (*DownloadInfo, error) {
	meta := &ObjectMeta{
		ContentType:        obj.ContentType,
		OCIMediaType:       obj.Metadata[ObjectAnnotationOCIContentType],
		Digest:             obj.Metadata[ObjectAnnotationDigest],
		UncompressedDigest: obj.Metadata[ObjectAnnotationUncompressedDigest],
	}
	url, err := gcpstorage.SignedURL(obj.Bucket, obj.Name, &gcpstorage.SignedURLOptions{
		Method:         "GET",
		GoogleAccessID: p.accessID,
		PrivateKey:     p.privateKey,
		Expires:        time.Now().Add(1 * time.Hour),
		ContentType:    options.ContentType,
	})
	if err != nil {
		return nil, err
	}

	return &DownloadInfo{
		Meta: *meta,
		URL:  url,
		Size: obj.Size,
	}, nil
}

// SignUpload describes an object for upload
func (p *PresignedGCPStorage) SignUpload(ctx context.Context, bucket, object string, options *SignedURLOptions) (info *UploadInfo, err error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return nil, err
	}
	//nolint:staticcheck
	defer client.Close()

	bkt := client.Bucket(bucket)
	_, err = bkt.Attrs(ctx)
	if errors.Is(err, gcpstorage.ErrBucketNotExist) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, err
	}

	url, err := gcpstorage.SignedURL(bucket, object, &gcpstorage.SignedURLOptions{
		Method:         "PUT",
		GoogleAccessID: p.accessID,
		PrivateKey:     p.privateKey,
		Expires:        time.Now().Add(30 * time.Minute),
		ContentType:    options.ContentType,
	})
	if err != nil {
		return nil, err
	}

	return &UploadInfo{
		URL: url,
	}, nil
}

// DeleteObject deletes objects in the given bucket specified by the given query
func (p *PresignedGCPStorage) DeleteObject(ctx context.Context, bucket string, query *DeleteObjectQuery) (err error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return err
	}
	//nolint:staticcheck
	defer client.Close()

	if query.Name != "" {
		err = client.Bucket(bucket).Object(query.Name).Delete(ctx)
		if err != nil {
			if errors.Is(err, gcpstorage.ErrBucketNotExist) || errors.Is(err, gcpstorage.ErrObjectNotExist) {
				return ErrNotFound
			}

			log.WithField("bucket", bucket).WithField("object", query.Name).WithError(err).Warn("cannot delete object")
			return err
		}
		return nil
	}

	prefix := query.Prefix
	b := client.Bucket(bucket)
	var it *gcpstorage.ObjectIterator
	if prefix != "" && prefix != "/" {
		it = b.Objects(ctx, &gcpstorage.Query{
			Prefix: prefix,
		})
	} else {
		it = b.Objects(ctx, nil)
	}
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		// if we get any error besides "done" the iterator is broken: make sure we don't use it again.
		if err != nil {
			if errors.Is(err, gcpstorage.ErrBucketNotExist) {
				return ErrNotFound
			}
			log.WithField("bucket", bucket).WithError(err).Error("error iterating object")
			break
		}
		err = b.Object(attrs.Name).Delete(ctx)
		if err != nil {
			if errors.Is(err, gcpstorage.ErrBucketNotExist) || errors.Is(err, gcpstorage.ErrObjectNotExist) {
				continue
			}
			log.WithField("bucket", bucket).WithField("object", attrs.Name).WithError(err).Warn("cannot delete object, continue deleting objects")
		}
	}
	return err
}

// DeleteBucket deletes a bucket
func (p *PresignedGCPStorage) DeleteBucket(ctx context.Context, userID, bucket string) (err error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return err
	}
	//nolint:staticcheck
	defer client.Close()

	err = p.DeleteObject(ctx, bucket, &DeleteObjectQuery{})
	if err != nil {
		return err
	}

	err = client.Bucket(bucket).Delete(ctx)
	if err != nil {
		if e, ok := err.(*googleapi.Error); ok {
			if e.Code == http.StatusNotFound {
				return ErrNotFound
			}
		}
		if errors.Is(err, gcpstorage.ErrBucketNotExist) {
			return ErrNotFound
		}
		return err
	}
	return nil
}

// ObjectHash returns the hex-encoded MD5 hash of an object
func (p *PresignedGCPStorage) ObjectHash(ctx context.Context, bucket string, obj string) (hash string, err error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return "", err
	}
	//nolint:staticcheck
	defer client.Close()

	attr, err := client.Bucket(bucket).Object(obj).Attrs(ctx)
	if err != nil {
		if errors.Is(err, gcpstorage.ErrBucketNotExist) {
			return "", ErrNotFound
		}
		return "", err
	}
	return hex.EncodeToString(attr.MD5), nil
}

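// ObjectExists reports whether the given object exists in the given bucket;
// a missing bucket or object yields (false, nil) rather than an error.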
func (p *PresignedGCPStorage) ObjectExists(ctx context.Context, bucket, obj string) (bool, error) {
	client, err := newGCPClient(ctx, p.config)
	if err != nil {
		return false, err
	}
	//nolint:staticcheck
	defer client.Close()

	_, err = client.Bucket(bucket).Object(obj).Attrs(ctx)
	if err != nil {
		if errors.Is(err, gcpstorage.ErrBucketNotExist) {
			return false, nil
		}
		if errors.Is(err, gcpstorage.ErrObjectNotExist) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}

// BackupObject returns a backup's object name that a direct downloader would download
func (p *PresignedGCPStorage) BackupObject(ownerID string, workspaceID string, name string) string {
	return fmt.Sprintf("workspaces/%s", gcpWorkspaceBackupObjectName(workspaceID, name))
}

// InstanceObject returns an instance's object name that a direct downloader would download
func (p *PresignedGCPStorage) InstanceObject(ownerID string, workspaceID string, instanceID string, name string) string {
	return p.BackupObject(ownerID, workspaceID, InstanceObjectName(instanceID, name))
}
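
// The sketch below is editorial and not part of the original file: a minimal,
// hypothetical example of how the direct-access pieces above compose
// (construct, Init, EnsureExists, Upload). The literal owner/workspace/
// instance IDs and the "backup.tar" name are placeholders.
func exampleDirectUpload(ctx context.Context, cfg config.GCPConfig, stage config.Stage, source string) (bucket, object string, err error) {
	// construct and validate the storage handle
	rs, err := newDirectGCPAccess(cfg, stage)
	if err != nil {
		return "", "", err
	}
	// Init fills in the owner/workspace/instance triple and creates the GCS client
	if err := rs.Init(ctx, "owner", "workspace", "instance"); err != nil {
		return "", "", err
	}
	// make sure the per-user bucket exists before uploading
	if err := rs.EnsureExists(ctx); err != nil {
		return "", "", err
	}
	// upload the tar archive at source under the workspace's object prefix
	return rs.Upload(ctx, source, "backup.tar")
}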