Path: blob/main/components/content-service/pkg/storage/s3.go
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package storage

import (
    "context"
    "errors"
    "fmt"
    "os"
    "path/filepath"
    "strings"

    "github.com/gitpod-io/gitpod/common-go/log"
    "github.com/gitpod-io/gitpod/content-service/pkg/archive"
    "golang.org/x/xerrors"

    "github.com/aws/aws-sdk-go-v2/aws"
    v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
    s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

const (
    defaultCopyConcurrency = 10
    defaultPartSize        = 50 // MiB
    megabytes              = 1024 * 1024
)

var _ DirectAccess = &s3Storage{}
var _ PresignedAccess = &PresignedS3Storage{}

type S3Config struct {
    Bucket string
}

type S3Client interface {
    ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
    DeleteObjects(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error)
    GetObjectAttributes(ctx context.Context, params *s3.GetObjectAttributesInput, optFns ...func(*s3.Options)) (*s3.GetObjectAttributesOutput, error)
    GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
}

type PresignedS3Client interface {
    PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
    PresignPutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
}

func NewPresignedS3Access(client S3Client, config S3Config) *PresignedS3Storage {
    return &PresignedS3Storage{
        Config: config,
        client: client,
        PresignedFactory: func() PresignedS3Client {
            if s3c, ok := client.(*s3.Client); ok {
                return s3.NewPresignClient(s3c)
            }
            return nil
        },
    }
}

type PresignedS3Storage struct {
    Config S3Config

    client S3Client

    // PresignedFactory exists for testing only. DO NOT USE in production.
    PresignedFactory func() PresignedS3Client
}

// Bucket implements PresignedAccess
func (rs *PresignedS3Storage) Bucket(userID string) string {
    return rs.Config.Bucket
}

// BlobObject implements PresignedAccess
func (rs *PresignedS3Storage) BlobObject(userID, name string) (string, error) {
    blb, err := blobObjectName(name)
    if err != nil {
        return "", err
    }

    return filepath.Join(userID, blb), nil
}

// BackupObject implements PresignedAccess
func (rs *PresignedS3Storage) BackupObject(ownerID string, workspaceID string, name string) string {
    return s3WorkspaceBackupObjectName(ownerID, workspaceID, name)
}

// DeleteBucket implements PresignedAccess
func (rs *PresignedS3Storage) DeleteBucket(ctx context.Context, userID, bucket string) error {
    if bucket != rs.Config.Bucket {
        log.WithField("requestedBucket", bucket).WithField("configuredBucket", rs.Config.Bucket).Error("can only delete from configured bucket")
        return xerrors.Errorf("can only delete from configured bucket; this looks like a bug in Gitpod")
    }

    return rs.DeleteObject(ctx, rs.Config.Bucket, &DeleteObjectQuery{Prefix: userID + "/"})
}

// DeleteObject implements PresignedAccess
func (rs *PresignedS3Storage) DeleteObject(ctx context.Context, bucket string, query *DeleteObjectQuery) error {
    var objects []types.ObjectIdentifier

    switch {
    case query.Name != "":
        objects = []types.ObjectIdentifier{{Key: &query.Name}}

    case query.Prefix != "":
        resp, err := rs.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
            Bucket: aws.String(rs.Config.Bucket),
            Prefix: aws.String(query.Prefix),
        })
        if err != nil {
            return err
        }
        for _, e := range resp.Contents {
            objects = append(objects, types.ObjectIdentifier{
                Key: e.Key,
            })
        }
    }

    if len(objects) == 0 {
        return nil
    }

    resp, err := rs.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
        Bucket: &rs.Config.Bucket,
        Delete: &types.Delete{
            Objects: objects,
            Quiet:   aws.Bool(true),
        },
    })
    if err != nil {
        return err
    }
    if len(resp.Errors) > 0 {
        var errs []string
        for _, e := range resp.Errors {
            errs = append(errs, fmt.Sprintf("%s: %s", aws.ToString(e.Key), aws.ToString(e.Message)))
        }
        return xerrors.Errorf("cannot delete objects: %s", strings.Join(errs, ", "))
    }

    return nil
}

// DiskUsage implements PresignedAccess
func (rs *PresignedS3Storage) DiskUsage(ctx context.Context, bucket string, prefix string) (size int64, err error) {
    resp, err := rs.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
        Bucket: &rs.Config.Bucket,
        Prefix: aws.String(prefix),
    })
    if err != nil {
        return 0, err
    }

    for _, r := range resp.Contents {
        size += *r.Size
    }
    return
}

// EnsureExists implements PresignedAccess
func (rs *PresignedS3Storage) EnsureExists(ctx context.Context, bucket string) error {
    return nil
}

// InstanceObject implements PresignedAccess
func (rs *PresignedS3Storage) InstanceObject(ownerID string, workspaceID string, instanceID string, name string) string {
    return rs.BackupObject(ownerID, workspaceID, InstanceObjectName(instanceID, name))
}

// ObjectExists implements PresignedAccess
func (rs *PresignedS3Storage) ObjectExists(ctx context.Context, bucket string, path string) (bool, error) {
    _, err := rs.client.GetObjectAttributes(ctx, &s3.GetObjectAttributesInput{
        Bucket:           &rs.Config.Bucket,
        Key:              aws.String(path),
        ObjectAttributes: []types.ObjectAttributes{types.ObjectAttributesEtag},
    })

    var nsk *types.NoSuchKey
    if errors.As(err, &nsk) {
        return false, nil
    }

    if err != nil {
        return false, err
    }
    return true, nil
}

// ObjectHash implements PresignedAccess
func (rs *PresignedS3Storage) ObjectHash(ctx context.Context, bucket string, obj string) (string, error) {
    resp, err := rs.client.GetObjectAttributes(ctx, &s3.GetObjectAttributesInput{
        Bucket:           &rs.Config.Bucket,
        Key:              aws.String(obj),
        ObjectAttributes: []types.ObjectAttributes{types.ObjectAttributesEtag},
    })
    var nsk *types.NoSuchKey
    if errors.As(err, &nsk) {
        return "", ErrNotFound
    }

    if err != nil {
        return "", err
    }

    return *resp.ETag, nil
}

// SignDownload implements PresignedAccess
func (rs *PresignedS3Storage) SignDownload(ctx context.Context, bucket string, obj string, options *SignedURLOptions) (info *DownloadInfo, err error) {
    resp, err := rs.client.GetObjectAttributes(ctx, &s3.GetObjectAttributesInput{
        Bucket:           &rs.Config.Bucket,
        Key:              aws.String(obj),
        ObjectAttributes: []types.ObjectAttributes{types.ObjectAttributesObjectSize},
    })

    var nsk *types.NoSuchKey
    if errors.As(err, &nsk) {
        return nil, ErrNotFound
    }

    if err != nil {
        return nil, err
    }

    req, err := rs.PresignedFactory().PresignGetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(rs.Config.Bucket),
        Key:    aws.String(obj),
    })
    if err != nil {
        return nil, err
    }

    return &DownloadInfo{
        Meta: ObjectMeta{
            // TODO(cw): implement this if we need to support FWB with S3
        },
        Size: *resp.ObjectSize,
        URL:  req.URL,
    }, nil
}

// SignUpload implements PresignedAccess
func (rs *PresignedS3Storage) SignUpload(ctx context.Context, bucket string, obj string, options *SignedURLOptions) (info *UploadInfo, err error) {
    resp, err := rs.PresignedFactory().PresignPutObject(ctx, &s3.PutObjectInput{
        Bucket: &rs.Config.Bucket,
        Key:    aws.String(obj),
    })
    if err != nil {
        return nil, err
    }

    return &UploadInfo{
        URL: resp.URL,
    }, nil
}

func newDirectS3Access(client S3Client, config S3Config) *s3Storage {
    return &s3Storage{
        Config: config,
        client: client,
    }
}

type s3Storage struct {
    Config S3Config

    OwnerID, WorkspaceID, InstanceID string

    client S3Client
}

// Bucket implements DirectAccess
func (s3st *s3Storage) Bucket(userID string) string {
    return s3st.Config.Bucket
}

// BackupObject implements DirectAccess
func (s3st *s3Storage) BackupObject(name string) string {
    return s3st.objectName(name)
}

// Download implements DirectAccess
func (s3st *s3Storage) Download(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (found bool, err error) {
    return s3st.download(ctx, destination, s3st.objectName(name), mappings)
}

// DownloadSnapshot implements DirectAccess
func (s3st *s3Storage) DownloadSnapshot(ctx context.Context, destination string, name string, mappings []archive.IDMapping) (found bool, err error) {
    return s3st.download(ctx, destination, name, mappings)
}

func (s3st *s3Storage) download(ctx context.Context, destination string, obj string, mappings []archive.IDMapping) (found bool, err error) {
    downloader := s3manager.NewDownloader(s3st.client, func(d *s3manager.Downloader) {
        d.Concurrency = defaultCopyConcurrency
        d.PartSize = defaultPartSize * megabytes
        d.BufferProvider = s3manager.NewPooledBufferedWriterReadFromProvider(25 * megabytes)
    })

    s3File, err := os.CreateTemp("", "temporal-s3-file")
    if err != nil {
        return true, xerrors.Errorf("creating temporary file: %s", err.Error())
    }
    defer os.Remove(s3File.Name())

    _, err = downloader.Download(ctx, s3File, &s3.GetObjectInput{
        Bucket: aws.String(s3st.Config.Bucket),
        Key:    aws.String(obj),
    })
    if err != nil {
        return false, err
    }

    _, err = s3File.Seek(0, 0)
    if err != nil {
        return false, err
    }

    err = archive.ExtractTarbal(ctx, s3File, destination, archive.WithUIDMapping(mappings), archive.WithGIDMapping(mappings))
    if err != nil {
        return true, xerrors.Errorf("tar %s: %s", destination, err.Error())
    }

    return true, nil
}

// EnsureExists implements DirectAccess
func (*s3Storage) EnsureExists(ctx context.Context) error {
    return nil
}

// Init implements DirectAccess
func (s3st *s3Storage) Init(ctx context.Context, owner string, workspace string, instance string) error {
    s3st.OwnerID = owner
    s3st.WorkspaceID = workspace
    s3st.InstanceID = instance
    return nil
}

// ListObjects implements DirectAccess
func (s3st *s3Storage) ListObjects(ctx context.Context, prefix string) ([]string, error) {
    if !strings.HasPrefix(prefix, s3st.OwnerID+"/") {
        return nil, xerrors.Errorf("prefix must start with the owner ID")
    }

    var res []string
    listParams := &s3.ListObjectsV2Input{
        Bucket: aws.String(s3st.Config.Bucket),
        Prefix: aws.String(prefix),
    }
    fetchObjects := true
    for fetchObjects {
        objs, err := s3st.client.ListObjectsV2(ctx, listParams)
        if err != nil {
            return nil, xerrors.Errorf("cannot list objects: %w", err)
        }

        for _, o := range objs.Contents {
            res = append(res, *o.Key)
        }

        listParams.ContinuationToken = objs.NextContinuationToken
        fetchObjects = *objs.IsTruncated
    }

    return res, nil
}

// Qualify implements DirectAccess
func (s3st *s3Storage) Qualify(name string) string {
    return fmt.Sprintf("%s@%s", s3st.objectName(name), s3st.Config.Bucket)
}

func (s3st *s3Storage) objectName(name string) string {
    return s3WorkspaceBackupObjectName(s3st.OwnerID, s3st.WorkspaceID, name)
}

func s3WorkspaceBackupObjectName(ownerID, workspaceID, name string) string {
    return filepath.Join(ownerID, "workspaces", workspaceID, name)
}

// Upload implements DirectAccess
func (s3st *s3Storage) Upload(ctx context.Context, source string, name string, opts ...UploadOption) (bucket string, obj string, err error) {
    options, err := GetUploadOptions(opts)
    if err != nil {
        err = xerrors.Errorf("cannot get options: %w", err)
        return
    }

    if s3st.client == nil {
        err = xerrors.Errorf("no s3 client available - did you call Init()?")
        return
    }

    f, err := os.Open(source)
    if err != nil {
        err = xerrors.Errorf("cannot read backup file: %w", err)
        return
    }
    defer f.Close()

    var contentType *string
    if options.ContentType != "" {
        contentType = aws.String(options.ContentType)
    }

    bucket = s3st.Config.Bucket
    obj = s3st.objectName(name)

    s3c, ok := s3st.client.(*s3.Client)
    if !ok {
        err = xerrors.Errorf("can only upload with an actual S3 client")
        return
    }

    uploader := s3manager.NewUploader(s3c, func(u *s3manager.Uploader) {
        u.Concurrency = defaultCopyConcurrency
        u.PartSize = defaultPartSize * megabytes
        u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(25 * megabytes)
    })
    _, err = uploader.Upload(ctx, &s3.PutObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(obj),

        // f implements io.ReadSeeker and hence is uploaded in parallel.
        // cf. https://aws.github.io/aws-sdk-go-v2/docs/sdk-utilities/s3/#putobjectinput-body-field-ioreadseeker-vs-ioreader
        Body: f,

        Metadata:    options.Annotations,
        ContentType: contentType,
    })
    if err != nil {
        return
    }

    return
}

// UploadInstance implements DirectAccess
func (s3st *s3Storage) UploadInstance(ctx context.Context, source string, name string, opts ...UploadOption) (bucket string, obj string, err error) {
    if s3st.InstanceID == "" {
        return "", "", xerrors.Errorf("instanceID is required to compute object name")
    }
    return s3st.Upload(ctx, source, InstanceObjectName(s3st.InstanceID, name), opts...)
}
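For orientation, below is a minimal sketch of how a caller might wire up the presigned access defined above: build an AWS SDK v2 S3 client, construct the storage, and presign a download URL for a workspace backup. The bucket name, owner/workspace IDs, object name, and error handling are placeholders for illustration and are not taken from this file.

package main

import (
    "context"
    "log"

    awsconfig "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"

    "github.com/gitpod-io/gitpod/content-service/pkg/storage"
)

func main() {
    ctx := context.Background()

    // Load region and credentials from the environment / shared AWS config.
    cfg, err := awsconfig.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // "example-bucket" is a placeholder; in a real deployment the bucket comes from configuration.
    ps := storage.NewPresignedS3Access(s3.NewFromConfig(cfg), storage.S3Config{Bucket: "example-bucket"})

    // Compute the object key for a workspace backup ("backup.tar" is illustrative)
    // and presign a download URL for it.
    obj := ps.BackupObject("owner-id", "workspace-id", "backup.tar")
    info, err := ps.SignDownload(ctx, ps.Bucket("owner-id"), obj, &storage.SignedURLOptions{})
    if err != nil {
        log.Fatal(err)
    }
    log.Println("presigned download URL:", info.URL)
}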