GitHub Repository: alist-org/alist
Path: blob/main/internal/stream/stream.go
package stream

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"os"

	"github.com/alist-org/alist/v3/internal/errs"
	"github.com/alist-org/alist/v3/internal/model"
	"github.com/alist-org/alist/v3/pkg/http_range"
	"github.com/alist-org/alist/v3/pkg/utils"
	"github.com/sirupsen/logrus"
	"go4.org/readerutil"
)

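// FileStream describes an incoming file transfer: the target Obj metadata
// plus the raw Reader carrying its content, along with the buffering state
// (peekBuff, tmpFile) that the methods below manage.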
type FileStream struct {
	Ctx context.Context
	model.Obj
	io.Reader
	Mimetype          string
	WebPutAsTask      bool
	ForceStreamUpload bool
	Exist             model.Obj // the file that already exists at the destination; we can reuse some info since we will overwrite it
	utils.Closers
	tmpFile  *os.File // if present, tmpFile holds the full content and is deleted when the stream is closed
	peekBuff *bytes.Reader
}

func (f *FileStream) GetSize() int64 {
	if f.tmpFile != nil {
		info, err := f.tmpFile.Stat()
		if err == nil {
			return info.Size()
		}
	}
	return f.Obj.GetSize()
}

func (f *FileStream) GetMimetype() string {
	return f.Mimetype
}

func (f *FileStream) NeedStore() bool {
	return f.WebPutAsTask
}

func (f *FileStream) IsForceStreamUpload() bool {
	return f.ForceStreamUpload
}

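// Close closes all registered closers and removes the temp file, if any.
// os.ErrClosed from the closers is ignored, and the two error sources are
// combined with errors.Join.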
func (f *FileStream) Close() error {
	var err1, err2 error

	err1 = f.Closers.Close()
	if errors.Is(err1, os.ErrClosed) {
		err1 = nil
	}
	if f.tmpFile != nil {
		err2 = os.RemoveAll(f.tmpFile.Name())
		if err2 != nil {
			err2 = errs.NewErr(err2, "failed to remove tmpFile [%s]", f.tmpFile.Name())
		} else {
			f.tmpFile = nil
		}
	}

	return errors.Join(err1, err2)
}

func (f *FileStream) GetExist() model.Obj {
	return f.Exist
}

func (f *FileStream) SetExist(obj model.Obj) {
	f.Exist = obj
}

// CacheFullInTempFile saves all data into a temp file. Not recommended, since
// it wears the disk and the upload cannot start until the file is fully
// written. It is not thread-safe!
func (f *FileStream) CacheFullInTempFile() (model.File, error) {
	if f.tmpFile != nil {
		return f.tmpFile, nil
	}
	if file, ok := f.Reader.(model.File); ok {
		return file, nil
	}
	tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
	if err != nil {
		return nil, err
	}
	f.Add(tmpF)
	f.tmpFile = tmpF
	f.Reader = tmpF
	return tmpF, nil
}

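// GetFile returns the underlying random-access file when one is available:
// the temp file if cached, or the Reader itself if it already implements
// model.File; otherwise nil.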
func (f *FileStream) GetFile() model.File {
	if f.tmpFile != nil {
		return f.tmpFile
	}
	if file, ok := f.Reader.(model.File); ok {
		return file
	}
	return nil
}

const InMemoryBufMaxSize = 10 // Megabytes
const InMemoryBufMaxSizeBytes = InMemoryBufMaxSize * 1024 * 1024

// RangeRead has to cache all data first, since only a Reader is provided.
// It also supports a peeking RangeRead at the very start, but won't buffer
// more than 10 MB of data in memory.
func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
	if httpRange.Length == -1 {
		// See internal/net/request.go.
		httpRange.Length = f.GetSize() - httpRange.Start
	}
	size := httpRange.Start + httpRange.Length
	if f.peekBuff != nil && size <= int64(f.peekBuff.Len()) {
		return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
	}
	var cache io.ReaderAt = f.GetFile()
	if cache == nil {
		if size <= InMemoryBufMaxSizeBytes {
			bufSize := min(size, f.GetSize())
			// Using bytes.Buffer as the write target of io.CopyBuffer would make
			// CopyBuffer call Buffer.ReadFrom, and the Buffer grows even when the
			// amount written equals Buffer.Cap, so read into a fixed []byte instead.
			buf := make([]byte, bufSize)
			n, err := io.ReadFull(f.Reader, buf)
			if err != nil {
				return nil, err
			}
			if n != int(bufSize) {
				return nil, fmt.Errorf("stream RangeRead did not get all data in peek, expect=%d, actual=%d", bufSize, n)
			}
			f.peekBuff = bytes.NewReader(buf)
			f.Reader = io.MultiReader(f.peekBuff, f.Reader)
			cache = f.peekBuff
		} else {
			var err error
			cache, err = f.CacheFullInTempFile()
			if err != nil {
				return nil, err
			}
		}
	}
	return io.NewSectionReader(cache, httpRange.Start, httpRange.Length), nil
}

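// A typical peeking call reads a small prefix, e.g. for content-type
// sniffing, without consuming the stream (illustrative sketch; the 512-byte
// length is an arbitrary example):
//
//	head, err := fs.RangeRead(http_range.Range{Start: 0, Length: 512})
//	if err != nil { ... }
//	// fs can still be read from the beginning afterwards, because the
//	// peeked bytes are replayed via io.MultiReader.
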
var _ model.FileStreamer = (*SeekableStream)(nil)
var _ model.FileStreamer = (*FileStream)(nil)

// SeekableStream is for most internal streams, which are backed by either a
// RangeReadCloser or an MFile. Any functionality built on top of a
// SeekableStream should implement a Close method whose only purpose is to
// close the SeekableStream object. If such functionality holds additional
// resources that need to be closed, they should be added to the Closers
// property of the SeekableStream object and closed together with it.
type SeekableStream struct {
	FileStream
	Link *model.Link
	// should have one of the below to support RangeRead
	rangeReadCloser model.RangeReadCloserIF
	mFile           model.File
}

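// NewSeekableStream builds a SeekableStream from a FileStream and an optional
// Link, picking the first usable source in order: an existing Reader that
// implements model.File, Link.MFile, Link.RangeReadCloser, then Link.URL.
// Remote sources are wrapped with the server download rate limiter.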
func NewSeekableStream(fs FileStream, link *model.Link) (*SeekableStream, error) {
	if len(fs.Mimetype) == 0 {
		fs.Mimetype = utils.GetMimeType(fs.Obj.GetName())
	}
	ss := &SeekableStream{FileStream: fs, Link: link}
	if ss.Reader != nil {
		result, ok := ss.Reader.(model.File)
		if ok {
			ss.mFile = result
			ss.Closers.Add(result)
			return ss, nil
		}
	}
	if ss.Link != nil {
		if ss.Link.MFile != nil {
			mFile := ss.Link.MFile
			if _, ok := mFile.(*os.File); !ok {
				mFile = &RateLimitFile{
					File:    mFile,
					Limiter: ServerDownloadLimit,
					Ctx:     fs.Ctx,
				}
			}
			ss.mFile = mFile
			ss.Reader = mFile
			ss.Closers.Add(mFile)
			return ss, nil
		}
		if ss.Link.RangeReadCloser != nil {
			ss.rangeReadCloser = &RateLimitRangeReadCloser{
				RangeReadCloserIF: ss.Link.RangeReadCloser,
				Limiter:           ServerDownloadLimit,
			}
			ss.Add(ss.rangeReadCloser)
			return ss, nil
		}
		if len(ss.Link.URL) > 0 {
			rrc, err := GetRangeReadCloserFromLink(ss.GetSize(), link)
			if err != nil {
				return nil, err
			}
			rrc = &RateLimitRangeReadCloser{
				RangeReadCloserIF: rrc,
				Limiter:           ServerDownloadLimit,
			}
			ss.rangeReadCloser = rrc
			ss.Add(rrc)
			return ss, nil
		}
	}
	if fs.Reader != nil {
		return ss, nil
	}
	return nil, fmt.Errorf("illegal seekableStream")
}

// RangeRead is not thread-safe; use it in a single thread only.
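// It serves the range from mFile or the temp file when one is available,
// then from rangeReadCloser, and only falls back to the caching
// FileStream.RangeRead as a last resort.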
func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
	if httpRange.Length == -1 {
		httpRange.Length = ss.GetSize() - httpRange.Start
	}
	if ss.mFile != nil {
		return io.NewSectionReader(ss.mFile, httpRange.Start, httpRange.Length), nil
	}
	if ss.tmpFile != nil {
		return io.NewSectionReader(ss.tmpFile, httpRange.Start, httpRange.Length), nil
	}
	if ss.rangeReadCloser != nil {
		rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, httpRange)
		if err != nil {
			return nil, err
		}
		return rc, nil
	}
	return ss.FileStream.RangeRead(httpRange)
}

// Only provide the Reader as a full stream when it is demanded; in
// rapid-upload we can skip this to save memory.
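// Read lazily opens a full-range read from rangeReadCloser on the first call
// when no Reader has been set yet.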
func (ss *SeekableStream) Read(p []byte) (n int, err error) {
	if ss.Reader == nil {
		if ss.rangeReadCloser == nil {
			return 0, fmt.Errorf("illegal seekableStream")
		}
		rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, http_range.Range{Length: -1})
		if err != nil {
			return 0, err
		}
		ss.Reader = io.NopCloser(rc)
	}
	return ss.Reader.Read(p)
}

func (ss *SeekableStream) CacheFullInTempFile() (model.File, error) {
	if ss.tmpFile != nil {
		return ss.tmpFile, nil
	}
	if ss.mFile != nil {
		return ss.mFile, nil
	}
	tmpF, err := utils.CreateTempFile(ss, ss.GetSize())
	if err != nil {
		return nil, err
	}
	ss.Add(tmpF)
	ss.tmpFile = tmpF
	ss.Reader = tmpF
	return tmpF, nil
}

func (ss *SeekableStream) GetFile() model.File {
	if ss.tmpFile != nil {
		return ss.tmpFile
	}
	if ss.mFile != nil {
		return ss.mFile
	}
	return nil
}

func (f *FileStream) SetTmpFile(r *os.File) {
	f.Add(r)
	f.tmpFile = r
	f.Reader = r
}

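// ReaderWithSize is a ReadCloser that also reports its total size, which
// lets ReaderUpdatingProgress turn a byte offset into a percentage.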
type ReaderWithSize interface {
	io.ReadCloser
	GetSize() int64
}

type SimpleReaderWithSize struct {
	io.Reader
	Size int64
}

func (r *SimpleReaderWithSize) GetSize() int64 {
	return r.Size
}

func (r *SimpleReaderWithSize) Close() error {
	if c, ok := r.Reader.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

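// ReaderUpdatingProgress reports progress as it is read from. A minimal usage
// sketch, assuming model.UpdateProgress is a func(percentage float64) and
// that body/size are supplied by the caller:
//
//	r := &ReaderUpdatingProgress{
//		Reader:         &SimpleReaderWithSize{Reader: body, Size: size},
//		UpdateProgress: func(p float64) { logrus.Debugf("upload %.1f%%", p) },
//	}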
type ReaderUpdatingProgress struct {
	Reader ReaderWithSize
	model.UpdateProgress
	offset int
}

func (r *ReaderUpdatingProgress) Read(p []byte) (n int, err error) {
	n, err = r.Reader.Read(p)
	r.offset += n
	r.UpdateProgress(math.Min(100.0, float64(r.offset)/float64(r.Reader.GetSize())*100.0))
	return n, err
}

func (r *ReaderUpdatingProgress) Close() error {
	return r.Reader.Close()
}

type SStreamReadAtSeeker interface {
	model.File
	GetRawStream() *SeekableStream
}

type readerCur struct {
	reader io.Reader
	cur    int64
}

type RangeReadReadAtSeeker struct {
	ss        *SeekableStream
	masterOff int64
	readers   []*readerCur
	headCache *headCache
}

type headCache struct {
	*readerCur
	bufs [][]byte
}

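// headCache keeps every buffer read from offset 0 so that repeated reads at
// the head of the stream (common when probing archive or media headers) can
// be served again without issuing a new range request.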
func (c *headCache) read(p []byte) (n int, err error) {
	pL := len(p)
	logrus.Debugf("headCache read_%d", pL)
	if c.cur < int64(pL) {
		bufL := int64(pL) - c.cur
		buf := make([]byte, bufL)
		lr := io.LimitReader(c.reader, bufL)
		off := 0
		for c.cur < int64(pL) {
			n, err = lr.Read(buf[off:])
			off += n
			c.cur += int64(n)
			if err == io.EOF && off == int(bufL) {
				err = nil
			}
			if err != nil {
				break
			}
		}
		c.bufs = append(c.bufs, buf)
	}
	n = 0
	if c.cur >= int64(pL) {
		for i := 0; n < pL; i++ {
			buf := c.bufs[i]
			r := len(buf)
			if n+r > pL {
				r = pL - n
			}
			n += copy(p[n:], buf[:r])
		}
	}
	return
}

func (c *headCache) Close() error {
	for i := range c.bufs {
		c.bufs[i] = nil
	}
	c.bufs = nil
	return nil
}

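// InitHeadCache promotes the initial reader into a headCache when the stream
// has no local file and the seeker starts at offset 0.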
func (r *RangeReadReadAtSeeker) InitHeadCache() {
	if r.ss.Link.MFile == nil && r.masterOff == 0 {
		reader := r.readers[0]
		r.readers = r.readers[1:]
		r.headCache = &headCache{readerCur: reader}
		r.ss.Closers.Add(r.headCache)
	}
}

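// NewReadAtSeeker adapts a SeekableStream into an SStreamReadAtSeeker. When
// the stream is backed by an mFile it simply seeks; otherwise it tracks a set
// of range readers keyed by their current offsets. Pass forceRange to open a
// range reader even at offset 0.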
func NewReadAtSeeker(ss *SeekableStream, offset int64, forceRange ...bool) (SStreamReadAtSeeker, error) {
	if ss.mFile != nil {
		_, err := ss.mFile.Seek(offset, io.SeekStart)
		if err != nil {
			return nil, err
		}
		return &FileReadAtSeeker{ss: ss}, nil
	}
	r := &RangeReadReadAtSeeker{
		ss:        ss,
		masterOff: offset,
	}
	if offset != 0 || utils.IsBool(forceRange...) {
		if offset < 0 || offset > ss.GetSize() {
			return nil, errors.New("offset out of range")
		}
		_, err := r.getReaderAtOffset(offset)
		if err != nil {
			return nil, err
		}
	} else {
		rc := &readerCur{reader: ss, cur: offset}
		r.readers = append(r.readers, rc)
	}
	return r, nil
}

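// NewMultiReaderAt stitches several streams into a single
// readerutil.SizeReaderAt, one section per stream, in order.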
func NewMultiReaderAt(ss []*SeekableStream) (readerutil.SizeReaderAt, error) {
	readers := make([]readerutil.SizeReaderAt, 0, len(ss))
	for _, s := range ss {
		ra, err := NewReadAtSeeker(s, 0)
		if err != nil {
			return nil, err
		}
		readers = append(readers, io.NewSectionReader(ra, 0, s.GetSize()))
	}
	return readerutil.NewMultiReaderAt(readers...), nil
}

func (r *RangeReadReadAtSeeker) GetRawStream() *SeekableStream {
	return r.ss
}

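// getReaderAtOffset reuses an existing reader when possible: an exact match
// on the offset, or a reader behind it within 1 MB, whose surplus bytes are
// discarded to advance it. Only when neither exists does it issue a fresh
// RangeRead from the requested offset.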
func (r *RangeReadReadAtSeeker) getReaderAtOffset(off int64) (*readerCur, error) {
	var rc *readerCur
	for _, reader := range r.readers {
		if reader.cur == -1 {
			continue
		}
		if reader.cur == off {
			// logrus.Debugf("getReaderAtOffset match_%d", off)
			return reader, nil
		}
		if reader.cur > 0 && off >= reader.cur && (rc == nil || reader.cur < rc.cur) {
			rc = reader
		}
	}
	if rc != nil && off-rc.cur <= utils.MB {
		n, err := utils.CopyWithBufferN(io.Discard, rc.reader, off-rc.cur)
		rc.cur += n
		if err == io.EOF && rc.cur == off {
			err = nil
		}
		if err == nil {
			logrus.Debugf("getReaderAtOffset old_%d", off)
			return rc, nil
		}
		rc.cur = -1
	}
	logrus.Debugf("getReaderAtOffset new_%d", off)

	// A Range request must not exceed the file size; some cloud drives cannot
	// handle that and would return the whole file instead.
	reader, err := r.ss.RangeRead(http_range.Range{Start: off, Length: r.ss.GetSize() - off})
	if err != nil {
		return nil, err
	}
	rc = &readerCur{reader: reader, cur: off}
	r.readers = append(r.readers, rc)
	return rc, nil
}

func (r *RangeReadReadAtSeeker) ReadAt(p []byte, off int64) (int, error) {
	if off == 0 && r.headCache != nil {
		return r.headCache.read(p)
	}
	rc, err := r.getReaderAtOffset(off)
	if err != nil {
		return 0, err
	}
	n, num := 0, 0
	for num < len(p) {
		n, err = rc.reader.Read(p[num:])
		rc.cur += int64(n)
		num += n
		if err == nil {
			continue
		}
		if err == io.EOF {
			// io.EOF means the reader is exhausted
			rc.cur = -1
			// the yeka/zip package does not handle EOF, so stay compatible:
			// https://github.com/yeka/zip/blob/03d6312748a9d6e0bc0c9a7275385c09f06d9c14/reader.go#L433
			if num == len(p) {
				err = nil
			}
		}
		break
	}
	return num, err
}

func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
	case io.SeekCurrent:
		if offset == 0 {
			return r.masterOff, nil
		}
		offset += r.masterOff
	case io.SeekEnd:
		offset += r.ss.GetSize()
	default:
		return 0, errs.NotSupport
	}
	if offset < 0 {
		return r.masterOff, errors.New("invalid seek: negative position")
	}
	if offset > r.ss.GetSize() {
		return r.masterOff, io.EOF
	}
	r.masterOff = offset
	return offset, nil
}

func (r *RangeReadReadAtSeeker) Read(p []byte) (n int, err error) {
	if r.masterOff == 0 && r.headCache != nil {
		return r.headCache.read(p)
	}
	rc, err := r.getReaderAtOffset(r.masterOff)
	if err != nil {
		return 0, err
	}
	n, err = rc.reader.Read(p)
	rc.cur += int64(n)
	r.masterOff += int64(n)
	return n, err
}

func (r *RangeReadReadAtSeeker) Close() error {
	return r.ss.Close()
}

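// FileReadAtSeeker is the trivial adapter used when the stream is backed by
// a local model.File; every call is passed straight through to ss.mFile.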
type FileReadAtSeeker struct {
	ss *SeekableStream
}

func (f *FileReadAtSeeker) GetRawStream() *SeekableStream {
	return f.ss
}

func (f *FileReadAtSeeker) Read(p []byte) (n int, err error) {
	return f.ss.mFile.Read(p)
}

func (f *FileReadAtSeeker) ReadAt(p []byte, off int64) (n int, err error) {
	return f.ss.mFile.ReadAt(p, off)
}

func (f *FileReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
	return f.ss.mFile.Seek(offset, whence)
}

func (f *FileReadAtSeeker) Close() error {
	return f.ss.Close()
}