Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/content-service/pkg/storage/s3.go
2501 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package storage
6
7
import (
8
"context"
9
"errors"
10
"fmt"
11
"os"
12
"path/filepath"
13
"strings"
14
15
"github.com/gitpod-io/gitpod/common-go/log"
16
"github.com/gitpod-io/gitpod/content-service/pkg/archive"
17
"golang.org/x/xerrors"
18
19
"github.com/aws/aws-sdk-go-v2/aws"
20
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
21
s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
22
"github.com/aws/aws-sdk-go-v2/service/s3"
23
"github.com/aws/aws-sdk-go-v2/service/s3/types"
24
)
25
26
const (
	// megabytes is the number of bytes in one MiB; it converts the MiB-based
	// sizes used in this file into byte counts.
	megabytes = 1024 * 1024

	// defaultPartSize is the part size, in MiB, configured on the S3
	// downloader and uploader.
	defaultPartSize = 50

	// defaultCopyConcurrency is the concurrency configured on the S3
	// downloader and uploader.
	defaultCopyConcurrency = 10
)
31
32
var _ DirectAccess = &s3Storage{}
33
var _ PresignedAccess = &PresignedS3Storage{}
34
35
// S3Config holds the settings of the S3 storage implementations in this file.
type S3Config struct {
	// Bucket is the name of the bucket all S3 requests in this file target.
	Bucket string
}
38
39
type S3Client interface {
40
ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
41
DeleteObjects(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error)
42
GetObjectAttributes(ctx context.Context, params *s3.GetObjectAttributesInput, optFns ...func(*s3.Options)) (*s3.GetObjectAttributesOutput, error)
43
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
44
}
45
46
type PresignedS3Client interface {
47
PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
48
PresignPutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
49
}
50
51
func NewPresignedS3Access(client S3Client, config S3Config) *PresignedS3Storage {
52
return &PresignedS3Storage{
53
Config: config,
54
client: client,
55
PresignedFactory: func() PresignedS3Client {
56
if s3c, ok := client.(*s3.Client); ok {
57
return s3.NewPresignClient(s3c)
58
}
59
return nil
60
},
61
}
62
}
63
64
type PresignedS3Storage struct {
65
Config S3Config
66
67
client S3Client
68
69
// PresignedFactory exists for testing only. DO NOT USE in production.
70
PresignedFactory func() PresignedS3Client
71
}
72
73
// Bucket implements PresignedAccess
74
func (rs *PresignedS3Storage) Bucket(userID string) string {
75
return rs.Config.Bucket
76
}
77
78
// BlobObject implements PresignedAccess
79
func (rs *PresignedS3Storage) BlobObject(userID, name string) (string, error) {
80
blb, err := blobObjectName(name)
81
if err != nil {
82
return "", err
83
}
84
85
return filepath.Join(userID, blb), nil
86
}
87
88
// BackupObject implements PresignedAccess
89
func (rs *PresignedS3Storage) BackupObject(ownerID string, workspaceID string, name string) string {
90
return s3WorkspaceBackupObjectName(ownerID, workspaceID, name)
91
}
92
93
// DeleteBucket implements PresignedAccess
94
func (rs *PresignedS3Storage) DeleteBucket(ctx context.Context, userID, bucket string) error {
95
if bucket != rs.Config.Bucket {
96
log.WithField("requestedBucket", bucket).WithField("configuredBucket", rs.Config.Bucket).Error("can only delete from configured bucket")
97
return xerrors.Errorf("can only delete from configured bucket; this looks like a bug in Gitpod")
98
}
99
100
return rs.DeleteObject(ctx, rs.Config.Bucket, &DeleteObjectQuery{Prefix: userID + "/"})
101
}
102
103
// DeleteObject implements PresignedAccess
104
func (rs *PresignedS3Storage) DeleteObject(ctx context.Context, bucket string, query *DeleteObjectQuery) error {
105
var objects []types.ObjectIdentifier
106
107
switch {
108
case query.Name != "":
109
objects = []types.ObjectIdentifier{{Key: &query.Name}}
110
111
case query.Prefix != "":
112
resp, err := rs.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
113
Bucket: aws.String(rs.Config.Bucket),
114
Prefix: aws.String(query.Prefix),
115
})
116
if err != nil {
117
return err
118
}
119
for _, e := range resp.Contents {
120
objects = append(objects, types.ObjectIdentifier{
121
Key: e.Key,
122
})
123
}
124
}
125
126
if len(objects) == 0 {
127
return nil
128
}
129
130
resp, err := rs.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
131
Bucket: &rs.Config.Bucket,
132
Delete: &types.Delete{
133
Objects: objects,
134
Quiet: aws.Bool(true),
135
},
136
})
137
if err != nil {
138
return err
139
}
140
if len(resp.Errors) > 0 {
141
var errs []string
142
for _, e := range resp.Errors {
143
errs = append(errs, fmt.Sprintf("%s: %s", aws.ToString(e.Key), aws.ToString(e.Message)))
144
}
145
return xerrors.Errorf("cannot delete objects: %s", strings.Join(errs, ", "))
146
}
147
148
return nil
149
}
150
151
// DiskUsage implements PresignedAccess
152
func (rs *PresignedS3Storage) DiskUsage(ctx context.Context, bucket string, prefix string) (size int64, err error) {
153
resp, err := rs.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
154
Bucket: &rs.Config.Bucket,
155
Prefix: aws.String(prefix),
156
})
157
if err != nil {
158
return 0, err
159
}
160
161
for _, r := range resp.Contents {
162
size += *r.Size
163
}
164
return
165
}
166
167
// EnsureExists implements PresignedAccess
168
func (rs *PresignedS3Storage) EnsureExists(ctx context.Context, bucket string) error {
169
return nil
170
}
171
172
// InstanceObject implements PresignedAccess
173
func (rs *PresignedS3Storage) InstanceObject(ownerID string, workspaceID string, instanceID string, name string) string {
174
return rs.BackupObject(ownerID, workspaceID, InstanceObjectName(instanceID, name))
175
}
176
177
// ObjectExists implements PresignedAccess
178
func (rs *PresignedS3Storage) ObjectExists(ctx context.Context, bucket string, path string) (bool, error) {
179
_, err := rs.client.GetObjectAttributes(ctx, &s3.GetObjectAttributesInput{
180
Bucket: &rs.Config.Bucket,
181
Key: aws.String(path),
182
ObjectAttributes: []types.ObjectAttributes{types.ObjectAttributesEtag},
183
})
184
185
var nsk *types.NoSuchKey
186
if errors.As(err, &nsk) {
187
return false, nil
188
}
189
190
if err != nil {
191
return false, err
192
}
193
return true, nil
194
}
195
196
// ObjectHash implements PresignedAccess
197
func (rs *PresignedS3Storage) ObjectHash(ctx context.Context, bucket string, obj string) (string, error) {
198
resp, err := rs.client.GetObjectAttributes(ctx, &s3.GetObjectAttributesInput{
199
Bucket: &rs.Config.Bucket,
200
Key: aws.String(obj),
201
ObjectAttributes: []types.ObjectAttributes{types.ObjectAttributesEtag},
202
})
203
var nsk *types.NoSuchKey
204
if errors.As(err, &nsk) {
205
return "", ErrNotFound
206
}
207
208
if err != nil {
209
return "", err
210
}
211
212
return *resp.ETag, nil
213
}
214
215
// SignDownload implements PresignedAccess
216
func (rs *PresignedS3Storage) SignDownload(ctx context.Context, bucket string, obj string, options *SignedURLOptions) (info *DownloadInfo, err error) {
217
resp, err := rs.client.GetObjectAttributes(ctx, &s3.GetObjectAttributesInput{
218
Bucket: &rs.Config.Bucket,
219
Key: aws.String(obj),
220
ObjectAttributes: []types.ObjectAttributes{types.ObjectAttributesObjectSize},
221
})
222
223
var nsk *types.NoSuchKey
224
if errors.As(err, &nsk) {
225
return nil, ErrNotFound
226
}
227
228
if err != nil {
229
return nil, err
230
}
231
232
req, err := rs.PresignedFactory().PresignGetObject(ctx, &s3.GetObjectInput{
233
Bucket: aws.String(rs.Config.Bucket),
234
Key: aws.String(obj),
235
})
236
if err != nil {
237
return nil, err
238
}
239
240
return &DownloadInfo{
241
Meta: ObjectMeta{
242
// TODO(cw): implement this if we need to support FWB with S3
243
},
244
Size: *resp.ObjectSize,
245
URL: req.URL,
246
}, nil
247
}
248
249
// SignUpload implements PresignedAccess
250
func (rs *PresignedS3Storage) SignUpload(ctx context.Context, bucket string, obj string, options *SignedURLOptions) (info *UploadInfo, err error) {
251
resp, err := rs.PresignedFactory().PresignPutObject(ctx, &s3.PutObjectInput{
252
Bucket: &rs.Config.Bucket,
253
Key: aws.String(obj),
254
})
255
if err != nil {
256
return nil, err
257
}
258
259
return &UploadInfo{
260
URL: resp.URL,
261
}, nil
262
}
263
264
func newDirectS3Access(client S3Client, config S3Config) *s3Storage {
265
return &s3Storage{
266
Config: config,
267
client: client,
268
}
269
}
270
271
type s3Storage struct {
272
Config S3Config
273
274
OwnerID, WorkspaceID, InstanceID string
275
276
client S3Client
277
}
278
279
// Bucket implements DirectAccess
280
func (s3st *s3Storage) Bucket(userID string) string {
281
return s3st.Config.Bucket
282
}
283
284
// BackupObject implements DirectAccess
285
func (s3st *s3Storage) BackupObject(name string) string {
286
return s3st.objectName(name)
287
}
288
289
// Download implements DirectAccess
290
func (s3st *s3Storage) Download(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (found bool, err error) {
291
return s3st.download(ctx, destination, s3st.objectName(name), mappings)
292
}
293
294
// DownloadSnapshot implements DirectAccess
295
func (s3st *s3Storage) DownloadSnapshot(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (found bool, err error) {
296
return s3st.download(ctx, destination, name, mappings)
297
}
298
299
func (s3st *s3Storage) download(ctx context.Context, destination string, obj string, mappings []archive.IDMapping) (found bool, err error) {
300
downloader := s3manager.NewDownloader(s3st.client, func(d *s3manager.Downloader) {
301
d.Concurrency = defaultCopyConcurrency
302
d.PartSize = defaultPartSize * megabytes
303
d.BufferProvider = s3manager.NewPooledBufferedWriterReadFromProvider(25 * megabytes)
304
})
305
306
s3File, err := os.CreateTemp("", "temporal-s3-file")
307
if err != nil {
308
return true, xerrors.Errorf("creating temporal file: %s", err.Error())
309
}
310
defer os.Remove(s3File.Name())
311
312
_, err = downloader.Download(ctx, s3File, &s3.GetObjectInput{
313
Bucket: aws.String(s3st.Config.Bucket),
314
Key: aws.String(obj),
315
})
316
if err != nil {
317
return false, err
318
}
319
320
_, err = s3File.Seek(0, 0)
321
if err != nil {
322
return false, err
323
}
324
325
err = archive.ExtractTarbal(ctx, s3File, destination, archive.WithUIDMapping(mappings), archive.WithGIDMapping(mappings))
326
if err != nil {
327
return true, xerrors.Errorf("tar %s: %s", destination, err.Error())
328
}
329
330
return true, nil
331
}
332
333
// EnsureExists implements DirectAccess
334
func (*s3Storage) EnsureExists(ctx context.Context) error {
335
return nil
336
}
337
338
// Init implements DirectAccess
339
func (s3st *s3Storage) Init(ctx context.Context, owner string, workspace string, instance string) error {
340
s3st.OwnerID = owner
341
s3st.WorkspaceID = workspace
342
s3st.InstanceID = instance
343
return nil
344
}
345
346
// ListObjects implements DirectAccess
347
func (s3st *s3Storage) ListObjects(ctx context.Context, prefix string) ([]string, error) {
348
if !strings.HasPrefix(prefix, s3st.OwnerID+"/") {
349
return nil, xerrors.Errorf("prefix must start with the owner ID")
350
}
351
352
var res []string
353
listParams := &s3.ListObjectsV2Input{
354
Bucket: aws.String(s3st.Config.Bucket),
355
Prefix: aws.String(prefix),
356
}
357
fetchObjects := true
358
for fetchObjects {
359
objs, err := s3st.client.ListObjectsV2(ctx, listParams)
360
361
if err != nil {
362
return nil, xerrors.Errorf("cannot list objects: %w", err)
363
}
364
365
for _, o := range objs.Contents {
366
res = append(res, *o.Key)
367
}
368
369
listParams.ContinuationToken = objs.NextContinuationToken
370
fetchObjects = *objs.IsTruncated
371
}
372
373
return res, nil
374
}
375
376
// Qualify implements DirectAccess
377
func (s3st *s3Storage) Qualify(name string) string {
378
return fmt.Sprintf("%s@%s", s3st.objectName(name), s3st.Config.Bucket)
379
}
380
381
// objectName returns the backup object name of name for the owner and
// workspace stored on s3st.
func (s3st *s3Storage) objectName(name string) string {
	owner, workspace := s3st.OwnerID, s3st.WorkspaceID
	return s3WorkspaceBackupObjectName(owner, workspace, name)
}
384
385
// s3WorkspaceBackupObjectName builds the object name of a workspace backup by
// joining ownerID, the literal "workspaces", workspaceID and name with
// filepath.Join, i.e. "<ownerID>/workspaces/<workspaceID>/<name>" on systems
// whose path separator is "/".
func s3WorkspaceBackupObjectName(ownerID, workspaceID, name string) string {
	segments := []string{ownerID, "workspaces", workspaceID, name}
	return filepath.Join(segments...)
}
388
389
// Upload implements DirectAccess
390
func (s3st *s3Storage) Upload(ctx context.Context, source string, name string, opts ...UploadOption) (bucket string, obj string, err error) {
391
options, err := GetUploadOptions(opts)
392
if err != nil {
393
err = xerrors.Errorf("cannot get options: %w", err)
394
return
395
}
396
397
if s3st.client == nil {
398
err = xerrors.Errorf("no s3 client available - did you call Init()?")
399
return
400
}
401
402
f, err := os.Open(source)
403
if err != nil {
404
err = xerrors.Errorf("cannot read backup file: %w", err)
405
}
406
defer f.Close()
407
408
var contentType *string
409
if options.ContentType != "" {
410
contentType = aws.String(options.ContentType)
411
}
412
413
bucket = s3st.Config.Bucket
414
obj = s3st.objectName(name)
415
416
s3c, ok := s3st.client.(*s3.Client)
417
if !ok {
418
err = xerrors.Errorf("Can only upload with actual S3 client")
419
}
420
421
uploader := s3manager.NewUploader(s3c, func(u *s3manager.Uploader) {
422
u.Concurrency = defaultCopyConcurrency
423
u.PartSize = defaultPartSize * megabytes
424
u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(25 * megabytes)
425
})
426
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
427
Bucket: aws.String(bucket),
428
Key: aws.String(obj),
429
430
// f implements io.ReadSeeker and hence is uploaded in parallel.
431
// cf. https://aws.github.io/aws-sdk-go-v2/docs/sdk-utilities/s3/#putobjectinput-body-field-ioreadseeker-vs-ioreader
432
Body: f,
433
434
Metadata: options.Annotations,
435
ContentType: contentType,
436
})
437
if err != nil {
438
return
439
}
440
441
return
442
}
443
444
// UploadInstance implements DirectAccess
445
func (s3st *s3Storage) UploadInstance(ctx context.Context, source string, name string, opts ...UploadOption) (bucket string, obj string, err error) {
446
if s3st.InstanceID == "" {
447
return "", "", xerrors.Errorf("instanceID is required to comput object name")
448
}
449
return s3st.Upload(ctx, source, InstanceObjectName(s3st.InstanceID, name), opts...)
450
}
451
452