// Path: blob/main/components/content-service/pkg/storage/minio.go
// 2501 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package storage56import (7"context"8"fmt"9"io"10"net/http"11"os"12"path/filepath"13"strings"14"time"1516validation "github.com/go-ozzo/ozzo-validation"17minio "github.com/minio/minio-go/v7"18"github.com/minio/minio-go/v7/pkg/credentials"19"github.com/opentracing/opentracing-go"20"golang.org/x/xerrors"2122"github.com/gitpod-io/gitpod/common-go/log"23"github.com/gitpod-io/gitpod/common-go/tracing"24config "github.com/gitpod-io/gitpod/content-service/api/config"25"github.com/gitpod-io/gitpod/content-service/pkg/archive"26)2728var _ DirectAccess = &DirectMinIOStorage{}2930// Validate checks if the GCloud storage MinIOconfig is valid31func ValidateMinIOConfig(c *config.MinIOConfig) error {32return validation.ValidateStruct(c,33validation.Field(&c.Endpoint, validation.Required),34validation.Field(&c.AccessKeyID, validation.Required),35validation.Field(&c.SecretAccessKey, validation.Required),36validation.Field(&c.Region, validation.Required),37)38}3940// addMinioParamsFromMounts allows for access/secret key to be read from a file41func addMinioParamsFromMounts(c *config.MinIOConfig) error {42// Allow volume mounts to be passed in for access/secret key43if c.AccessKeyIdFile != "" {44value, err := os.ReadFile(c.AccessKeyIdFile)45if err != nil {46return err47}48c.AccessKeyID = string(value)49}50if c.SecretAccessKeyFile != "" {51value, err := os.ReadFile(c.SecretAccessKeyFile)52if err != nil {53return err54}55c.SecretAccessKey = string(value)56}57return nil58}5960// MinIOClient produces a new minio client based on this configuration61func NewMinIOClient(c *config.MinIOConfig) (*minio.Client, error) {62if c.ParallelUpload == 0 {63c.ParallelUpload = 164}6566err := addMinioParamsFromMounts(c)67if err != nil {68return nil, err69}7071// now that we have all the information complete, validate 
if we're good to go72err = ValidateMinIOConfig(c)73if err != nil {74return nil, err75}7677minioClient, err := minio.New(c.Endpoint, &minio.Options{78Creds: credentials.NewStaticV4(c.AccessKeyID, c.SecretAccessKey, ""),79Secure: c.Secure,80})81if err != nil {82return nil, err83}8485return minioClient, nil86}8788// newDirectMinIOAccess provides direct access to the remote storage system89func newDirectMinIOAccess(cfg config.MinIOConfig) (*DirectMinIOStorage, error) {90err := addMinioParamsFromMounts(&cfg)91if err != nil {92return nil, err93}9495if err = ValidateMinIOConfig(&cfg); err != nil {96return nil, err97}98return &DirectMinIOStorage{MinIOConfig: cfg}, nil99}100101// DirectMinIOStorage implements MinIO as remote storage backend102type DirectMinIOStorage struct {103Username string104WorkspaceName string105InstanceID string106MinIOConfig config.MinIOConfig107108client *minio.Client109110// ObjectAccess just exists so that we can swap out the stream access during testing111ObjectAccess func(ctx context.Context, btk, obj string) (io.ReadCloser, error)112}113114// Validate checks if the GCloud storage is MinIOconfigured properly115func (rs *DirectMinIOStorage) Validate() error {116err := ValidateMinIOConfig(&rs.MinIOConfig)117if err != nil {118return err119}120121return validation.ValidateStruct(rs,122validation.Field(&rs.Username, validation.Required),123validation.Field(&rs.WorkspaceName, validation.Required),124)125}126127// Init initializes the remote storage - call this before calling anything else on the interface128func (rs *DirectMinIOStorage) Init(ctx context.Context, owner, workspace, instance string) (err error) {129rs.Username = owner130rs.WorkspaceName = workspace131rs.InstanceID = instance132133err = rs.Validate()134if err != nil {135return err136}137138cl, err := NewMinIOClient(&rs.MinIOConfig)139if err != nil {140return err141}142rs.client = cl143144if rs.ObjectAccess == nil {145rs.ObjectAccess = rs.defaultObjectAccess146}147148return 
nil149}150151func (rs *DirectMinIOStorage) defaultObjectAccess(ctx context.Context, bkt, obj string) (io.ReadCloser, error) {152if rs.client == nil {153return nil, xerrors.Errorf("no MinIO client available - did you call Init()?")154}155156object, err := rs.client.GetObject(ctx, bkt, obj, minio.GetObjectOptions{})157if err != nil {158return nil, translateMinioError(err)159}160_, err = object.Stat()161if err != nil {162return nil, translateMinioError(err)163}164165return object, nil166}167168// EnsureExists makes sure that the remote storage location exists and can be up- or downloaded from169func (rs *DirectMinIOStorage) EnsureExists(ctx context.Context) (err error) {170return minioEnsureExists(ctx, rs.client, rs.bucketName(), rs.MinIOConfig)171}172173func minioEnsureExists(ctx context.Context, client *minio.Client, bucketName string, miniIOConfig config.MinIOConfig) (err error) {174//nolint:staticcheck,ineffassign175span, ctx := opentracing.StartSpanFromContext(ctx, "DirectEnsureExists")176defer tracing.FinishSpan(span, &err)177178if client == nil {179return xerrors.Errorf("no MinIO client available - did you call Init()?")180}181182exists, err := client.BucketExists(ctx, bucketName)183if err != nil {184return err185}186if exists {187// bucket exists already - we're fine188return nil189}190191log.WithField("bucketName", bucketName).Debug("Creating bucket")192err = client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: miniIOConfig.Region})193if err != nil {194return xerrors.Errorf("cannot create bucket: %w", err)195}196197return nil198}199200func (rs *DirectMinIOStorage) download(ctx context.Context, destination string, bkt string, obj string, mappings []archive.IDMapping) (found bool, err error) {201//nolint:ineffassign202span, ctx := opentracing.StartSpanFromContext(ctx, "download")203span.SetTag("bucket", bkt)204span.SetTag("object", obj)205defer tracing.FinishSpan(span, &err)206207rc, err := rs.ObjectAccess(ctx, bkt, obj)208if rc == nil {209return 
false, err210}211defer rc.Close()212213err = extractTarbal(ctx, destination, rc, mappings)214if err != nil {215return true, err216}217218return true, nil219}220221// Download takes the latest state from the remote storage and downloads it to a local path222func (rs *DirectMinIOStorage) Download(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (bool, error) {223return rs.download(ctx, destination, rs.bucketName(), rs.objectName(name), mappings)224}225226// DownloadSnapshot downloads a snapshot. The snapshot name is expected to be one produced by Qualify227func (rs *DirectMinIOStorage) DownloadSnapshot(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (bool, error) {228bkt, obj, err := ParseSnapshotName(name)229if err != nil {230return false, err231}232233return rs.download(ctx, destination, bkt, obj, mappings)234}235236// ListObjects returns all objects found with the given prefix. Returns an empty list if the bucket does not exuist (yet).237func (rs *DirectMinIOStorage) ListObjects(ctx context.Context, prefix string) (objects []string, err error) {238ctx, cancel := context.WithTimeout(ctx, 10*time.Second)239defer cancel()240241bucketName := rs.bucketName()242exists, err := rs.client.BucketExists(ctx, bucketName)243if err != nil {244return nil, xerrors.Errorf("cannot list objects: %w", err)245}246if !exists {247// bucket does not exist: nothing to list248return nil, nil249}250251objectCh := rs.client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{252Prefix: prefix,253Recursive: true,254})255for object := range objectCh {256if object.Err != nil {257return nil, xerrors.Errorf("cannot iterate list objects: %w", object.Err)258}259objects = append(objects, object.Key)260}261return objects, nil262}263264// Qualify fully qualifies a snapshot name so that it can be downloaded using DownloadSnapshot265func (rs *DirectMinIOStorage) Qualify(name string) string {266return fmt.Sprintf("%s@%s", 
rs.objectName(name), rs.bucketName())267}268269// UploadInstance takes all files from a local location and uploads it to the per-instance remote storage270func (rs *DirectMinIOStorage) UploadInstance(ctx context.Context, source string, name string, opts ...UploadOption) (bucket, object string, err error) {271if rs.InstanceID == "" {272return "", "", xerrors.Errorf("instanceID is required to comput object name")273}274return rs.Upload(ctx, source, InstanceObjectName(rs.InstanceID, name), opts...)275}276277// Upload takes all files from a local location and uploads it to the remote storage278func (rs *DirectMinIOStorage) Upload(ctx context.Context, source string, name string, opts ...UploadOption) (bucket, obj string, err error) {279//nolint:ineffassign280span, ctx := opentracing.StartSpanFromContext(ctx, "DirectUpload")281defer tracing.FinishSpan(span, &err)282283options, err := GetUploadOptions(opts)284if err != nil {285err = xerrors.Errorf("cannot get options: %w", err)286return287}288289if rs.client == nil {290err = xerrors.Errorf("no minio client available - did you call Init()?")291return292}293294// upload the thing295bucket = rs.bucketName()296obj = rs.objectName(name)297span.LogKV("bucket", bucket)298span.LogKV("obj", obj)299span.LogKV("endpoint", rs.MinIOConfig.Endpoint)300span.LogKV("region", rs.MinIOConfig.Region)301span.LogKV("key", rs.MinIOConfig.AccessKeyID)302_, err = rs.client.FPutObject(ctx, bucket, obj, source, minio.PutObjectOptions{303NumThreads: rs.MinIOConfig.ParallelUpload,304UserMetadata: options.Annotations,305ContentType: options.ContentType,306})307if err != nil {308return309}310311return312}313314func minioBucketName(ownerID, bucketName string) string {315if bucketName != "" {316return bucketName317}318319return fmt.Sprintf("gitpod-user-%s", ownerID)320}321322func minioWorkspaceBackupObjectName(ownerID, workspaceID, name string) string {323return filepath.Join(ownerID, "workspaces", workspaceID, name)324}325326// Bucket provides the 
bucket name for a particular user327func (rs *DirectMinIOStorage) Bucket(ownerID string) string {328return minioBucketName(ownerID, rs.MinIOConfig.BucketName)329}330331// BackupObject returns a backup's object name that a direct downloader would download332func (rs *DirectMinIOStorage) BackupObject(name string) string {333return rs.objectName(name)334}335336func (rs *DirectMinIOStorage) bucketName() string {337return minioBucketName(rs.Username, rs.MinIOConfig.BucketName)338}339340func (rs *DirectMinIOStorage) objectName(name string) string {341var username string342if rs.MinIOConfig.BucketName != "" {343username = rs.Username344}345return minioWorkspaceBackupObjectName(username, rs.WorkspaceName, name)346}347348func newPresignedMinIOAccess(cfg config.MinIOConfig) (*presignedMinIOStorage, error) {349cl, err := NewMinIOClient(&cfg)350if err != nil {351return nil, err352}353return &presignedMinIOStorage{client: cl, MinIOConfig: cfg}, nil354}355356type presignedMinIOStorage struct {357client *minio.Client358MinIOConfig config.MinIOConfig359}360361// EnsureExists makes sure that the remote storage location exists and can be up- or downloaded from362func (s *presignedMinIOStorage) EnsureExists(ctx context.Context, bucket string) (err error) {363return minioEnsureExists(ctx, s.client, bucket, s.MinIOConfig)364}365366func (s *presignedMinIOStorage) DiskUsage(ctx context.Context, bucket string, prefix string) (size int64, err error) {367//nolint:ineffassign368span, ctx := opentracing.StartSpanFromContext(ctx, "minio.DiskUsage")369defer tracing.FinishSpan(span, &err)370371ctx, cancel := context.WithTimeout(ctx, 10*time.Second)372defer cancel()373374objectCh := s.client.ListObjects(ctx, bucket, minio.ListObjectsOptions{375Prefix: prefix,376Recursive: true,377})378var total int64379for object := range objectCh {380if object.Err != nil {381return 0, object.Err382}383total += object.Size384}385return total, nil386}387388func (s *presignedMinIOStorage) SignDownload(ctx 
context.Context, bucket, object string, options *SignedURLOptions) (info *DownloadInfo, err error) {389//nolint:ineffassign390span, ctx := opentracing.StartSpanFromContext(ctx, "minio.SignDownload")391defer func() {392if err == ErrNotFound {393span.LogKV("found", false)394tracing.FinishSpan(span, nil)395return396}397398tracing.FinishSpan(span, &err)399}()400401obj, err := s.client.GetObject(ctx, bucket, object, minio.GetObjectOptions{})402if err != nil {403return nil, translateMinioError(err)404}405stat, err := obj.Stat()406if err != nil {407return nil, translateMinioError(err)408}409url, err := s.client.PresignedGetObject(ctx, bucket, object, 30*time.Minute, nil)410if err != nil {411return nil, translateMinioError(err)412}413414return &DownloadInfo{415Meta: ObjectMeta{416ContentType: stat.ContentType,417OCIMediaType: stat.Metadata.Get(annotationToAmzMetaHeader(ObjectAnnotationOCIContentType)),418Digest: stat.Metadata.Get(annotationToAmzMetaHeader(ObjectAnnotationDigest)),419UncompressedDigest: stat.Metadata.Get(annotationToAmzMetaHeader(ObjectAnnotationUncompressedDigest)),420},421Size: stat.Size,422URL: url.String(),423}, nil424}425426// SignUpload describes an object for upload427func (s *presignedMinIOStorage) SignUpload(ctx context.Context, bucket, obj string, options *SignedURLOptions) (info *UploadInfo, err error) {428//nolint:ineffassign429span, ctx := opentracing.StartSpanFromContext(ctx, "minio.SignUpload")430defer func() {431if err == ErrNotFound {432span.LogKV("found", false)433tracing.FinishSpan(span, nil)434return435}436437tracing.FinishSpan(span, &err)438}()439440url, err := s.client.PresignedPutObject(ctx, bucket, obj, 30*time.Minute)441if err != nil {442return nil, translateMinioError(err)443}444return &UploadInfo{URL: url.String()}, nil445}446447func (s *presignedMinIOStorage) DeleteObject(ctx context.Context, bucket string, query *DeleteObjectQuery) (err error) {448//nolint:ineffassign449span, ctx := opentracing.StartSpanFromContext(ctx, 
"minio.DeleteObject")450defer tracing.FinishSpan(span, &err)451452if query.Name != "" {453err = s.client.RemoveObject(ctx, bucket, query.Name, minio.RemoveObjectOptions{})454if err != nil {455log.WithField("bucket", bucket).WithField("object", query.Name).Error(err)456return translateMinioError(err)457}458return nil459}460if query.Prefix != "" {461objectsCh := make(chan minio.ObjectInfo)462go func() {463defer close(objectsCh)464for object := range s.client.ListObjects(ctx, bucket, minio.ListObjectsOptions{465Prefix: query.Prefix,466Recursive: true,467}) {468objectsCh <- object469}470}()471for removeErr := range s.client.RemoveObjects(ctx, bucket, objectsCh, minio.RemoveObjectsOptions{}) {472err = removeErr.Err473log.WithField("bucket", bucket).WithField("object", removeErr.ObjectName).Error(err)474}475}476return translateMinioError(err)477}478479// DeleteBucket deletes a bucket480func (s *presignedMinIOStorage) DeleteBucket(ctx context.Context, userID, bucket string) (err error) {481span, ctx := opentracing.StartSpanFromContext(ctx, "minio.DeleteBucket")482defer tracing.FinishSpan(span, &err)483484err = s.DeleteObject(ctx, bucket, &DeleteObjectQuery{Prefix: "/"})485if err != nil {486return translateMinioError(err)487}488489err = s.client.RemoveBucket(ctx, bucket)490if err != nil {491return translateMinioError(err)492}493return nil494}495496// ObjectHash gets a hash value of an object497func (s *presignedMinIOStorage) ObjectHash(ctx context.Context, bucket string, obj string) (hash string, err error) {498span, ctx := opentracing.StartSpanFromContext(ctx, "minio.ObjectHash")499defer tracing.FinishSpan(span, &err)500501info, err := s.client.StatObject(ctx, bucket, obj, minio.StatObjectOptions{})502if err != nil {503return "", translateMinioError(err)504}505return info.ETag, nil506}507508func (s *presignedMinIOStorage) ObjectExists(ctx context.Context, bucket, obj string) (exists bool, err error) {509span, ctx := opentracing.StartSpanFromContext(ctx, 
"minio.ObjectExists")510defer tracing.FinishSpan(span, &err)511512_, err = s.client.StatObject(ctx, bucket, obj, minio.StatObjectOptions{})513if err != nil {514e := translateMinioError(err)515if e == ErrNotFound {516return false, nil517}518return false, e519}520return true, nil521}522523func annotationToAmzMetaHeader(annotation string) string {524return http.CanonicalHeaderKey(fmt.Sprintf("X-Amz-Meta-%s", annotation))525}526527// Bucket provides the bucket name for a particular user528func (s *presignedMinIOStorage) Bucket(ownerID string) string {529return minioBucketName(ownerID, s.MinIOConfig.BucketName)530}531532// BlobObject returns a blob's object name533func (s *presignedMinIOStorage) BlobObject(userID, name string) (string, error) {534return blobObjectName(name)535}536537// BackupObject returns a backup's object name that a direct downloader would download538func (s *presignedMinIOStorage) BackupObject(ownerID string, workspaceID, name string) string {539var username string540if s.MinIOConfig.BucketName != "" {541username = ownerID542}543return minioWorkspaceBackupObjectName(username, workspaceID, name)544}545546// InstanceObject returns a instance's object name that a direct downloader would download547func (s *presignedMinIOStorage) InstanceObject(ownerID string, workspaceID string, instanceID string, name string) string {548return s.BackupObject(ownerID, workspaceID, InstanceObjectName(instanceID, name))549}550551func translateMinioError(err error) error {552if err == nil {553return nil554}555556aerr, ok := err.(*minio.ErrorResponse)557if ok {558if aerr.StatusCode == http.StatusNotFound || aerr.Code == "NoSuchKey" || aerr.Code == "NoSuchBucket" {559return ErrNotFound560}561}562563if strings.Contains(err.Error(), "bucket does not exist") {564return ErrNotFound565}566if strings.Contains(err.Error(), "key does not exist") {567return ErrNotFound568}569570return err571}572573574