package azure_blob
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"path"
"sort"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
log "github.com/sirupsen/logrus"
)
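// Limits used by the Azure Blob driver. MaxBatchSize caps the number of
// sub-requests per batch delete and stays under the Blob Batch API limit of
// 256 operations per request.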
const (
MaxRetries = 3
RetryDelay = 3 * time.Second
MaxBatchSize = 128
)
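// extractAccountName derives the storage account name from a blob endpoint
// such as https://<account>.blob.core.windows.net, returned in lower case.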
func extractAccountName(endpoint string) string {
endpoint = strings.TrimPrefix(endpoint, "https://")
endpoint = strings.TrimPrefix(endpoint, "http://")
parts := strings.Split(endpoint, ".")
if len(parts) > 0 {
return strings.ToLower(parts[0])
}
return ""
}
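// isNotFoundError reports whether err indicates a missing blob: an HTTP 404
// ResponseError or an error message containing "BlobNotFound".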
func isNotFoundError(err error) bool {
var storageErr *azcore.ResponseError
if errors.As(err, &storageErr) {
return storageErr.StatusCode == 404
}
return err != nil && strings.Contains(err.Error(), "BlobNotFound")
}
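// flattenListBlobs returns every blob under prefix (normalized to end with
// "/") using a flat listing, with metadata included for each item.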
func (d *AzureBlob) flattenListBlobs(ctx context.Context, prefix string) ([]container.BlobItem, error) {
prefix = ensureTrailingSlash(prefix)
var blobItems []container.BlobItem
pager := d.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: &prefix,
Include: container.ListBlobsInclude{
Metadata: true,
},
})
for pager.More() {
page, err := pager.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list blobs: %w", err)
}
for _, blob := range page.Segment.BlobItems {
blobItems = append(blobItems, *blob)
}
}
return blobItems, nil
}
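// batchDeleteBlobs deletes blobPaths in chunks of at most MaxBatchSize via
// the blob batch API. Per-blob not-found responses are ignored; any other
// per-blob failure aborts with an error.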
func (d *AzureBlob) batchDeleteBlobs(ctx context.Context, blobPaths []string) error {
if len(blobPaths) == 0 {
return nil
}
for i := 0; i < len(blobPaths); i += MaxBatchSize {
end := min(i+MaxBatchSize, len(blobPaths))
currentBatch := blobPaths[i:end]
batchBuilder, err := d.containerClient.NewBatchBuilder()
if err != nil {
return fmt.Errorf("failed to create batch builder: %w", err)
}
for _, blobPath := range currentBatch {
if err := batchBuilder.Delete(blobPath, nil); err != nil {
return fmt.Errorf("failed to add delete operation for %s: %w", blobPath, err)
}
}
responses, err := d.containerClient.SubmitBatch(ctx, batchBuilder, nil)
if err != nil {
return fmt.Errorf("batch delete request failed: %w", err)
}
for _, resp := range responses.Responses {
if resp.Error != nil && !isNotFoundError(resp.Error) {
blobName := "unknown"
if resp.BlobName != nil {
blobName = *resp.BlobName
}
return fmt.Errorf("failed to delete blob %s: %v", blobName, resp.Error)
}
}
}
return nil
}
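// deleteFolder removes everything under prefix: regular blobs are
// batch-deleted first, then directory markers from the deepest level upward,
// and finally the prefix's own marker via deleteEmptyDirectory.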
func (d *AzureBlob) deleteFolder(ctx context.Context, prefix string) error {
prefix = ensureTrailingSlash(prefix)
blobs, err := d.flattenListBlobs(ctx, prefix)
if err != nil {
return fmt.Errorf("failed to list blobs for deletion: %w", err)
}
if len(blobs) > 0 {
var filePaths []string
var dirPaths []string
for _, blob := range blobs {
blobName := *blob.Name
if isDirectory(blob) {
dirPaths = append(dirPaths, strings.TrimSuffix(blobName, "/"))
} else {
filePaths = append(filePaths, blobName)
}
}
if len(filePaths) > 0 {
if err := d.batchDeleteBlobs(ctx, filePaths); err != nil {
return err
}
}
if len(dirPaths) > 0 {
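// Delete directory markers deepest-first so child markers are removed before their parents.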
depthMap := make(map[int][]string)
for _, dir := range dirPaths {
depth := strings.Count(dir, "/")
depthMap[depth] = append(depthMap[depth], dir)
}
var depths []int
for depth := range depthMap {
depths = append(depths, depth)
}
sort.Sort(sort.Reverse(sort.IntSlice(depths)))
for _, depth := range depths {
batch := depthMap[depth]
if err := d.batchDeleteBlobs(ctx, batch); err != nil {
return err
}
}
}
}
return d.deleteEmptyDirectory(ctx, prefix)
}
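// deleteFile deletes a single blob. When isDir is true, a not-found error is
// ignored because the directory marker may not exist.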
func (d *AzureBlob) deleteFile(ctx context.Context, path string, isDir bool) error {
blobClient := d.containerClient.NewBlobClient(path)
_, err := blobClient.Delete(ctx, nil)
if err != nil && !(isDir && isNotFoundError(err)) {
return err
}
return nil
}
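// copyFile starts a server-side copy of srcPath to dstPath, authorizing the
// source with a read-only SAS URL valid for d.SignURLExpire hours. The copy
// is only initiated here; completion is not awaited.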
func (d *AzureBlob) copyFile(ctx context.Context, srcPath, dstPath string) error {
srcBlob := d.containerClient.NewBlobClient(srcPath)
dstBlob := d.containerClient.NewBlobClient(dstPath)
expireDuration := time.Hour * time.Duration(d.SignURLExpire)
srcURL, err := srcBlob.GetSASURL(sas.BlobPermissions{Read: true}, time.Now().Add(expireDuration), nil)
if err != nil {
return fmt.Errorf("failed to generate source SAS URL: %w", err)
}
_, err = dstBlob.StartCopyFromURL(ctx, srcURL, nil)
return err
}
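// createContainerIfNotExists creates the container if needed and stores the
// resulting client on d; a ContainerAlreadyExists error is treated as success.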
func (d *AzureBlob) createContainerIfNotExists(ctx context.Context, containerName string) error {
serviceClient := d.client.ServiceClient()
containerClient := serviceClient.NewContainerClient(containerName)
var options = service.CreateContainerOptions{}
_, err := containerClient.Create(ctx, &options)
if err != nil {
var responseErr *azcore.ResponseError
if errors.As(err, &responseErr) && responseErr.ErrorCode != "ContainerAlreadyExists" {
return fmt.Errorf("failed to create or access container [%s]: %w", containerName, err)
}
}
d.containerClient = containerClient
return nil
}
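// mkDir creates a zero-byte directory marker blob at fullDirName/ carrying
// the hdi_isfolder metadata flag. Upload expects an io.ReadSeekCloser, so an
// empty bytes.Reader is combined with a no-op closer.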
func (d *AzureBlob) mkDir(ctx context.Context, fullDirName string) error {
dirPath := ensureTrailingSlash(fullDirName)
blobClient := d.containerClient.NewBlockBlobClient(dirPath)
_, err := blobClient.Upload(ctx, struct {
*bytes.Reader
io.Closer
}{
Reader: bytes.NewReader([]byte{}),
Closer: io.NopCloser(nil),
}, &blockblob.UploadOptions{
Metadata: map[string]*string{
"hdi_isfolder": to.Ptr("true"),
},
})
return err
}
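// ensureTrailingSlash returns path with a trailing "/" appended if missing.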
func ensureTrailingSlash(path string) string {
if !strings.HasSuffix(path, "/") {
return path + "/"
}
return path
}
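// moveOrRename moves srcPath to dstPath. Directories are copied blob by blob
// (recreating directory markers) and the source tree is then deleted, with
// one retry on failure; single files are copied and the source blob deleted.
// srcSize is currently unused.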
func (d *AzureBlob) moveOrRename(ctx context.Context, srcPath, dstPath string, isDir bool, srcSize int64) error {
if isDir {
srcPath = ensureTrailingSlash(srcPath)
dstPath = ensureTrailingSlash(dstPath)
blobs, err := d.flattenListBlobs(ctx, srcPath)
if err != nil {
return fmt.Errorf("failed to list blobs: %w", err)
}
for _, item := range blobs {
srcBlobName := *item.Name
relPath := strings.TrimPrefix(srcBlobName, srcPath)
itemDstPath := path.Join(dstPath, relPath)
if isDirectory(item) {
if err := d.mkDir(ctx, itemDstPath); err != nil {
return fmt.Errorf("failed to create directory marker [%s]: %w", itemDstPath, err)
}
} else {
if err := d.copyFile(ctx, srcBlobName, itemDstPath); err != nil {
return fmt.Errorf("failed to copy blob [%s]: %w", srcBlobName, err)
}
}
}
if len(blobs) == 0 {
if err := d.mkDir(ctx, dstPath); err != nil {
return fmt.Errorf("failed to create directory [%s]: %w", dstPath, err)
}
}
if err := d.deleteFolder(ctx, srcPath); err != nil {
log.Warnf("failed to delete source directory [%s]: %v\n, and try again", srcPath, err)
if err := d.deleteFolder(ctx, srcPath); err != nil {
log.Errorf("Retry deletion of source directory [%s] failed: %v", srcPath, err)
}
}
return nil
}
if err := d.copyFile(ctx, srcPath, dstPath); err != nil {
return fmt.Errorf("failed to copy file: %w", err)
}
if err := d.deleteFile(ctx, srcPath, false); err != nil {
log.Errorf("Error deleting source file [%s]: %v", srcPath, err)
}
return nil
}
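// optimizedUploadOptions picks the upload block size and concurrency from the
// file size: 4 MiB blocks with 4 workers by default, 8 MiB/8 above 256 MiB,
// and 16 MiB/16 above 1 GiB.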
func optimizedUploadOptions(fileSize int64) *azblob.UploadStreamOptions {
options := &azblob.UploadStreamOptions{
BlockSize: 4 * 1024 * 1024,
Concurrency: 4,
}
if fileSize > 256*1024*1024 {
options.BlockSize = 8 * 1024 * 1024
options.Concurrency = 8
}
if fileSize > 1024*1024*1024 {
options.BlockSize = 16 * 1024 * 1024
options.Concurrency = 16
}
return options
}
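// isDirectory reports whether blob represents a directory marker: a name
// ending in "/", folder metadata (hdi_isfolder or is_directory), or a
// zero-length blob with a directory content type.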
func isDirectory(blob container.BlobItem) bool {
if strings.HasSuffix(*blob.Name, "/") {
return true
}
if blob.Metadata != nil {
if val, ok := blob.Metadata["hdi_isfolder"]; ok && val != nil && *val == "true" {
return true
}
if val, ok := blob.Metadata["is_directory"]; ok && val != nil && strings.ToLower(*val) == "true" {
return true
}
}
if blob.Properties != nil && blob.Properties.ContentType != nil {
contentType := strings.ToLower(*blob.Properties.ContentType)
if blob.Properties.ContentLength != nil && *blob.Properties.ContentLength == 0 && (contentType == "application/directory" || contentType == "directory") {
return true
}
}
return false
}
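// deleteEmptyDirectory deletes a directory marker blob, trying the name
// without and then with a trailing slash; a final not-found result is logged
// and treated as success.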
func (d *AzureBlob) deleteEmptyDirectory(ctx context.Context, dirPath string) error {
blobClient := d.containerClient.NewBlobClient(strings.TrimSuffix(dirPath, "/"))
_, err := blobClient.Delete(ctx, nil)
if err != nil && isNotFoundError(err) {
blobClient = d.containerClient.NewBlobClient(dirPath)
_, err = blobClient.Delete(ctx, nil)
}
if err != nil && isNotFoundError(err) {
log.Infof("Directory [%s] not found during deletion: %v", dirPath, err)
return nil
}
return err
}