Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/internal/stream/util.go
1560 views
1
package stream
2
3
import (
4
"context"
5
"encoding/hex"
6
"fmt"
7
"io"
8
"net/http"
9
10
"github.com/alist-org/alist/v3/internal/model"
11
"github.com/alist-org/alist/v3/internal/net"
12
"github.com/alist-org/alist/v3/pkg/http_range"
13
"github.com/alist-org/alist/v3/pkg/utils"
14
log "github.com/sirupsen/logrus"
15
)
16
17
func GetRangeReadCloserFromLink(size int64, link *model.Link) (model.RangeReadCloserIF, error) {
18
if len(link.URL) == 0 {
19
return nil, fmt.Errorf("can't create RangeReadCloser since URL is empty in link")
20
}
21
rangeReaderFunc := func(ctx context.Context, r http_range.Range) (io.ReadCloser, error) {
22
if link.Concurrency != 0 || link.PartSize != 0 {
23
header := net.ProcessHeader(nil, link.Header)
24
down := net.NewDownloader(func(d *net.Downloader) {
25
d.Concurrency = link.Concurrency
26
d.PartSize = link.PartSize
27
})
28
req := &net.HttpRequestParams{
29
URL: link.URL,
30
Range: r,
31
Size: size,
32
HeaderRef: header,
33
}
34
rc, err := down.Download(ctx, req)
35
return rc, err
36
37
}
38
response, err := RequestRangedHttp(ctx, link, r.Start, r.Length)
39
if err != nil {
40
if response == nil {
41
return nil, fmt.Errorf("http request failure, err:%s", err)
42
}
43
return nil, err
44
}
45
if r.Start == 0 && (r.Length == -1 || r.Length == size) || response.StatusCode == http.StatusPartialContent ||
46
checkContentRange(&response.Header, r.Start) {
47
return response.Body, nil
48
} else if response.StatusCode == http.StatusOK {
49
log.Warnf("remote http server not supporting range request, expect low perfromace!")
50
readCloser, err := net.GetRangedHttpReader(response.Body, r.Start, r.Length)
51
if err != nil {
52
return nil, err
53
}
54
return readCloser, nil
55
}
56
57
return response.Body, nil
58
}
59
resultRangeReadCloser := model.RangeReadCloser{RangeReader: rangeReaderFunc}
60
return &resultRangeReadCloser, nil
61
}
62
63
func RequestRangedHttp(ctx context.Context, link *model.Link, offset, length int64) (*http.Response, error) {
64
header := net.ProcessHeader(nil, link.Header)
65
header = http_range.ApplyRangeToHttpHeader(http_range.Range{Start: offset, Length: length}, header)
66
67
return net.RequestHttp(ctx, "GET", header, link.URL)
68
}
69
70
// 139 cloud does not properly return 206 http status code, add a hack here
71
func checkContentRange(header *http.Header, offset int64) bool {
72
start, _, err := http_range.ParseContentRange(header.Get("Content-Range"))
73
if err != nil {
74
log.Warnf("exception trying to parse Content-Range, will ignore,err=%s", err)
75
}
76
if start == offset {
77
return true
78
}
79
return false
80
}
81
82
type ReaderWithCtx struct {
83
io.Reader
84
Ctx context.Context
85
}
86
87
func (r *ReaderWithCtx) Read(p []byte) (n int, err error) {
88
if utils.IsCanceled(r.Ctx) {
89
return 0, r.Ctx.Err()
90
}
91
return r.Reader.Read(p)
92
}
93
94
func (r *ReaderWithCtx) Close() error {
95
if c, ok := r.Reader.(io.Closer); ok {
96
return c.Close()
97
}
98
return nil
99
}
100
101
func CacheFullInTempFileAndUpdateProgress(stream model.FileStreamer, up model.UpdateProgress) (model.File, error) {
102
if cache := stream.GetFile(); cache != nil {
103
up(100)
104
return cache, nil
105
}
106
tmpF, err := utils.CreateTempFile(&ReaderUpdatingProgress{
107
Reader: stream,
108
UpdateProgress: up,
109
}, stream.GetSize())
110
if err == nil {
111
stream.SetTmpFile(tmpF)
112
}
113
return tmpF, err
114
}
115
116
func CacheFullInTempFileAndWriter(stream model.FileStreamer, w io.Writer) (model.File, error) {
117
if cache := stream.GetFile(); cache != nil {
118
_, err := cache.Seek(0, io.SeekStart)
119
if err == nil {
120
_, err = utils.CopyWithBuffer(w, cache)
121
if err == nil {
122
_, err = cache.Seek(0, io.SeekStart)
123
}
124
}
125
return cache, err
126
}
127
tmpF, err := utils.CreateTempFile(io.TeeReader(stream, w), stream.GetSize())
128
if err == nil {
129
stream.SetTmpFile(tmpF)
130
}
131
return tmpF, err
132
}
133
134
func CacheFullInTempFileAndHash(stream model.FileStreamer, hashType *utils.HashType, params ...any) (model.File, string, error) {
135
h := hashType.NewFunc(params...)
136
tmpF, err := CacheFullInTempFileAndWriter(stream, h)
137
if err != nil {
138
return nil, "", err
139
}
140
return tmpF, hex.EncodeToString(h.Sum(nil)), err
141
}
142
143