Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/drivers/123_open/upload.go
1987 views
1
package _123Open
2
3
import (
4
"bytes"
5
"context"
6
"crypto/md5"
7
"encoding/hex"
8
"encoding/json"
9
"fmt"
10
"github.com/alist-org/alist/v3/drivers/base"
11
"github.com/alist-org/alist/v3/internal/driver"
12
"github.com/alist-org/alist/v3/internal/model"
13
"github.com/alist-org/alist/v3/internal/stream"
14
"github.com/alist-org/alist/v3/pkg/http_range"
15
"github.com/alist-org/alist/v3/pkg/utils"
16
"github.com/go-resty/resty/v2"
17
"golang.org/x/sync/errgroup"
18
"io"
19
"mime/multipart"
20
"net/http"
21
"runtime"
22
"strconv"
23
"time"
24
)
25
26
func (d *Open123) create(parentFileID int64, filename, etag string, size int64, duplicate int, containDir bool) (*UploadCreateResp, error) {
27
var resp UploadCreateResp
28
29
_, err := d.Request(ApiCreateUploadURL, http.MethodPost, func(req *resty.Request) {
30
body := base.Json{
31
"parentFileID": parentFileID,
32
"filename": filename,
33
"etag": etag,
34
"size": size,
35
}
36
if duplicate > 0 {
37
body["duplicate"] = duplicate
38
}
39
if containDir {
40
body["containDir"] = true
41
}
42
req.SetBody(body)
43
}, &resp)
44
45
if err != nil {
46
return nil, err
47
}
48
return &resp, nil
49
}
50
51
func (d *Open123) GetUploadDomains() ([]string, error) {
52
var resp struct {
53
Code int `json:"code"`
54
Message string `json:"message"`
55
Data []string `json:"data"`
56
}
57
58
_, err := d.Request(ApiUploadDomainURL, http.MethodGet, nil, &resp)
59
if err != nil {
60
return nil, err
61
}
62
if resp.Code != 0 {
63
return nil, fmt.Errorf("get upload domain failed: %s", resp.Message)
64
}
65
return resp.Data, nil
66
}
67
68
func (d *Open123) UploadSingle(ctx context.Context, createResp *UploadCreateResp, file model.FileStreamer, parentID int64) error {
69
domain := createResp.Data.Servers[0]
70
71
etag := file.GetHash().GetHash(utils.MD5)
72
if len(etag) < utils.MD5.Width {
73
_, _, err := stream.CacheFullInTempFileAndHash(file, utils.MD5)
74
if err != nil {
75
return err
76
}
77
}
78
79
reader, err := file.RangeRead(http_range.Range{Start: 0, Length: file.GetSize()})
80
if err != nil {
81
return err
82
}
83
reader = driver.NewLimitedUploadStream(ctx, reader)
84
85
var b bytes.Buffer
86
mw := multipart.NewWriter(&b)
87
mw.WriteField("parentFileID", fmt.Sprint(parentID))
88
mw.WriteField("filename", file.GetName())
89
mw.WriteField("etag", etag)
90
mw.WriteField("size", fmt.Sprint(file.GetSize()))
91
fw, _ := mw.CreateFormFile("file", file.GetName())
92
_, err = io.Copy(fw, reader)
93
mw.Close()
94
95
req, err := http.NewRequestWithContext(ctx, "POST", domain+ApiSingleUploadURL, &b)
96
if err != nil {
97
return err
98
}
99
req.Header.Set("Authorization", "Bearer "+d.tm.accessToken)
100
req.Header.Set("Platform", "open_platform")
101
req.Header.Set("Content-Type", mw.FormDataContentType())
102
103
resp, err := http.DefaultClient.Do(req)
104
if err != nil {
105
return err
106
}
107
defer resp.Body.Close()
108
109
var result struct {
110
Code int `json:"code"`
111
Message string `json:"message"`
112
Data struct {
113
FileID int64 `json:"fileID"`
114
Completed bool `json:"completed"`
115
} `json:"data"`
116
}
117
body, _ := io.ReadAll(resp.Body)
118
if err := json.Unmarshal(body, &result); err != nil {
119
return fmt.Errorf("unmarshal response error: %v, body: %s", err, string(body))
120
}
121
if result.Code != 0 {
122
return fmt.Errorf("upload failed: %s", result.Message)
123
}
124
if !result.Data.Completed || result.Data.FileID == 0 {
125
return fmt.Errorf("upload incomplete or missing fileID")
126
}
127
return nil
128
}
129
130
func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, parentID int64, createResp *UploadCreateResp, up driver.UpdateProgress) error {
131
if cacher, ok := file.(interface{ CacheFullInTempFile() (model.File, error) }); ok {
132
if _, err := cacher.CacheFullInTempFile(); err != nil {
133
return err
134
}
135
}
136
137
size := file.GetSize()
138
chunkSize := createResp.Data.SliceSize
139
uploadNums := (size + chunkSize - 1) / chunkSize
140
uploadDomain := createResp.Data.Servers[0]
141
142
if d.UploadThread <= 0 {
143
cpuCores := runtime.NumCPU()
144
threads := cpuCores * 2
145
if threads < 4 {
146
threads = 4
147
}
148
if threads > 16 {
149
threads = 16
150
}
151
d.UploadThread = threads
152
fmt.Printf("[Upload] Auto set upload concurrency: %d (CPU cores=%d)\n", d.UploadThread, cpuCores)
153
}
154
155
fmt.Printf("[Upload] File size: %d bytes, chunk size: %d bytes, total slices: %d, concurrency: %d\n",
156
size, chunkSize, uploadNums, d.UploadThread)
157
158
if size <= 1<<30 {
159
return d.UploadSingle(ctx, createResp, file, parentID)
160
}
161
162
if createResp.Data.Reuse {
163
up(100)
164
return nil
165
}
166
167
client := resty.New()
168
semaphore := make(chan struct{}, d.UploadThread)
169
threadG, _ := errgroup.WithContext(ctx)
170
171
var progressArr = make([]int64, uploadNums)
172
173
for partIndex := int64(0); partIndex < uploadNums; partIndex++ {
174
partIndex := partIndex
175
semaphore <- struct{}{}
176
177
threadG.Go(func() error {
178
defer func() { <-semaphore }()
179
offset := partIndex * chunkSize
180
length := min(chunkSize, size-offset)
181
partNumber := partIndex + 1
182
183
fmt.Printf("[Slice %d] Starting read from offset %d, length %d\n", partNumber, offset, length)
184
reader, err := file.RangeRead(http_range.Range{Start: offset, Length: length})
185
if err != nil {
186
return fmt.Errorf("[Slice %d] RangeRead error: %v", partNumber, err)
187
}
188
189
buf := make([]byte, length)
190
n, err := io.ReadFull(reader, buf)
191
if err != nil && err != io.EOF {
192
return fmt.Errorf("[Slice %d] Read error: %v", partNumber, err)
193
}
194
buf = buf[:n]
195
hash := md5.Sum(buf)
196
sliceMD5Str := hex.EncodeToString(hash[:])
197
198
body := &bytes.Buffer{}
199
writer := multipart.NewWriter(body)
200
writer.WriteField("preuploadID", createResp.Data.PreuploadID)
201
writer.WriteField("sliceNo", strconv.FormatInt(partNumber, 10))
202
writer.WriteField("sliceMD5", sliceMD5Str)
203
partName := fmt.Sprintf("%s.part%d", file.GetName(), partNumber)
204
fw, _ := writer.CreateFormFile("slice", partName)
205
fw.Write(buf)
206
writer.Close()
207
208
resp, err := client.R().
209
SetHeader("Authorization", "Bearer "+d.tm.accessToken).
210
SetHeader("Platform", "open_platform").
211
SetHeader("Content-Type", writer.FormDataContentType()).
212
SetBody(body.Bytes()).
213
Post(uploadDomain + ApiUploadSliceURL)
214
215
if err != nil {
216
return fmt.Errorf("[Slice %d] Upload HTTP error: %v", partNumber, err)
217
}
218
if resp.StatusCode() != 200 {
219
return fmt.Errorf("[Slice %d] Upload failed with status: %s, resp: %s", partNumber, resp.Status(), resp.String())
220
}
221
222
progressArr[partIndex] = length
223
var totalUploaded int64 = 0
224
for _, v := range progressArr {
225
totalUploaded += v
226
}
227
if up != nil {
228
percent := float64(totalUploaded) / float64(size) * 100
229
up(percent)
230
}
231
232
fmt.Printf("[Slice %d] MD5: %s\n", partNumber, sliceMD5Str)
233
fmt.Printf("[Slice %d] Upload finished\n", partNumber)
234
return nil
235
})
236
}
237
238
if err := threadG.Wait(); err != nil {
239
return err
240
}
241
242
var completeResp struct {
243
Code int `json:"code"`
244
Message string `json:"message"`
245
Data struct {
246
Completed bool `json:"completed"`
247
FileID int64 `json:"fileID"`
248
} `json:"data"`
249
}
250
251
for {
252
reqBody := fmt.Sprintf(`{"preuploadID":"%s"}`, createResp.Data.PreuploadID)
253
req, err := http.NewRequestWithContext(ctx, "POST", uploadDomain+ApiUploadCompleteURL, bytes.NewBufferString(reqBody))
254
if err != nil {
255
return err
256
}
257
req.Header.Set("Authorization", "Bearer "+d.tm.accessToken)
258
req.Header.Set("Platform", "open_platform")
259
req.Header.Set("Content-Type", "application/json")
260
261
resp, err := http.DefaultClient.Do(req)
262
if err != nil {
263
return err
264
}
265
body, _ := io.ReadAll(resp.Body)
266
resp.Body.Close()
267
268
if err := json.Unmarshal(body, &completeResp); err != nil {
269
return fmt.Errorf("completion response unmarshal error: %v, body: %s", err, string(body))
270
}
271
if completeResp.Code != 0 {
272
return fmt.Errorf("completion API returned error code %d: %s", completeResp.Code, completeResp.Message)
273
}
274
if completeResp.Data.Completed && completeResp.Data.FileID != 0 {
275
fmt.Printf("[Upload] Upload completed successfully. FileID: %d\n", completeResp.Data.FileID)
276
break
277
}
278
time.Sleep(time.Second)
279
}
280
up(100)
281
return nil
282
}
283
284