Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/drivers/azure_blob/util.go
1987 views
1
package azure_blob
2
3
import (
4
"bytes"
5
"context"
6
"errors"
7
"fmt"
8
"io"
9
"path"
10
"sort"
11
"strings"
12
"time"
13
14
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
15
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
16
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
17
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
18
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
19
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
20
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
21
log "github.com/sirupsen/logrus"
22
)
23
24
const (
	// MaxRetries is the upper bound on retry attempts for an Azure operation.
	MaxRetries = 3
	// RetryDelay is the base wait between two consecutive retry attempts.
	RetryDelay = 3 * time.Second
	// MaxBatchSize caps how many sub-operations are packed into one batch
	// request; batchDeleteBlobs slices its input into chunks of this size.
	MaxBatchSize = 128
)
32
33
// extractAccountName returns the storage account name encoded in an Azure
// Storage endpoint, i.e. the first DNS label of the host, lower-cased.
//
// For "https://MyAccount.blob.core.windows.net" it returns "myaccount". The
// scheme ("http://" or "https://") is optional and matched case-insensitively,
// and surrounding whitespace is ignored. When the endpoint contains no dot the
// whole (scheme-less) string is returned; an empty endpoint yields "".
func extractAccountName(endpoint string) string {
	// Normalise before stripping the scheme: URL schemes and host names are
	// case-insensitive, so "HTTPS://..." must be treated like "https://...".
	host := strings.ToLower(strings.TrimSpace(endpoint))
	host = strings.TrimPrefix(host, "https://")
	host = strings.TrimPrefix(host, "http://")

	// Everything before the first dot is the account name.
	account, _, _ := strings.Cut(host, ".")
	return account
}
47
48
// isNotFoundError checks if the error is a "not found" type error
49
func isNotFoundError(err error) bool {
50
var storageErr *azcore.ResponseError
51
if errors.As(err, &storageErr) {
52
return storageErr.StatusCode == 404
53
}
54
// Fallback to string matching for backwards compatibility
55
return err != nil && strings.Contains(err.Error(), "BlobNotFound")
56
}
57
58
// flattenListBlobs - Optimize blob listing to handle pagination better
59
func (d *AzureBlob) flattenListBlobs(ctx context.Context, prefix string) ([]container.BlobItem, error) {
60
// Standardize prefix format
61
prefix = ensureTrailingSlash(prefix)
62
63
var blobItems []container.BlobItem
64
pager := d.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
65
Prefix: &prefix,
66
Include: container.ListBlobsInclude{
67
Metadata: true,
68
},
69
})
70
71
for pager.More() {
72
page, err := pager.NextPage(ctx)
73
if err != nil {
74
return nil, fmt.Errorf("failed to list blobs: %w", err)
75
}
76
77
for _, blob := range page.Segment.BlobItems {
78
blobItems = append(blobItems, *blob)
79
}
80
}
81
82
return blobItems, nil
83
}
84
85
// batchDeleteBlobs - Simplify batch deletion logic
86
func (d *AzureBlob) batchDeleteBlobs(ctx context.Context, blobPaths []string) error {
87
if len(blobPaths) == 0 {
88
return nil
89
}
90
91
// Process in batches of MaxBatchSize
92
for i := 0; i < len(blobPaths); i += MaxBatchSize {
93
end := min(i+MaxBatchSize, len(blobPaths))
94
currentBatch := blobPaths[i:end]
95
96
// Create batch builder
97
batchBuilder, err := d.containerClient.NewBatchBuilder()
98
if err != nil {
99
return fmt.Errorf("failed to create batch builder: %w", err)
100
}
101
102
// Add delete operations
103
for _, blobPath := range currentBatch {
104
if err := batchBuilder.Delete(blobPath, nil); err != nil {
105
return fmt.Errorf("failed to add delete operation for %s: %w", blobPath, err)
106
}
107
}
108
109
// Submit batch
110
responses, err := d.containerClient.SubmitBatch(ctx, batchBuilder, nil)
111
if err != nil {
112
return fmt.Errorf("batch delete request failed: %w", err)
113
}
114
115
// Check responses
116
for _, resp := range responses.Responses {
117
if resp.Error != nil && !isNotFoundError(resp.Error) {
118
// 获取 blob 名称以提供更好的错误信息
119
blobName := "unknown"
120
if resp.BlobName != nil {
121
blobName = *resp.BlobName
122
}
123
return fmt.Errorf("failed to delete blob %s: %v", blobName, resp.Error)
124
}
125
}
126
}
127
128
return nil
129
}
130
131
// deleteFolder recursively deletes a directory and all its contents
132
func (d *AzureBlob) deleteFolder(ctx context.Context, prefix string) error {
133
// Ensure directory path ends with slash
134
prefix = ensureTrailingSlash(prefix)
135
136
// Get all blobs under the directory using flattenListBlobs
137
globs, err := d.flattenListBlobs(ctx, prefix)
138
if err != nil {
139
return fmt.Errorf("failed to list blobs for deletion: %w", err)
140
}
141
142
// If there are blobs in the directory, delete them
143
if len(globs) > 0 {
144
// 分离文件和目录标记
145
var filePaths []string
146
var dirPaths []string
147
148
for _, blob := range globs {
149
blobName := *blob.Name
150
if isDirectory(blob) {
151
// remove trailing slash for directory names
152
dirPaths = append(dirPaths, strings.TrimSuffix(blobName, "/"))
153
} else {
154
filePaths = append(filePaths, blobName)
155
}
156
}
157
158
// 先删除文件,再删除目录
159
if len(filePaths) > 0 {
160
if err := d.batchDeleteBlobs(ctx, filePaths); err != nil {
161
return err
162
}
163
}
164
if len(dirPaths) > 0 {
165
// 按路径深度分组
166
depthMap := make(map[int][]string)
167
for _, dir := range dirPaths {
168
depth := strings.Count(dir, "/") // 计算目录深度
169
depthMap[depth] = append(depthMap[depth], dir)
170
}
171
172
// 按深度从大到小排序
173
var depths []int
174
for depth := range depthMap {
175
depths = append(depths, depth)
176
}
177
sort.Sort(sort.Reverse(sort.IntSlice(depths)))
178
179
// 按深度逐层批量删除
180
for _, depth := range depths {
181
batch := depthMap[depth]
182
if err := d.batchDeleteBlobs(ctx, batch); err != nil {
183
return err
184
}
185
}
186
}
187
}
188
189
// 最后删除目录标记本身
190
return d.deleteEmptyDirectory(ctx, prefix)
191
}
192
193
// deleteFile deletes a single file or blob with better error handling
194
func (d *AzureBlob) deleteFile(ctx context.Context, path string, isDir bool) error {
195
blobClient := d.containerClient.NewBlobClient(path)
196
_, err := blobClient.Delete(ctx, nil)
197
if err != nil && !(isDir && isNotFoundError(err)) {
198
return err
199
}
200
return nil
201
}
202
203
// copyFile copies a single blob from source path to destination path
204
func (d *AzureBlob) copyFile(ctx context.Context, srcPath, dstPath string) error {
205
srcBlob := d.containerClient.NewBlobClient(srcPath)
206
dstBlob := d.containerClient.NewBlobClient(dstPath)
207
208
// Use configured expiration time for SAS URL
209
expireDuration := time.Hour * time.Duration(d.SignURLExpire)
210
srcURL, err := srcBlob.GetSASURL(sas.BlobPermissions{Read: true}, time.Now().Add(expireDuration), nil)
211
if err != nil {
212
return fmt.Errorf("failed to generate source SAS URL: %w", err)
213
}
214
215
_, err = dstBlob.StartCopyFromURL(ctx, srcURL, nil)
216
return err
217
218
}
219
220
// createContainerIfNotExists - Create container if not exists
221
// Clean up commented code
222
func (d *AzureBlob) createContainerIfNotExists(ctx context.Context, containerName string) error {
223
serviceClient := d.client.ServiceClient()
224
containerClient := serviceClient.NewContainerClient(containerName)
225
226
var options = service.CreateContainerOptions{}
227
_, err := containerClient.Create(ctx, &options)
228
if err != nil {
229
var responseErr *azcore.ResponseError
230
if errors.As(err, &responseErr) && responseErr.ErrorCode != "ContainerAlreadyExists" {
231
return fmt.Errorf("failed to create or access container [%s]: %w", containerName, err)
232
}
233
}
234
235
d.containerClient = containerClient
236
return nil
237
}
238
239
// mkDir creates a virtual directory marker by uploading an empty blob with metadata.
240
func (d *AzureBlob) mkDir(ctx context.Context, fullDirName string) error {
241
dirPath := ensureTrailingSlash(fullDirName)
242
blobClient := d.containerClient.NewBlockBlobClient(dirPath)
243
244
// Upload an empty blob with metadata indicating it's a directory
245
_, err := blobClient.Upload(ctx, struct {
246
*bytes.Reader
247
io.Closer
248
}{
249
Reader: bytes.NewReader([]byte{}),
250
Closer: io.NopCloser(nil),
251
}, &blockblob.UploadOptions{
252
Metadata: map[string]*string{
253
"hdi_isfolder": to.Ptr("true"),
254
},
255
})
256
return err
257
}
258
259
// ensureTrailingSlash returns path unchanged when it already ends in "/",
// and path with a single "/" appended otherwise (so "" becomes "/").
func ensureTrailingSlash(path string) string {
	if strings.HasSuffix(path, "/") {
		return path
	}
	return path + "/"
}
266
267
// moveOrRename moves or renames blobs or directories from source to destination.
268
func (d *AzureBlob) moveOrRename(ctx context.Context, srcPath, dstPath string, isDir bool, srcSize int64) error {
269
if isDir {
270
// Normalize paths for directory operations
271
srcPath = ensureTrailingSlash(srcPath)
272
dstPath = ensureTrailingSlash(dstPath)
273
274
// List all blobs under the source directory
275
blobs, err := d.flattenListBlobs(ctx, srcPath)
276
if err != nil {
277
return fmt.Errorf("failed to list blobs: %w", err)
278
}
279
280
// Iterate and copy each blob to the destination
281
for _, item := range blobs {
282
srcBlobName := *item.Name
283
relPath := strings.TrimPrefix(srcBlobName, srcPath)
284
itemDstPath := path.Join(dstPath, relPath)
285
286
if isDirectory(item) {
287
// Create directory marker at destination
288
if err := d.mkDir(ctx, itemDstPath); err != nil {
289
return fmt.Errorf("failed to create directory marker [%s]: %w", itemDstPath, err)
290
}
291
} else {
292
// Copy file blob to destination
293
if err := d.copyFile(ctx, srcBlobName, itemDstPath); err != nil {
294
return fmt.Errorf("failed to copy blob [%s]: %w", srcBlobName, err)
295
}
296
}
297
}
298
299
// Handle empty directories by creating a marker at destination
300
if len(blobs) == 0 {
301
if err := d.mkDir(ctx, dstPath); err != nil {
302
return fmt.Errorf("failed to create directory [%s]: %w", dstPath, err)
303
}
304
}
305
306
// Delete source directory and its contents
307
if err := d.deleteFolder(ctx, srcPath); err != nil {
308
log.Warnf("failed to delete source directory [%s]: %v\n, and try again", srcPath, err)
309
// Retry deletion once more and ignore the result
310
if err := d.deleteFolder(ctx, srcPath); err != nil {
311
log.Errorf("Retry deletion of source directory [%s] failed: %v", srcPath, err)
312
}
313
}
314
315
return nil
316
}
317
318
// Single file move or rename operation
319
if err := d.copyFile(ctx, srcPath, dstPath); err != nil {
320
return fmt.Errorf("failed to copy file: %w", err)
321
}
322
323
// Delete source file after successful copy
324
if err := d.deleteFile(ctx, srcPath, false); err != nil {
325
log.Errorf("Error deleting source file [%s]: %v", srcPath, err)
326
}
327
return nil
328
}
329
330
// optimizedUploadOptions returns the optimal upload options based on file size
331
func optimizedUploadOptions(fileSize int64) *azblob.UploadStreamOptions {
332
options := &azblob.UploadStreamOptions{
333
BlockSize: 4 * 1024 * 1024, // 4MB block size
334
Concurrency: 4, // Default concurrency
335
}
336
337
// For large files, increase block size and concurrency
338
if fileSize > 256*1024*1024 { // For files larger than 256MB
339
options.BlockSize = 8 * 1024 * 1024 // 8MB blocks
340
options.Concurrency = 8 // More concurrent uploads
341
}
342
343
// For very large files (>1GB)
344
if fileSize > 1024*1024*1024 {
345
options.BlockSize = 16 * 1024 * 1024 // 16MB blocks
346
options.Concurrency = 16 // Higher concurrency
347
}
348
349
return options
350
}
351
352
// isDirectory determines if a blob represents a directory
353
// Checks multiple indicators: path suffix, metadata, and content type
354
func isDirectory(blob container.BlobItem) bool {
355
// Check path suffix
356
if strings.HasSuffix(*blob.Name, "/") {
357
return true
358
}
359
360
// Check metadata for directory marker
361
if blob.Metadata != nil {
362
if val, ok := blob.Metadata["hdi_isfolder"]; ok && val != nil && *val == "true" {
363
return true
364
}
365
// Azure Storage Explorer and other tools may use different metadata keys
366
if val, ok := blob.Metadata["is_directory"]; ok && val != nil && strings.ToLower(*val) == "true" {
367
return true
368
}
369
}
370
371
// Check content type (some tools mark directories with specific content types)
372
if blob.Properties != nil && blob.Properties.ContentType != nil {
373
contentType := strings.ToLower(*blob.Properties.ContentType)
374
if blob.Properties.ContentLength != nil && *blob.Properties.ContentLength == 0 && (contentType == "application/directory" || contentType == "directory") {
375
return true
376
}
377
}
378
379
return false
380
}
381
382
// deleteEmptyDirectory deletes a directory only if it's empty
383
func (d *AzureBlob) deleteEmptyDirectory(ctx context.Context, dirPath string) error {
384
// Directory is empty, delete the directory marker
385
blobClient := d.containerClient.NewBlobClient(strings.TrimSuffix(dirPath, "/"))
386
_, err := blobClient.Delete(ctx, nil)
387
388
// Also try deleting with trailing slash (for different directory marker formats)
389
if err != nil && isNotFoundError(err) {
390
blobClient = d.containerClient.NewBlobClient(dirPath)
391
_, err = blobClient.Delete(ctx, nil)
392
}
393
394
// Ignore not found errors
395
if err != nil && isNotFoundError(err) {
396
log.Infof("Directory [%s] not found during deletion: %v", dirPath, err)
397
return nil
398
}
399
400
return err
401
}
402
403