Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/content-service/pkg/storage/minio.go
2501 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package storage
6
7
import (
8
"context"
9
"fmt"
10
"io"
11
"net/http"
12
"os"
13
"path/filepath"
14
"strings"
15
"time"
16
17
validation "github.com/go-ozzo/ozzo-validation"
18
minio "github.com/minio/minio-go/v7"
19
"github.com/minio/minio-go/v7/pkg/credentials"
20
"github.com/opentracing/opentracing-go"
21
"golang.org/x/xerrors"
22
23
"github.com/gitpod-io/gitpod/common-go/log"
24
"github.com/gitpod-io/gitpod/common-go/tracing"
25
config "github.com/gitpod-io/gitpod/content-service/api/config"
26
"github.com/gitpod-io/gitpod/content-service/pkg/archive"
27
)
28
29
// Compile-time assertion that *DirectMinIOStorage satisfies DirectAccess.
var _ DirectAccess = (*DirectMinIOStorage)(nil)
30
31
// Validate checks if the GCloud storage MinIOconfig is valid
32
func ValidateMinIOConfig(c *config.MinIOConfig) error {
33
return validation.ValidateStruct(c,
34
validation.Field(&c.Endpoint, validation.Required),
35
validation.Field(&c.AccessKeyID, validation.Required),
36
validation.Field(&c.SecretAccessKey, validation.Required),
37
validation.Field(&c.Region, validation.Required),
38
)
39
}
40
41
// addMinioParamsFromMounts allows for access/secret key to be read from a file
42
func addMinioParamsFromMounts(c *config.MinIOConfig) error {
43
// Allow volume mounts to be passed in for access/secret key
44
if c.AccessKeyIdFile != "" {
45
value, err := os.ReadFile(c.AccessKeyIdFile)
46
if err != nil {
47
return err
48
}
49
c.AccessKeyID = string(value)
50
}
51
if c.SecretAccessKeyFile != "" {
52
value, err := os.ReadFile(c.SecretAccessKeyFile)
53
if err != nil {
54
return err
55
}
56
c.SecretAccessKey = string(value)
57
}
58
return nil
59
}
60
61
// MinIOClient produces a new minio client based on this configuration
62
func NewMinIOClient(c *config.MinIOConfig) (*minio.Client, error) {
63
if c.ParallelUpload == 0 {
64
c.ParallelUpload = 1
65
}
66
67
err := addMinioParamsFromMounts(c)
68
if err != nil {
69
return nil, err
70
}
71
72
// now that we have all the information complete, validate if we're good to go
73
err = ValidateMinIOConfig(c)
74
if err != nil {
75
return nil, err
76
}
77
78
minioClient, err := minio.New(c.Endpoint, &minio.Options{
79
Creds: credentials.NewStaticV4(c.AccessKeyID, c.SecretAccessKey, ""),
80
Secure: c.Secure,
81
})
82
if err != nil {
83
return nil, err
84
}
85
86
return minioClient, nil
87
}
88
89
// newDirectMinIOAccess provides direct access to the remote storage system
90
func newDirectMinIOAccess(cfg config.MinIOConfig) (*DirectMinIOStorage, error) {
91
err := addMinioParamsFromMounts(&cfg)
92
if err != nil {
93
return nil, err
94
}
95
96
if err = ValidateMinIOConfig(&cfg); err != nil {
97
return nil, err
98
}
99
return &DirectMinIOStorage{MinIOConfig: cfg}, nil
100
}
101
102
// DirectMinIOStorage implements MinIO as remote storage backend
103
type DirectMinIOStorage struct {
104
Username string
105
WorkspaceName string
106
InstanceID string
107
MinIOConfig config.MinIOConfig
108
109
client *minio.Client
110
111
// ObjectAccess just exists so that we can swap out the stream access during testing
112
ObjectAccess func(ctx context.Context, btk, obj string) (io.ReadCloser, error)
113
}
114
115
// Validate checks if the GCloud storage is MinIOconfigured properly
116
func (rs *DirectMinIOStorage) Validate() error {
117
err := ValidateMinIOConfig(&rs.MinIOConfig)
118
if err != nil {
119
return err
120
}
121
122
return validation.ValidateStruct(rs,
123
validation.Field(&rs.Username, validation.Required),
124
validation.Field(&rs.WorkspaceName, validation.Required),
125
)
126
}
127
128
// Init initializes the remote storage - call this before calling anything else on the interface
129
func (rs *DirectMinIOStorage) Init(ctx context.Context, owner, workspace, instance string) (err error) {
130
rs.Username = owner
131
rs.WorkspaceName = workspace
132
rs.InstanceID = instance
133
134
err = rs.Validate()
135
if err != nil {
136
return err
137
}
138
139
cl, err := NewMinIOClient(&rs.MinIOConfig)
140
if err != nil {
141
return err
142
}
143
rs.client = cl
144
145
if rs.ObjectAccess == nil {
146
rs.ObjectAccess = rs.defaultObjectAccess
147
}
148
149
return nil
150
}
151
152
// defaultObjectAccess opens the object obj in bucket bkt using the MinIO client
// and returns it as a stream. The object is stat'ed before it is returned so
// that a missing object surfaces here rather than on the first read.
//
// Errors from the client are passed through translateMinioError. An error is
// also returned when no client is available, i.e. Init has not been called.
// The caller owns the returned reader and must close it.
func (rs *DirectMinIOStorage) defaultObjectAccess(ctx context.Context, bkt, obj string) (io.ReadCloser, error) {
	if rs.client == nil {
		return nil, xerrors.Errorf("no MinIO client available - did you call Init()?")
	}

	object, err := rs.client.GetObject(ctx, bkt, obj, minio.GetObjectOptions{})
	if err != nil {
		return nil, translateMinioError(err)
	}
	if _, err = object.Stat(); err != nil {
		// The object handle is not handed to the caller on this path, so it
		// has to be released here. The stat error is the one worth reporting.
		_ = object.Close()
		return nil, translateMinioError(err)
	}

	return object, nil
}
168
169
// EnsureExists makes sure that the remote storage location exists and can be up- or downloaded from
170
func (rs *DirectMinIOStorage) EnsureExists(ctx context.Context) (err error) {
171
return minioEnsureExists(ctx, rs.client, rs.bucketName(), rs.MinIOConfig)
172
}
173
174
// minioEnsureExists creates the bucket bucketName in the configured region
// unless it already exists.
//
// It returns an error when client is nil, when the existence check fails, or
// when the bucket can neither be created nor be found afterwards.
func minioEnsureExists(ctx context.Context, client *minio.Client, bucketName string, miniIOConfig config.MinIOConfig) (err error) {
	//nolint:staticcheck,ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "DirectEnsureExists")
	defer tracing.FinishSpan(span, &err)

	if client == nil {
		return xerrors.Errorf("no MinIO client available - did you call Init()?")
	}

	exists, err := client.BucketExists(ctx, bucketName)
	if err != nil {
		return err
	}
	if exists {
		// bucket exists already - we're fine
		return nil
	}

	log.WithField("bucketName", bucketName).Debug("Creating bucket")
	err = client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: miniIOConfig.Region})
	if err != nil {
		// Someone else may have created the bucket between our existence check
		// and the create call. If the bucket is there now, the goal is met.
		if nowExists, checkErr := client.BucketExists(ctx, bucketName); checkErr == nil && nowExists {
			return nil
		}
		return xerrors.Errorf("cannot create bucket: %w", err)
	}

	return nil
}
200
201
// download fetches object obj from bucket bkt through rs.ObjectAccess and
// extracts it as a tarball into destination using the given ID mappings.
//
// found is false when ObjectAccess yields no reader; in that case the error
// from ObjectAccess (possibly nil) is returned unchanged. Once a reader was
// obtained, found is true and err reports the extraction result.
func (rs *DirectMinIOStorage) download(ctx context.Context, destination string, bkt string, obj string, mappings []archive.IDMapping) (found bool, err error) {
	//nolint:ineffassign
	span, ctx := opentracing.StartSpanFromContext(ctx, "download")
	span.SetTag("bucket", bkt)
	span.SetTag("object", obj)
	defer tracing.FinishSpan(span, &err)

	reader, err := rs.ObjectAccess(ctx, bkt, obj)
	if reader == nil {
		return false, err
	}
	defer reader.Close()

	err = extractTarbal(ctx, destination, reader, mappings)
	return true, err
}
221
222
// Download takes the latest state from the remote storage and downloads it to a local path
223
func (rs *DirectMinIOStorage) Download(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (bool, error) {
224
return rs.download(ctx, destination, rs.bucketName(), rs.objectName(name), mappings)
225
}
226
227
// DownloadSnapshot downloads a snapshot. The snapshot name is expected to be one produced by Qualify
228
func (rs *DirectMinIOStorage) DownloadSnapshot(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (bool, error) {
229
bkt, obj, err := ParseSnapshotName(name)
230
if err != nil {
231
return false, err
232
}
233
234
return rs.download(ctx, destination, bkt, obj, mappings)
235
}
236
237
// ListObjects returns all objects found with the given prefix. Returns an empty list if the bucket does not exuist (yet).
238
func (rs *DirectMinIOStorage) ListObjects(ctx context.Context, prefix string) (objects []string, err error) {
239
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
240
defer cancel()
241
242
bucketName := rs.bucketName()
243
exists, err := rs.client.BucketExists(ctx, bucketName)
244
if err != nil {
245
return nil, xerrors.Errorf("cannot list objects: %w", err)
246
}
247
if !exists {
248
// bucket does not exist: nothing to list
249
return nil, nil
250
}
251
252
objectCh := rs.client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
253
Prefix: prefix,
254
Recursive: true,
255
})
256
for object := range objectCh {
257
if object.Err != nil {
258
return nil, xerrors.Errorf("cannot iterate list objects: %w", object.Err)
259
}
260
objects = append(objects, object.Key)
261
}
262
return objects, nil
263
}
264
265
// Qualify fully qualifies a snapshot name so that it can be downloaded using DownloadSnapshot
266
func (rs *DirectMinIOStorage) Qualify(name string) string {
267
return fmt.Sprintf("%s@%s", rs.objectName(name), rs.bucketName())
268
}
269
270
// UploadInstance takes all files from a local location and uploads it to the per-instance remote storage
271
func (rs *DirectMinIOStorage) UploadInstance(ctx context.Context, source string, name string, opts ...UploadOption) (bucket, object string, err error) {
272
if rs.InstanceID == "" {
273
return "", "", xerrors.Errorf("instanceID is required to comput object name")
274
}
275
return rs.Upload(ctx, source, InstanceObjectName(rs.InstanceID, name), opts...)
276
}
277
278
// Upload takes all files from a local location and uploads it to the remote storage
279
func (rs *DirectMinIOStorage) Upload(ctx context.Context, source string, name string, opts ...UploadOption) (bucket, obj string, err error) {
280
//nolint:ineffassign
281
span, ctx := opentracing.StartSpanFromContext(ctx, "DirectUpload")
282
defer tracing.FinishSpan(span, &err)
283
284
options, err := GetUploadOptions(opts)
285
if err != nil {
286
err = xerrors.Errorf("cannot get options: %w", err)
287
return
288
}
289
290
if rs.client == nil {
291
err = xerrors.Errorf("no minio client available - did you call Init()?")
292
return
293
}
294
295
// upload the thing
296
bucket = rs.bucketName()
297
obj = rs.objectName(name)
298
span.LogKV("bucket", bucket)
299
span.LogKV("obj", obj)
300
span.LogKV("endpoint", rs.MinIOConfig.Endpoint)
301
span.LogKV("region", rs.MinIOConfig.Region)
302
span.LogKV("key", rs.MinIOConfig.AccessKeyID)
303
_, err = rs.client.FPutObject(ctx, bucket, obj, source, minio.PutObjectOptions{
304
NumThreads: rs.MinIOConfig.ParallelUpload,
305
UserMetadata: options.Annotations,
306
ContentType: options.ContentType,
307
})
308
if err != nil {
309
return
310
}
311
312
return
313
}
314
315
// minioBucketName returns bucketName when it is non-empty. Otherwise it falls
// back to a per-owner bucket named "gitpod-user-<ownerID>".
func minioBucketName(ownerID, bucketName string) string {
	if bucketName == "" {
		return fmt.Sprintf("gitpod-user-%s", ownerID)
	}
	return bucketName
}
322
323
// minioWorkspaceBackupObjectName builds the object key of a workspace backup:
// "<ownerID>/workspaces/<workspaceID>/<name>". Empty elements are skipped, so
// an empty ownerID yields "workspaces/<workspaceID>/<name>".
func minioWorkspaceBackupObjectName(ownerID, workspaceID, name string) string {
	// Object keys are always slash-separated. filepath.Join uses the OS path
	// separator, so normalize the result to forward slashes.
	return filepath.ToSlash(filepath.Join(ownerID, "workspaces", workspaceID, name))
}
326
327
// Bucket provides the bucket name for a particular user
328
func (rs *DirectMinIOStorage) Bucket(ownerID string) string {
329
return minioBucketName(ownerID, rs.MinIOConfig.BucketName)
330
}
331
332
// BackupObject returns a backup's object name that a direct downloader would download
333
func (rs *DirectMinIOStorage) BackupObject(name string) string {
334
return rs.objectName(name)
335
}
336
337
// bucketName returns the bucket used for this storage's user: the configured
// bucket name if one is set, otherwise the per-owner bucket for rs.Username.
func (rs *DirectMinIOStorage) bucketName() string {
	owner, configured := rs.Username, rs.MinIOConfig.BucketName
	return minioBucketName(owner, configured)
}
340
341
// objectName returns the object key for the backup called name in this
// storage's workspace. The key is prefixed with the username only when a
// bucket name is configured; without one, the username is left out of the key.
func (rs *DirectMinIOStorage) objectName(name string) string {
	if rs.MinIOConfig.BucketName == "" {
		return minioWorkspaceBackupObjectName("", rs.WorkspaceName, name)
	}
	return minioWorkspaceBackupObjectName(rs.Username, rs.WorkspaceName, name)
}
348
349
// newPresignedMinIOAccess creates a MinIO client from cfg and wraps it in a
// presignedMinIOStorage. The stored configuration is the local copy of cfg as
// left by NewMinIOClient.
func newPresignedMinIOAccess(cfg config.MinIOConfig) (*presignedMinIOStorage, error) {
	client, err := NewMinIOClient(&cfg)
	if err != nil {
		return nil, err
	}

	storage := &presignedMinIOStorage{
		client:      client,
		MinIOConfig: cfg,
	}
	return storage, nil
}
356
357
type presignedMinIOStorage struct {
358
client *minio.Client
359
MinIOConfig config.MinIOConfig
360
}
361
362
// EnsureExists makes sure that the remote storage location exists and can be up- or downloaded from
363
func (s *presignedMinIOStorage) EnsureExists(ctx context.Context, bucket string) (err error) {
364
return minioEnsureExists(ctx, s.client, bucket, s.MinIOConfig)
365
}
366
367
func (s *presignedMinIOStorage) DiskUsage(ctx context.Context, bucket string, prefix string) (size int64, err error) {
368
//nolint:ineffassign
369
span, ctx := opentracing.StartSpanFromContext(ctx, "minio.DiskUsage")
370
defer tracing.FinishSpan(span, &err)
371
372
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
373
defer cancel()
374
375
objectCh := s.client.ListObjects(ctx, bucket, minio.ListObjectsOptions{
376
Prefix: prefix,
377
Recursive: true,
378
})
379
var total int64
380
for object := range objectCh {
381
if object.Err != nil {
382
return 0, object.Err
383
}
384
total += object.Size
385
}
386
return total, nil
387
}
388
389
func (s *presignedMinIOStorage) SignDownload(ctx context.Context, bucket, object string, options *SignedURLOptions) (info *DownloadInfo, err error) {
390
//nolint:ineffassign
391
span, ctx := opentracing.StartSpanFromContext(ctx, "minio.SignDownload")
392
defer func() {
393
if err == ErrNotFound {
394
span.LogKV("found", false)
395
tracing.FinishSpan(span, nil)
396
return
397
}
398
399
tracing.FinishSpan(span, &err)
400
}()
401
402
obj, err := s.client.GetObject(ctx, bucket, object, minio.GetObjectOptions{})
403
if err != nil {
404
return nil, translateMinioError(err)
405
}
406
stat, err := obj.Stat()
407
if err != nil {
408
return nil, translateMinioError(err)
409
}
410
url, err := s.client.PresignedGetObject(ctx, bucket, object, 30*time.Minute, nil)
411
if err != nil {
412
return nil, translateMinioError(err)
413
}
414
415
return &DownloadInfo{
416
Meta: ObjectMeta{
417
ContentType: stat.ContentType,
418
OCIMediaType: stat.Metadata.Get(annotationToAmzMetaHeader(ObjectAnnotationOCIContentType)),
419
Digest: stat.Metadata.Get(annotationToAmzMetaHeader(ObjectAnnotationDigest)),
420
UncompressedDigest: stat.Metadata.Get(annotationToAmzMetaHeader(ObjectAnnotationUncompressedDigest)),
421
},
422
Size: stat.Size,
423
URL: url.String(),
424
}, nil
425
}
426
427
// SignUpload describes an object for upload
428
func (s *presignedMinIOStorage) SignUpload(ctx context.Context, bucket, obj string, options *SignedURLOptions) (info *UploadInfo, err error) {
429
//nolint:ineffassign
430
span, ctx := opentracing.StartSpanFromContext(ctx, "minio.SignUpload")
431
defer func() {
432
if err == ErrNotFound {
433
span.LogKV("found", false)
434
tracing.FinishSpan(span, nil)
435
return
436
}
437
438
tracing.FinishSpan(span, &err)
439
}()
440
441
url, err := s.client.PresignedPutObject(ctx, bucket, obj, 30*time.Minute)
442
if err != nil {
443
return nil, translateMinioError(err)
444
}
445
return &UploadInfo{URL: url.String()}, nil
446
}
447
448
func (s *presignedMinIOStorage) DeleteObject(ctx context.Context, bucket string, query *DeleteObjectQuery) (err error) {
449
//nolint:ineffassign
450
span, ctx := opentracing.StartSpanFromContext(ctx, "minio.DeleteObject")
451
defer tracing.FinishSpan(span, &err)
452
453
if query.Name != "" {
454
err = s.client.RemoveObject(ctx, bucket, query.Name, minio.RemoveObjectOptions{})
455
if err != nil {
456
log.WithField("bucket", bucket).WithField("object", query.Name).Error(err)
457
return translateMinioError(err)
458
}
459
return nil
460
}
461
if query.Prefix != "" {
462
objectsCh := make(chan minio.ObjectInfo)
463
go func() {
464
defer close(objectsCh)
465
for object := range s.client.ListObjects(ctx, bucket, minio.ListObjectsOptions{
466
Prefix: query.Prefix,
467
Recursive: true,
468
}) {
469
objectsCh <- object
470
}
471
}()
472
for removeErr := range s.client.RemoveObjects(ctx, bucket, objectsCh, minio.RemoveObjectsOptions{}) {
473
err = removeErr.Err
474
log.WithField("bucket", bucket).WithField("object", removeErr.ObjectName).Error(err)
475
}
476
}
477
return translateMinioError(err)
478
}
479
480
// DeleteBucket deletes a bucket
481
func (s *presignedMinIOStorage) DeleteBucket(ctx context.Context, userID, bucket string) (err error) {
482
span, ctx := opentracing.StartSpanFromContext(ctx, "minio.DeleteBucket")
483
defer tracing.FinishSpan(span, &err)
484
485
err = s.DeleteObject(ctx, bucket, &DeleteObjectQuery{Prefix: "/"})
486
if err != nil {
487
return translateMinioError(err)
488
}
489
490
err = s.client.RemoveBucket(ctx, bucket)
491
if err != nil {
492
return translateMinioError(err)
493
}
494
return nil
495
}
496
497
// ObjectHash gets a hash value of an object
498
func (s *presignedMinIOStorage) ObjectHash(ctx context.Context, bucket string, obj string) (hash string, err error) {
499
span, ctx := opentracing.StartSpanFromContext(ctx, "minio.ObjectHash")
500
defer tracing.FinishSpan(span, &err)
501
502
info, err := s.client.StatObject(ctx, bucket, obj, minio.StatObjectOptions{})
503
if err != nil {
504
return "", translateMinioError(err)
505
}
506
return info.ETag, nil
507
}
508
509
// ObjectExists reports whether obj exists in bucket by stat'ing it. A stat
// error that translates to ErrNotFound yields (false, nil); any other error is
// returned in its translated form.
func (s *presignedMinIOStorage) ObjectExists(ctx context.Context, bucket, obj string) (exists bool, err error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "minio.ObjectExists")
	defer tracing.FinishSpan(span, &err)

	_, err = s.client.StatObject(ctx, bucket, obj, minio.StatObjectOptions{})
	if err == nil {
		return true, nil
	}

	if translated := translateMinioError(err); translated != ErrNotFound {
		return false, translated
	}
	// A missing object is an answer, not a failure.
	return false, nil
}
523
524
// annotationToAmzMetaHeader maps an annotation name to its
// "X-Amz-Meta-<annotation>" header name in canonical HTTP header form.
func annotationToAmzMetaHeader(annotation string) string {
	header := fmt.Sprintf("X-Amz-Meta-%s", annotation)
	return http.CanonicalHeaderKey(header)
}
527
528
// Bucket provides the bucket name for a particular user
529
func (s *presignedMinIOStorage) Bucket(ownerID string) string {
530
return minioBucketName(ownerID, s.MinIOConfig.BucketName)
531
}
532
533
// BlobObject returns a blob's object name
534
func (s *presignedMinIOStorage) BlobObject(userID, name string) (string, error) {
535
return blobObjectName(name)
536
}
537
538
// BackupObject returns a backup's object name that a direct downloader would download
539
func (s *presignedMinIOStorage) BackupObject(ownerID string, workspaceID, name string) string {
540
var username string
541
if s.MinIOConfig.BucketName != "" {
542
username = ownerID
543
}
544
return minioWorkspaceBackupObjectName(username, workspaceID, name)
545
}
546
547
// InstanceObject returns a instance's object name that a direct downloader would download
548
func (s *presignedMinIOStorage) InstanceObject(ownerID string, workspaceID string, instanceID string, name string) string {
549
return s.BackupObject(ownerID, workspaceID, InstanceObjectName(instanceID, name))
550
}
551
552
func translateMinioError(err error) error {
553
if err == nil {
554
return nil
555
}
556
557
aerr, ok := err.(*minio.ErrorResponse)
558
if ok {
559
if aerr.StatusCode == http.StatusNotFound || aerr.Code == "NoSuchKey" || aerr.Code == "NoSuchBucket" {
560
return ErrNotFound
561
}
562
}
563
564
if strings.Contains(err.Error(), "bucket does not exist") {
565
return ErrNotFound
566
}
567
if strings.Contains(err.Error(), "key does not exist") {
568
return ErrNotFound
569
}
570
571
return err
572
}
573
574