Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/drivers/aliyundrive_open/upload.go
1987 views
1
package aliyundrive_open
2
3
import (
4
"context"
5
"encoding/base64"
6
"fmt"
7
"io"
8
"math"
9
"net/http"
10
"strconv"
11
"strings"
12
"time"
13
14
"github.com/alist-org/alist/v3/drivers/base"
15
"github.com/alist-org/alist/v3/internal/driver"
16
"github.com/alist-org/alist/v3/internal/model"
17
streamPkg "github.com/alist-org/alist/v3/internal/stream"
18
"github.com/alist-org/alist/v3/pkg/http_range"
19
"github.com/alist-org/alist/v3/pkg/utils"
20
"github.com/avast/retry-go"
21
"github.com/go-resty/resty/v2"
22
log "github.com/sirupsen/logrus"
23
)
24
25
// makePartInfos builds the "part_info_list" payload for an upload made of
// size parts. Each entry carries only a "part_number", numbered consecutively
// starting from 1.
func makePartInfos(size int) []base.Json {
	parts := make([]base.Json, size)
	for idx := range parts {
		parts[idx] = base.Json{"part_number": idx + 1}
	}
	return parts
}
32
33
func calPartSize(fileSize int64) int64 {
34
var partSize int64 = 20 * utils.MB
35
if fileSize > partSize {
36
if fileSize > 1*utils.TB { // file Size over 1TB
37
partSize = 5 * utils.GB // file part size 5GB
38
} else if fileSize > 768*utils.GB { // over 768GB
39
partSize = 109951163 // ≈ 104.8576MB, split 1TB into 10,000 part
40
} else if fileSize > 512*utils.GB { // over 512GB
41
partSize = 82463373 // ≈ 78.6432MB
42
} else if fileSize > 384*utils.GB { // over 384GB
43
partSize = 54975582 // ≈ 52.4288MB
44
} else if fileSize > 256*utils.GB { // over 256GB
45
partSize = 41231687 // ≈ 39.3216MB
46
} else if fileSize > 128*utils.GB { // over 128GB
47
partSize = 27487791 // ≈ 26.2144MB
48
}
49
}
50
return partSize
51
}
52
53
func (d *AliyundriveOpen) getUploadUrl(count int, fileId, uploadId string) ([]PartInfo, error) {
54
partInfoList := makePartInfos(count)
55
var resp CreateResp
56
_, err := d.request("/adrive/v1.0/openFile/getUploadUrl", http.MethodPost, func(req *resty.Request) {
57
req.SetBody(base.Json{
58
"drive_id": d.DriveId,
59
"file_id": fileId,
60
"part_info_list": partInfoList,
61
"upload_id": uploadId,
62
}).SetResult(&resp)
63
})
64
return resp.PartInfoList, err
65
}
66
67
// uploadPart PUTs the bytes read from r to the upload URL of partInfo.
//
// When d.InternalUpload is set, the public Beijing data host in the URL is
// rewritten to the internal OSS endpoint before the request is sent.
//
// It returns nil when the server answers 200 OK or 409 Conflict, and an error
// carrying the status code for any other response.
func (d *AliyundriveOpen) uploadPart(ctx context.Context, r io.Reader, partInfo PartInfo) error {
	// Upper bound on how much of the response body is drained before closing.
	const maxDrainBytes = 64 << 10

	uploadUrl := partInfo.UploadUrl
	if d.InternalUpload {
		uploadUrl = strings.ReplaceAll(uploadUrl, "https://cn-beijing-data.aliyundrive.net/", "http://ccp-bj29-bj-1592982087.oss-cn-beijing-internal.aliyuncs.com/")
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl, r)
	if err != nil {
		return err
	}
	res, err := base.HttpClient.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// Read the (small) response body before closing so the transport can
		// reuse the connection for the next part. Both steps are best-effort:
		// the outcome of the upload is decided by the status code alone.
		_, _ = io.Copy(io.Discard, io.LimitReader(res.Body, maxDrainBytes))
		_ = res.Body.Close()
	}()
	// NOTE(review): 409 is deliberately treated as success, presumably because
	// it means the part was already uploaded — confirm against the API docs.
	if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict {
		return fmt.Errorf("upload status: %d", res.StatusCode)
	}
	return nil
}
86
87
func (d *AliyundriveOpen) completeUpload(fileId, uploadId string) (model.Obj, error) {
88
// 3. complete
89
var newFile File
90
_, err := d.request("/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) {
91
req.SetBody(base.Json{
92
"drive_id": d.DriveId,
93
"file_id": fileId,
94
"upload_id": uploadId,
95
}).SetResult(&newFile)
96
})
97
if err != nil {
98
return nil, err
99
}
100
return fileToObj(newFile), nil
101
}
102
103
// ProofRange is the byte window of a file that is read to build the
// rapid-upload proof code. Start is the offset of the first byte and End is
// the offset just past the last byte, so the window holds End-Start bytes.
type ProofRange struct {
	Start, End int64
}
107
108
func getProofRange(input string, size int64) (*ProofRange, error) {
109
if size == 0 {
110
return &ProofRange{}, nil
111
}
112
tmpStr := utils.GetMD5EncodeStr(input)[0:16]
113
tmpInt, err := strconv.ParseUint(tmpStr, 16, 64)
114
if err != nil {
115
return nil, err
116
}
117
index := tmpInt % uint64(size)
118
pr := &ProofRange{
119
Start: int64(index),
120
End: int64(index) + 8,
121
}
122
if pr.End >= size {
123
pr.End = size
124
}
125
return pr, nil
126
}
127
128
// calProofCode computes the proof code sent with a rapid-upload request: the
// standard base64 encoding of the bytes of stream that fall inside the proof
// window derived from the current access token and the stream size.
//
// It fails when the window cannot be derived, when the range cannot be opened,
// or when fewer bytes than the window length can be read.
func (d *AliyundriveOpen) calProofCode(stream model.FileStreamer) (string, error) {
	window, err := getProofRange(d.getAccessToken(), stream.GetSize())
	if err != nil {
		return "", err
	}
	span := window.End - window.Start
	src, err := stream.RangeRead(http_range.Range{Start: window.Start, Length: span})
	if err != nil {
		return "", err
	}
	proof := make([]byte, span)
	switch got, readErr := io.ReadFull(src, proof); {
	case readErr == io.ErrUnexpectedEOF:
		// A short read gets a message that states how much was missing.
		return "", fmt.Errorf("can't read data, expected=%d, got=%d", len(proof), got)
	case readErr != nil:
		return "", readErr
	}
	return base64.StdEncoding.EncodeToString(proof), nil
}
148
149
func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
150
// 1. create
151
// Part Size Unit: Bytes, Default: 20MB,
152
// Maximum number of slices 10,000, ≈195.3125GB
153
var partSize = calPartSize(stream.GetSize())
154
const dateFormat = "2006-01-02T15:04:05.000Z"
155
mtimeStr := stream.ModTime().UTC().Format(dateFormat)
156
ctimeStr := stream.CreateTime().UTC().Format(dateFormat)
157
158
createData := base.Json{
159
"drive_id": d.DriveId,
160
"parent_file_id": dstDir.GetID(),
161
"name": stream.GetName(),
162
"type": "file",
163
"check_name_mode": "ignore",
164
"local_modified_at": mtimeStr,
165
"local_created_at": ctimeStr,
166
}
167
count := int(math.Ceil(float64(stream.GetSize()) / float64(partSize)))
168
createData["part_info_list"] = makePartInfos(count)
169
// rapid upload
170
rapidUpload := !stream.IsForceStreamUpload() && stream.GetSize() > 100*utils.KB && d.RapidUpload
171
if rapidUpload {
172
log.Debugf("[aliyundrive_open] start cal pre_hash")
173
// read 1024 bytes to calculate pre hash
174
reader, err := stream.RangeRead(http_range.Range{Start: 0, Length: 1024})
175
if err != nil {
176
return nil, err
177
}
178
hash, err := utils.HashReader(utils.SHA1, reader)
179
if err != nil {
180
return nil, err
181
}
182
createData["size"] = stream.GetSize()
183
createData["pre_hash"] = hash
184
}
185
var createResp CreateResp
186
_, err, e := d.requestReturnErrResp("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
187
req.SetBody(createData).SetResult(&createResp)
188
})
189
if err != nil {
190
if e.Code != "PreHashMatched" || !rapidUpload {
191
return nil, err
192
}
193
log.Debugf("[aliyundrive_open] pre_hash matched, start rapid upload")
194
195
hash := stream.GetHash().GetHash(utils.SHA1)
196
if len(hash) != utils.SHA1.Width {
197
_, hash, err = streamPkg.CacheFullInTempFileAndHash(stream, utils.SHA1)
198
if err != nil {
199
return nil, err
200
}
201
}
202
203
delete(createData, "pre_hash")
204
createData["proof_version"] = "v1"
205
createData["content_hash_name"] = "sha1"
206
createData["content_hash"] = hash
207
createData["proof_code"], err = d.calProofCode(stream)
208
if err != nil {
209
return nil, fmt.Errorf("cal proof code error: %s", err.Error())
210
}
211
_, err = d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
212
req.SetBody(createData).SetResult(&createResp)
213
})
214
if err != nil {
215
return nil, err
216
}
217
}
218
219
if !createResp.RapidUpload {
220
// 2. normal upload
221
log.Debugf("[aliyundive_open] normal upload")
222
223
preTime := time.Now()
224
var offset, length int64 = 0, partSize
225
//var length
226
for i := 0; i < len(createResp.PartInfoList); i++ {
227
if utils.IsCanceled(ctx) {
228
return nil, ctx.Err()
229
}
230
// refresh upload url if 50 minutes passed
231
if time.Since(preTime) > 50*time.Minute {
232
createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId)
233
if err != nil {
234
return nil, err
235
}
236
preTime = time.Now()
237
}
238
if remain := stream.GetSize() - offset; length > remain {
239
length = remain
240
}
241
rd := utils.NewMultiReadable(io.LimitReader(stream, partSize))
242
if rapidUpload {
243
srd, err := stream.RangeRead(http_range.Range{Start: offset, Length: length})
244
if err != nil {
245
return nil, err
246
}
247
rd = utils.NewMultiReadable(srd)
248
}
249
err = retry.Do(func() error {
250
_ = rd.Reset()
251
rateLimitedRd := driver.NewLimitedUploadStream(ctx, rd)
252
return d.uploadPart(ctx, rateLimitedRd, createResp.PartInfoList[i])
253
},
254
retry.Attempts(3),
255
retry.DelayType(retry.BackOffDelay),
256
retry.Delay(time.Second))
257
if err != nil {
258
return nil, err
259
}
260
offset += partSize
261
up(float64(i*100) / float64(count))
262
}
263
} else {
264
log.Debugf("[aliyundrive_open] rapid upload success, file id: %s", createResp.FileId)
265
}
266
267
log.Debugf("[aliyundrive_open] create file success, resp: %+v", createResp)
268
// 3. complete
269
return d.completeUpload(createResp.FileId, createResp.UploadId)
270
}
271
272