Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/internal/net/request.go
1560 views
1
package net
2
3
import (
4
"bytes"
5
"context"
6
"fmt"
7
"io"
8
"net/http"
9
"strconv"
10
"strings"
11
"sync"
12
"time"
13
14
"github.com/alist-org/alist/v3/pkg/utils"
15
16
"github.com/alist-org/alist/v3/pkg/http_range"
17
"github.com/aws/aws-sdk-go/aws/awsutil"
18
log "github.com/sirupsen/logrus"
19
)
20
21
// DefaultDownloadPartSize is the default range of bytes to get at a time when
22
// using Download().
23
const DefaultDownloadPartSize = utils.MB * 10
24
25
// DefaultDownloadConcurrency is the default number of goroutines to spin up
26
// when using Download().
27
const DefaultDownloadConcurrency = 2
28
29
// DefaultPartBodyMaxRetries is the default number of retries to make when a part fails to download.
30
const DefaultPartBodyMaxRetries = 3
31
32
var DefaultConcurrencyLimit *ConcurrencyLimit
33
34
type Downloader struct {
35
PartSize int
36
37
// PartBodyMaxRetries is the number of retry attempts to make for failed part downloads.
38
PartBodyMaxRetries int
39
40
// The number of goroutines to spin up in parallel when sending parts.
41
// If this is set to zero, the DefaultDownloadConcurrency value will be used.
42
//
43
// Concurrency of 1 will download the parts sequentially.
44
Concurrency int
45
46
//RequestParam HttpRequestParams
47
HttpClient HttpRequestFunc
48
49
*ConcurrencyLimit
50
}
51
type HttpRequestFunc func(ctx context.Context, params *HttpRequestParams) (*http.Response, error)
52
53
func NewDownloader(options ...func(*Downloader)) *Downloader {
54
d := &Downloader{ //允许不设置的选项
55
PartBodyMaxRetries: DefaultPartBodyMaxRetries,
56
ConcurrencyLimit: DefaultConcurrencyLimit,
57
}
58
for _, option := range options {
59
option(d)
60
}
61
return d
62
}
63
64
// Download The Downloader makes multi-thread http requests to remote URL, each chunk(except last one) has PartSize,
65
// cache some data, then return Reader with assembled data
66
// Supports range, do not support unknown FileSize, and will fail if FileSize is incorrect
67
// memory usage is at about Concurrency*PartSize, use this wisely
68
func (d Downloader) Download(ctx context.Context, p *HttpRequestParams) (readCloser io.ReadCloser, err error) {
69
70
var finalP HttpRequestParams
71
awsutil.Copy(&finalP, p)
72
if finalP.Range.Length == -1 {
73
finalP.Range.Length = finalP.Size - finalP.Range.Start
74
}
75
impl := downloader{params: &finalP, cfg: d, ctx: ctx}
76
77
// Ensures we don't need nil checks later on
78
// 必需的选项
79
if impl.cfg.Concurrency == 0 {
80
impl.cfg.Concurrency = DefaultDownloadConcurrency
81
}
82
if impl.cfg.PartSize == 0 {
83
impl.cfg.PartSize = DefaultDownloadPartSize
84
}
85
if impl.cfg.HttpClient == nil {
86
impl.cfg.HttpClient = DefaultHttpRequestFunc
87
}
88
89
return impl.download()
90
}
91
92
// downloader is the implementation structure used internally by Downloader.
93
type downloader struct {
94
ctx context.Context
95
cancel context.CancelCauseFunc
96
cfg Downloader
97
98
params *HttpRequestParams //http request params
99
chunkChannel chan chunk //chunk chanel
100
101
//wg sync.WaitGroup
102
m sync.Mutex
103
104
nextChunk int //next chunk id
105
bufs []*Buf
106
written int64 //total bytes of file downloaded from remote
107
err error
108
109
concurrency int //剩余的并发数,递减。到0时停止并发
110
maxPart int //有多少个分片
111
pos int64
112
maxPos int64
113
m2 sync.Mutex
114
readingID int // 正在被读取的id
115
}
116
117
type ConcurrencyLimit struct {
118
_m sync.Mutex
119
Limit int // 需要大于0
120
}
121
122
var ErrExceedMaxConcurrency = fmt.Errorf("ExceedMaxConcurrency")
123
124
func (l *ConcurrencyLimit) sub() error {
125
l._m.Lock()
126
defer l._m.Unlock()
127
if l.Limit-1 < 0 {
128
return ErrExceedMaxConcurrency
129
}
130
l.Limit--
131
// log.Debugf("ConcurrencyLimit.sub: %d", l.Limit)
132
return nil
133
}
134
func (l *ConcurrencyLimit) add() {
135
l._m.Lock()
136
defer l._m.Unlock()
137
l.Limit++
138
// log.Debugf("ConcurrencyLimit.add: %d", l.Limit)
139
}
140
141
// 检测是否超过限制
142
func (d *downloader) concurrencyCheck() error {
143
if d.cfg.ConcurrencyLimit != nil {
144
return d.cfg.ConcurrencyLimit.sub()
145
}
146
return nil
147
}
148
func (d *downloader) concurrencyFinish() {
149
if d.cfg.ConcurrencyLimit != nil {
150
d.cfg.ConcurrencyLimit.add()
151
}
152
}
153
154
// download performs the implementation of the object download across ranged GETs.
155
func (d *downloader) download() (io.ReadCloser, error) {
156
if err := d.concurrencyCheck(); err != nil {
157
return nil, err
158
}
159
d.ctx, d.cancel = context.WithCancelCause(d.ctx)
160
161
maxPart := int(d.params.Range.Length / int64(d.cfg.PartSize))
162
if d.params.Range.Length%int64(d.cfg.PartSize) > 0 {
163
maxPart++
164
}
165
if maxPart < d.cfg.Concurrency {
166
d.cfg.Concurrency = maxPart
167
}
168
log.Debugf("cfgConcurrency:%d", d.cfg.Concurrency)
169
170
if d.cfg.Concurrency == 1 {
171
if d.cfg.ConcurrencyLimit != nil {
172
go func() {
173
<-d.ctx.Done()
174
d.concurrencyFinish()
175
}()
176
}
177
resp, err := d.cfg.HttpClient(d.ctx, d.params)
178
if err != nil {
179
return nil, err
180
}
181
return resp.Body, nil
182
}
183
184
// workers
185
d.chunkChannel = make(chan chunk, d.cfg.Concurrency)
186
187
d.maxPart = maxPart
188
d.pos = d.params.Range.Start
189
d.maxPos = d.params.Range.Start + d.params.Range.Length
190
d.concurrency = d.cfg.Concurrency
191
d.sendChunkTask(true)
192
193
var rc io.ReadCloser = NewMultiReadCloser(d.bufs[0], d.interrupt, d.finishBuf)
194
195
// Return error
196
return rc, d.err
197
}
198
199
func (d *downloader) sendChunkTask(newConcurrency bool) error {
200
d.m.Lock()
201
defer d.m.Unlock()
202
isNewBuf := d.concurrency > 0
203
if newConcurrency {
204
if d.concurrency <= 0 {
205
return nil
206
}
207
if d.nextChunk > 0 { // 第一个不检查,因为已经检查过了
208
if err := d.concurrencyCheck(); err != nil {
209
return err
210
}
211
}
212
d.concurrency--
213
go d.downloadPart()
214
}
215
216
var buf *Buf
217
if isNewBuf {
218
buf = NewBuf(d.ctx, d.cfg.PartSize)
219
d.bufs = append(d.bufs, buf)
220
} else {
221
buf = d.getBuf(d.nextChunk)
222
}
223
224
if d.pos < d.maxPos {
225
finalSize := int64(d.cfg.PartSize)
226
switch d.nextChunk {
227
case 0:
228
// 最小分片在前面有助视频播放?
229
firstSize := d.params.Range.Length % finalSize
230
if firstSize > 0 {
231
minSize := finalSize / 2
232
if firstSize < minSize { // 最小分片太小就调整到一半
233
finalSize = minSize
234
} else {
235
finalSize = firstSize
236
}
237
}
238
case 1:
239
firstSize := d.params.Range.Length % finalSize
240
minSize := finalSize / 2
241
if firstSize > 0 && firstSize < minSize {
242
finalSize += firstSize - minSize
243
}
244
}
245
buf.Reset(int(finalSize))
246
ch := chunk{
247
start: d.pos,
248
size: finalSize,
249
id: d.nextChunk,
250
buf: buf,
251
252
newConcurrency: newConcurrency,
253
}
254
d.pos += finalSize
255
d.nextChunk++
256
d.chunkChannel <- ch
257
return nil
258
}
259
return nil
260
}
261
262
// when the final reader Close, we interrupt
263
func (d *downloader) interrupt() error {
264
if d.written != d.params.Range.Length {
265
log.Debugf("Downloader interrupt before finish")
266
if d.getErr() == nil {
267
d.setErr(fmt.Errorf("interrupted"))
268
}
269
}
270
d.cancel(d.err)
271
defer func() {
272
close(d.chunkChannel)
273
for _, buf := range d.bufs {
274
buf.Close()
275
}
276
if d.concurrency > 0 {
277
d.concurrency = -d.concurrency
278
}
279
log.Debugf("maxConcurrency:%d", d.cfg.Concurrency+d.concurrency)
280
}()
281
return d.err
282
}
283
func (d *downloader) getBuf(id int) (b *Buf) {
284
return d.bufs[id%len(d.bufs)]
285
}
286
func (d *downloader) finishBuf(id int) (isLast bool, nextBuf *Buf) {
287
id++
288
if id >= d.maxPart {
289
return true, nil
290
}
291
292
d.sendChunkTask(false)
293
294
d.readingID = id
295
return false, d.getBuf(id)
296
}
297
298
// downloadPart is an individual goroutine worker reading from the ch channel
299
// and performing Http request on the data with a given byte range.
300
func (d *downloader) downloadPart() {
301
//defer d.wg.Done()
302
for {
303
c, ok := <-d.chunkChannel
304
if !ok {
305
break
306
}
307
if d.getErr() != nil {
308
// Drain the channel if there is an error, to prevent deadlocking
309
// of download producer.
310
break
311
}
312
if err := d.downloadChunk(&c); err != nil {
313
if err == errCancelConcurrency {
314
break
315
}
316
if err == context.Canceled {
317
if e := context.Cause(d.ctx); e != nil {
318
err = e
319
}
320
}
321
d.setErr(err)
322
d.cancel(err)
323
}
324
}
325
d.concurrencyFinish()
326
}
327
328
// downloadChunk downloads the chunk
329
func (d *downloader) downloadChunk(ch *chunk) error {
330
log.Debugf("start chunk_%d, %+v", ch.id, ch)
331
params := d.getParamsFromChunk(ch)
332
var n int64
333
var err error
334
for retry := 0; retry <= d.cfg.PartBodyMaxRetries; retry++ {
335
if d.getErr() != nil {
336
return nil
337
}
338
n, err = d.tryDownloadChunk(params, ch)
339
if err == nil {
340
d.incrWritten(n)
341
log.Debugf("chunk_%d downloaded", ch.id)
342
break
343
}
344
if d.getErr() != nil {
345
return nil
346
}
347
if utils.IsCanceled(d.ctx) {
348
return d.ctx.Err()
349
}
350
// Check if the returned error is an errNeedRetry.
351
// If this occurs we unwrap the err to set the underlying error
352
// and attempt any remaining retries.
353
if e, ok := err.(*errNeedRetry); ok {
354
err = e.Unwrap()
355
if n > 0 {
356
// 测试:下载时 断开 alist向云盘发起的下载连接
357
// 校验:下载完后校验文件哈希值 一致
358
d.incrWritten(n)
359
ch.start += n
360
ch.size -= n
361
params.Range.Start = ch.start
362
params.Range.Length = ch.size
363
}
364
log.Warnf("err chunk_%d, object part download error %s, retrying attempt %d. %v",
365
ch.id, params.URL, retry, err)
366
} else if err == errInfiniteRetry {
367
retry--
368
continue
369
} else {
370
break
371
}
372
}
373
374
return err
375
}
376
377
var errCancelConcurrency = fmt.Errorf("cancel concurrency")
378
var errInfiniteRetry = fmt.Errorf("infinite retry")
379
380
func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int64, error) {
381
resp, err := d.cfg.HttpClient(d.ctx, params)
382
if err != nil {
383
if resp == nil {
384
return 0, err
385
}
386
if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
387
return 0, err
388
}
389
if ch.id == 0 { //第1个任务 有限的重试,超过重试就会结束请求
390
switch resp.StatusCode {
391
default:
392
return 0, err
393
case http.StatusTooManyRequests:
394
case http.StatusBadGateway:
395
case http.StatusServiceUnavailable:
396
case http.StatusGatewayTimeout:
397
}
398
<-time.After(time.Millisecond * 200)
399
return 0, &errNeedRetry{err: fmt.Errorf("http request failure,status: %d", resp.StatusCode)}
400
}
401
402
// 来到这 说明第1个分片下载 连接成功了
403
// 后续分片下载出错都当超载处理
404
log.Debugf("err chunk_%d, try downloading:%v", ch.id, err)
405
406
d.m.Lock()
407
isCancelConcurrency := ch.newConcurrency
408
if d.concurrency > 0 { // 取消剩余的并发任务
409
// 用于计算实际的并发数
410
d.concurrency = -d.concurrency
411
isCancelConcurrency = true
412
}
413
if isCancelConcurrency {
414
d.concurrency--
415
d.chunkChannel <- *ch
416
d.m.Unlock()
417
return 0, errCancelConcurrency
418
}
419
d.m.Unlock()
420
if ch.id != d.readingID { //正在被读取的优先重试
421
d.m2.Lock()
422
defer d.m2.Unlock()
423
<-time.After(time.Millisecond * 200)
424
}
425
return 0, errInfiniteRetry
426
}
427
defer resp.Body.Close()
428
//only check file size on the first task
429
if ch.id == 0 {
430
err = d.checkTotalBytes(resp)
431
if err != nil {
432
return 0, err
433
}
434
}
435
d.sendChunkTask(true)
436
n, err := utils.CopyWithBuffer(ch.buf, resp.Body)
437
438
if err != nil {
439
return n, &errNeedRetry{err: err}
440
}
441
if n != ch.size {
442
err = fmt.Errorf("chunk download size incorrect, expected=%d, got=%d", ch.size, n)
443
return n, &errNeedRetry{err: err}
444
}
445
446
return n, nil
447
}
448
func (d *downloader) getParamsFromChunk(ch *chunk) *HttpRequestParams {
449
var params HttpRequestParams
450
awsutil.Copy(&params, d.params)
451
452
// Get the getBuf byte range of data
453
params.Range = http_range.Range{Start: ch.start, Length: ch.size}
454
return &params
455
}
456
457
func (d *downloader) checkTotalBytes(resp *http.Response) error {
458
var err error
459
totalBytes := int64(-1)
460
contentRange := resp.Header.Get("Content-Range")
461
if len(contentRange) == 0 {
462
// ContentRange is nil when the full file contents is provided, and
463
// is not chunked. Use ContentLength instead.
464
if resp.ContentLength > 0 {
465
totalBytes = resp.ContentLength
466
}
467
} else {
468
parts := strings.Split(contentRange, "/")
469
470
total := int64(-1)
471
472
// Checking for whether a numbered total exists
473
// If one does not exist, we will assume the total to be -1, undefined,
474
// and sequentially download each chunk until hitting a 416 error
475
totalStr := parts[len(parts)-1]
476
if totalStr != "*" {
477
total, err = strconv.ParseInt(totalStr, 10, 64)
478
if err != nil {
479
err = fmt.Errorf("failed extracting file size")
480
}
481
} else {
482
err = fmt.Errorf("file size unknown")
483
}
484
485
totalBytes = total
486
}
487
if totalBytes != d.params.Size && err == nil {
488
err = fmt.Errorf("expect file size=%d unmatch remote report size=%d, need refresh cache", d.params.Size, totalBytes)
489
}
490
if err != nil {
491
// _ = d.interrupt()
492
d.setErr(err)
493
d.cancel(err)
494
}
495
return err
496
497
}
498
499
func (d *downloader) incrWritten(n int64) {
500
d.m.Lock()
501
defer d.m.Unlock()
502
503
d.written += n
504
}
505
506
// getErr is a thread-safe getter for the error object
507
func (d *downloader) getErr() error {
508
d.m.Lock()
509
defer d.m.Unlock()
510
511
return d.err
512
}
513
514
// setErr is a thread-safe setter for the error object
515
func (d *downloader) setErr(e error) {
516
d.m.Lock()
517
defer d.m.Unlock()
518
519
d.err = e
520
}
521
522
// Chunk represents a single chunk of data to write by the worker routine.
523
// This structure also implements an io.SectionReader style interface for
524
// io.WriterAt, effectively making it an io.SectionWriter (which does not
525
// exist).
526
type chunk struct {
527
start int64
528
size int64
529
buf *Buf
530
id int
531
532
newConcurrency bool
533
}
534
535
func DefaultHttpRequestFunc(ctx context.Context, params *HttpRequestParams) (*http.Response, error) {
536
header := http_range.ApplyRangeToHttpHeader(params.Range, params.HeaderRef)
537
538
res, err := RequestHttp(ctx, "GET", header, params.URL)
539
if err != nil {
540
return res, err
541
}
542
return res, nil
543
}
544
545
type HttpRequestParams struct {
546
URL string
547
//only want data within this range
548
Range http_range.Range
549
HeaderRef http.Header
550
//total file size
551
Size int64
552
}
553
type errNeedRetry struct {
554
err error
555
}
556
557
func (e *errNeedRetry) Error() string {
558
return e.err.Error()
559
}
560
561
func (e *errNeedRetry) Unwrap() error {
562
return e.err
563
}
564
565
type MultiReadCloser struct {
566
cfg *cfg
567
closer closerFunc
568
finish finishBufFUnc
569
}
570
571
type cfg struct {
572
rPos int //current reader position, start from 0
573
curBuf *Buf
574
}
575
576
type closerFunc func() error
577
type finishBufFUnc func(id int) (isLast bool, buf *Buf)
578
579
// NewMultiReadCloser to save memory, we re-use limited Buf, and feed data to Read()
580
func NewMultiReadCloser(buf *Buf, c closerFunc, fb finishBufFUnc) *MultiReadCloser {
581
return &MultiReadCloser{closer: c, finish: fb, cfg: &cfg{curBuf: buf}}
582
}
583
584
func (mr MultiReadCloser) Read(p []byte) (n int, err error) {
585
if mr.cfg.curBuf == nil {
586
return 0, io.EOF
587
}
588
n, err = mr.cfg.curBuf.Read(p)
589
//log.Debugf("read_%d read current buffer, n=%d ,err=%+v", mr.cfg.rPos, n, err)
590
if err == io.EOF {
591
log.Debugf("read_%d finished current buffer", mr.cfg.rPos)
592
593
isLast, next := mr.finish(mr.cfg.rPos)
594
if isLast {
595
return n, io.EOF
596
}
597
mr.cfg.curBuf = next
598
mr.cfg.rPos++
599
return n, nil
600
}
601
if err == context.Canceled {
602
if e := context.Cause(mr.cfg.curBuf.ctx); e != nil {
603
err = e
604
}
605
}
606
return n, err
607
}
608
func (mr MultiReadCloser) Close() error {
609
return mr.closer()
610
}
611
612
type Buf struct {
613
buffer *bytes.Buffer
614
size int //expected size
615
ctx context.Context
616
off int
617
rw sync.Mutex
618
}
619
620
// NewBuf is a buffer that can have 1 read & 1 write at the same time.
621
// when read is faster write, immediately feed data to read after written
622
func NewBuf(ctx context.Context, maxSize int) *Buf {
623
return &Buf{
624
ctx: ctx,
625
buffer: bytes.NewBuffer(make([]byte, 0, maxSize)),
626
size: maxSize,
627
}
628
}
629
func (br *Buf) Reset(size int) {
630
br.buffer.Reset()
631
br.size = size
632
br.off = 0
633
}
634
635
func (br *Buf) Read(p []byte) (n int, err error) {
636
if err := br.ctx.Err(); err != nil {
637
return 0, err
638
}
639
if len(p) == 0 {
640
return 0, nil
641
}
642
if br.off >= br.size {
643
return 0, io.EOF
644
}
645
br.rw.Lock()
646
n, err = br.buffer.Read(p)
647
br.rw.Unlock()
648
if err == nil {
649
br.off += n
650
return n, err
651
}
652
if err != io.EOF {
653
return n, err
654
}
655
if n != 0 {
656
br.off += n
657
return n, nil
658
}
659
// n==0, err==io.EOF
660
// wait for new write for 200ms
661
select {
662
case <-br.ctx.Done():
663
return 0, br.ctx.Err()
664
case <-time.After(time.Millisecond * 200):
665
return 0, nil
666
}
667
}
668
669
func (br *Buf) Write(p []byte) (n int, err error) {
670
if err := br.ctx.Err(); err != nil {
671
return 0, err
672
}
673
br.rw.Lock()
674
defer br.rw.Unlock()
675
n, err = br.buffer.Write(p)
676
return
677
}
678
679
func (br *Buf) Close() {
680
br.buffer = nil
681
}
682
683