Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/drivers/123/upload.go
1987 views
1
package _123
2
3
import (
4
"context"
5
"fmt"
6
"io"
7
"net/http"
8
"strconv"
9
10
"github.com/alist-org/alist/v3/drivers/base"
11
"github.com/alist-org/alist/v3/internal/driver"
12
"github.com/alist-org/alist/v3/internal/model"
13
"github.com/alist-org/alist/v3/pkg/utils"
14
"github.com/go-resty/resty/v2"
15
)
16
17
func (d *Pan123) getS3PreSignedUrls(ctx context.Context, upReq *UploadResp, start, end int) (*S3PreSignedURLs, error) {
18
data := base.Json{
19
"bucket": upReq.Data.Bucket,
20
"key": upReq.Data.Key,
21
"partNumberEnd": end,
22
"partNumberStart": start,
23
"uploadId": upReq.Data.UploadId,
24
"StorageNode": upReq.Data.StorageNode,
25
}
26
var s3PreSignedUrls S3PreSignedURLs
27
_, err := d.Request(S3PreSignedUrls, http.MethodPost, func(req *resty.Request) {
28
req.SetBody(data).SetContext(ctx)
29
}, &s3PreSignedUrls)
30
if err != nil {
31
return nil, err
32
}
33
return &s3PreSignedUrls, nil
34
}
35
36
func (d *Pan123) getS3Auth(ctx context.Context, upReq *UploadResp, start, end int) (*S3PreSignedURLs, error) {
37
data := base.Json{
38
"StorageNode": upReq.Data.StorageNode,
39
"bucket": upReq.Data.Bucket,
40
"key": upReq.Data.Key,
41
"partNumberEnd": end,
42
"partNumberStart": start,
43
"uploadId": upReq.Data.UploadId,
44
}
45
var s3PreSignedUrls S3PreSignedURLs
46
_, err := d.Request(S3Auth, http.MethodPost, func(req *resty.Request) {
47
req.SetBody(data).SetContext(ctx)
48
}, &s3PreSignedUrls)
49
if err != nil {
50
return nil, err
51
}
52
return &s3PreSignedUrls, nil
53
}
54
55
func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.FileStreamer, isMultipart bool) error {
56
data := base.Json{
57
"StorageNode": upReq.Data.StorageNode,
58
"bucket": upReq.Data.Bucket,
59
"fileId": upReq.Data.FileId,
60
"fileSize": file.GetSize(),
61
"isMultipart": isMultipart,
62
"key": upReq.Data.Key,
63
"uploadId": upReq.Data.UploadId,
64
}
65
_, err := d.Request(UploadCompleteV2, http.MethodPost, func(req *resty.Request) {
66
req.SetBody(data).SetContext(ctx)
67
}, nil)
68
return err
69
}
70
71
// newUpload uploads file in chunks to the storage described by upReq and then
// marks the upload complete.
//
// The stream is first cached to a temporary file so that every chunk can be
// read (and re-read on retry) through an io.SectionReader. The file is split
// into chunks of at most 16*utils.MB bytes. When it fits in one chunk the
// upload URL is obtained from getS3Auth; otherwise pre-signed URLs are fetched
// from getS3PreSignedUrls in batches of up to 10 parts. After each chunk, up is
// called with the percentage of chunks uploaded so far. Cancellation of ctx is
// checked before every batch and every chunk.
//
// An empty file is uploaded as a single zero-length chunk.
func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, up driver.UpdateProgress) error {
	tmpF, err := file.CacheFullInTempFile()
	if err != nil {
		return err
	}

	// Work out the chunk layout. Part numbers are 1-based, and the last part
	// carries whatever remains after the full-size chunks.
	size := file.GetSize()
	chunkSize := min(size, 16*utils.MB)
	chunkCount := 1
	lastChunkSize := size
	if chunkSize > 0 {
		chunkCount = int(size / chunkSize)
		lastChunkSize = size % chunkSize
		if lastChunkSize > 0 {
			chunkCount++
		} else {
			lastChunkSize = chunkSize
		}
	}
	// When chunkSize is 0 the file is empty: dividing by it would panic, so the
	// defaults above (one chunk of length 0) are kept instead.

	// A single-chunk upload asks getS3Auth for one part; a multi-chunk upload
	// asks getS3PreSignedUrls for up to 10 parts per request.
	batchSize := 1
	getS3UploadUrl := d.getS3Auth
	if chunkCount > 1 {
		batchSize = 10
		getS3UploadUrl = d.getS3PreSignedUrls
	}

	for i := 1; i <= chunkCount; i += batchSize {
		if utils.IsCanceled(ctx) {
			return ctx.Err()
		}
		// The batch covers part numbers [start, end); end is exclusive.
		start := i
		end := min(i+batchSize, chunkCount+1)
		s3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, start, end)
		if err != nil {
			return err
		}
		// upload each chunk
		for j := start; j < end; j++ {
			if utils.IsCanceled(ctx) {
				return ctx.Err()
			}
			curSize := chunkSize
			if j == chunkCount {
				curSize = lastChunkSize
			}
			section := io.NewSectionReader(tmpF, chunkSize*int64(j-1), curSize)
			err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, section, curSize, false, getS3UploadUrl)
			if err != nil {
				return err
			}
			up(float64(j) * 100 / float64(chunkCount))
		}
	}

	// complete s3 upload
	return d.completeS3(ctx, upReq, file, chunkCount > 1)
}
122
123
// uploadS3Chunk PUTs one chunk to its upload URL.
//
// The URL is looked up in s3PreSignedUrls under the decimal part number cur;
// reader supplies the chunk's bytes and curSize is their count. A 200 OK
// response means success. If the server answers 403 Forbidden and retry is
// false, fresh URLs for parts cur..end are requested through getS3UploadUrl,
// stored into s3PreSignedUrls (so the caller sees them too), reader is rewound,
// and the chunk is sent once more with retry set to true. A 403 on that second
// attempt, or any other non-200 status, is returned as an error.
func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader *io.SectionReader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
	uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
	if uploadUrl == "" {
		return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
	}

	var payload io.Reader = driver.NewLimitedUploadStream(ctx, reader)
	if curSize == 0 {
		// net/http treats ContentLength 0 together with a non-nil body as
		// "length unknown"; http.NoBody states explicitly that the body is empty.
		payload = http.NoBody
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl, payload)
	if err != nil {
		return err
	}
	// Set the length on the request itself rather than as a header; the client
	// derives the Content-Length header from this field.
	req.ContentLength = curSize

	res, err := base.HttpClient.Do(req)
	if err != nil {
		return err
	}

	if res.StatusCode != http.StatusForbidden {
		defer res.Body.Close()
		if res.StatusCode == http.StatusOK {
			return nil
		}
		body, err := io.ReadAll(res.Body)
		if err != nil {
			return err
		}
		return fmt.Errorf("upload s3 chunk %d failed, status code: %d, body: %s", cur, res.StatusCode, body)
	}

	// 403: release this response before doing anything else so it is not held
	// open for the duration of the retry. A close error on a rejected response
	// carries no useful information, so it is deliberately not checked.
	res.Body.Close()
	if retry {
		return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode)
	}

	// refresh s3 pre signed urls
	newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end)
	if err != nil {
		return err
	}
	s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls

	// The first attempt may have consumed part of the chunk; rewind before
	// sending it again.
	if _, err := reader.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("rewind s3 chunk %d for retry: %w", cur, err)
	}
	return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true, getS3UploadUrl)
}
163
164