GitHub Repository: alist-org/alist
Path: blob/main/pkg/utils/io.go
package utils

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"golang.org/x/exp/constraints"

	log "github.com/sirupsen/logrus"
)
// Here is some syntactic sugar inspired by Tomas Senart's video;
// it lets an inline function satisfy the io.Reader interface.
type readerFunc func(p []byte) (n int, err error)

func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }

// CopyWithCtx is io.Copy with a slightly modified signature:
//   - a context has been added in order to propagate cancellation
//   - the number of bytes written is not returned, as it is not needed by the callers here
func CopyWithCtx(ctx context.Context, out io.Writer, in io.Reader, size int64, progress func(percentage float64)) error {
	// Copy calls the Reader and Writer interfaces multiple times in order
	// to copy chunk by chunk (avoiding loading the whole file into memory).
	// The cancellation check is inserted before each read, which is the
	// earliest possible point in the call chain.
	var finish int64 = 0
	s := size / 100
	_, err := CopyWithBuffer(out, readerFunc(func(p []byte) (int, error) {
		// golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations
		select {
		// if the context has been canceled
		case <-ctx.Done():
			// stop the copy and propagate the "context canceled" error
			return 0, ctx.Err()
		default:
			// otherwise just run the default io.Reader implementation
			n, err := in.Read(p)
			if s > 0 && (err == nil || err == io.EOF) {
				finish += int64(n)
				progress(float64(finish) / float64(s))
			}
			return n, err
		}
	}))
	return err
}
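
// exampleCopyWithCtxUsage is a minimal usage sketch for CopyWithCtx; the
// function name, in-memory payload and progress callback here are illustrative
// only and not part of the upstream package. It shows how a cancellable
// context and a percentage callback are wired into the copy.
func exampleCopyWithCtxUsage() {
	// In-memory stand-ins for real source and destination streams.
	src := bytes.NewReader(bytes.Repeat([]byte("x"), 1024))
	dst := &bytes.Buffer{}

	// Cancelling ctx (or hitting the timeout) aborts the copy with ctx.Err().
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	err := CopyWithCtx(ctx, dst, src, int64(src.Len()), func(percentage float64) {
		fmt.Printf("copied %.1f%%\n", percentage)
	})
	if err != nil {
		log.Errorln("copy failed:", err)
	}
}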
type limitWriter struct {
	w     io.Writer
	limit int64
}

// Write forwards at most the remaining limit to the underlying writer and
// silently discards the rest. It always reports len(p) as written so that
// callers such as io.Copy do not fail with io.ErrShortWrite.
func (l *limitWriter) Write(p []byte) (n int, err error) {
	lp := len(p)
	if l.limit > 0 {
		if int64(lp) > l.limit {
			p = p[:l.limit]
		}
		l.limit -= int64(len(p))
		_, err = l.w.Write(p)
	}
	return lp, err
}

// LimitWriter returns a writer that writes at most limit bytes to w.
func LimitWriter(w io.Writer, limit int64) io.Writer {
	return &limitWriter{w: w, limit: limit}
}
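
// exampleLimitWriterUsage is a minimal usage sketch for LimitWriter; the
// buffer and sizes are illustrative only. At most `limit` bytes reach the
// underlying writer, while the reported count still covers the whole input.
func exampleLimitWriterUsage() {
	var dst bytes.Buffer
	lw := LimitWriter(&dst, 5)

	// Only the first 5 bytes reach dst, yet n covers the whole input.
	n, err := lw.Write([]byte("hello world"))
	fmt.Println(n, err, dst.String()) // 11 <nil> hello
}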
// ReadCloser combines an arbitrary io.Reader with an arbitrary io.Closer.
type ReadCloser struct {
	io.Reader
	io.Closer
}

// CloseFunc adapts a plain function to the io.Closer interface.
type CloseFunc func() error

func (c CloseFunc) Close() error {
	return c()
}

// NewReadCloser wraps reader so that close is invoked when the result is closed.
func NewReadCloser(reader io.Reader, close CloseFunc) io.ReadCloser {
	return ReadCloser{
		Reader: reader,
		Closer: close,
	}
}

// NewLimitReadCloser is NewReadCloser with the reader capped at limit bytes.
func NewLimitReadCloser(reader io.Reader, close CloseFunc, limit int64) io.ReadCloser {
	return NewReadCloser(io.LimitReader(reader, limit), close)
}
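
// exampleReadCloserUsage is a minimal usage sketch for NewLimitReadCloser; the
// reader and the close hook are illustrative only. It attaches a custom
// cleanup function to a plain io.Reader and caps how much can be read.
func exampleReadCloserUsage() {
	rc := NewLimitReadCloser(bytes.NewReader([]byte("0123456789")), func() error {
		fmt.Println("closing underlying resource")
		return nil
	}, 4)
	defer rc.Close()

	data, err := io.ReadAll(rc)
	fmt.Println(string(data), err) // 0123 <nil>
}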
// MultiReadable makes a stream re-readable: seekable sources are rewound via
// Seek, while for non-seekable sources the bytes already read are cached so
// they can be replayed after Reset.
type MultiReadable struct {
	originReader io.Reader
	reader       io.Reader
	cache        *bytes.Buffer
}

func NewMultiReadable(reader io.Reader) *MultiReadable {
	return &MultiReadable{
		originReader: reader,
		reader:       reader,
	}
}

func (mr *MultiReadable) Read(p []byte) (int, error) {
	n, err := mr.reader.Read(p)
	// Only cache what was read when the source cannot seek back by itself.
	if _, ok := mr.reader.(io.Seeker); !ok && n > 0 {
		if mr.cache == nil {
			mr.cache = &bytes.Buffer{}
		}
		mr.cache.Write(p[:n])
	}
	return n, err
}

// Reset rewinds the stream so it can be read again from the beginning.
func (mr *MultiReadable) Reset() error {
	if seeker, ok := mr.reader.(io.Seeker); ok {
		_, err := seeker.Seek(0, io.SeekStart)
		return err
	}
	if mr.cache != nil && mr.cache.Len() > 0 {
		mr.reader = io.MultiReader(mr.cache, mr.reader)
		mr.cache = nil
	}
	return nil
}

func (mr *MultiReadable) Close() error {
	if closer, ok := mr.originReader.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}
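
// exampleMultiReadableUsage is a minimal usage sketch for MultiReadable; the
// payload is illustrative only. For a non-seekable source the bytes already
// read are cached, so after Reset the stream can be consumed again from the start.
func exampleMultiReadableUsage() {
	// bytes.Buffer is not an io.Seeker, so MultiReadable falls back to caching.
	mr := NewMultiReadable(bytes.NewBufferString("payload"))

	head := make([]byte, 3)
	_, _ = mr.Read(head) // consumes "pay", which gets cached

	_ = mr.Reset() // prepends the cache so the next read starts over

	rest, _ := io.ReadAll(mr)
	fmt.Println(string(head), string(rest)) // pay payload
	_ = mr.Close()
}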
// Retry runs f up to attempts times, sleeping between attempts with
// exponential backoff (the sleep doubles after every failure). It returns nil
// as soon as f succeeds, otherwise an error wrapping the last failure.
func Retry(attempts int, sleep time.Duration, f func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if i > 0 {
			log.Println("retrying after error:", err)
			time.Sleep(sleep)
			sleep *= 2
		}
		err = f()
		if err == nil {
			return nil
		}
	}
	return fmt.Errorf("after %d attempts, last error: %w", attempts, err)
}
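
// exampleRetryUsage is a minimal usage sketch for Retry; the failing operation
// is illustrative only. The operation is attempted up to 3 times, with the
// sleep doubling after each failure (100ms, then 200ms).
func exampleRetryUsage() {
	calls := 0
	err := Retry(3, 100*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}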
// ClosersIF is the interface implemented by Closers: a collection of
// io.Closer values that can be closed in one call.
type ClosersIF interface {
	io.Closer
	Add(closer io.Closer)
	AddClosers(closers Closers)
	GetClosers() Closers
}

type Closers struct {
	closers []io.Closer
}

func (c *Closers) GetClosers() Closers {
	return *c
}

var _ ClosersIF = (*Closers)(nil)

// Close closes every collected closer and returns the joined errors, if any.
func (c *Closers) Close() error {
	var errs []error
	for _, closer := range c.closers {
		if closer != nil {
			errs = append(errs, closer.Close())
		}
	}
	return errors.Join(errs...)
}

func (c *Closers) Add(closer io.Closer) {
	c.closers = append(c.closers, closer)
}

func (c *Closers) AddClosers(closers Closers) {
	c.closers = append(c.closers, closers.closers...)
}

func EmptyClosers() Closers {
	return Closers{[]io.Closer{}}
}

func NewClosers(c ...io.Closer) Closers {
	return Closers{c}
}
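
// exampleClosersUsage is a minimal usage sketch for Closers; the wrapped
// readers and the failing close hook are illustrative only. Closers collects
// every io.Closer opened along the way and releases them all at once.
func exampleClosersUsage() {
	closers := EmptyClosers()
	closers.Add(io.NopCloser(bytes.NewReader([]byte("a"))))
	closers.Add(NewReadCloser(bytes.NewReader([]byte("b")), func() error {
		return errors.New("close failed")
	}))

	// errors.Join drops the nil results, so only "close failed" is reported.
	fmt.Println(closers.Close()) // close failed
}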
// Min returns the smaller of a and b.
func Min[T constraints.Ordered](a, b T) T {
	if a < b {
		return a
	}
	return b
}

// Max returns the larger of a and b.
func Max[T constraints.Ordered](a, b T) T {
	if a < b {
		return b
	}
	return a
}
// IoBuffPool reuses copy buffers across calls to CopyWithBuffer.
var IoBuffPool = &sync.Pool{
	New: func() interface{} {
		return make([]byte, 32*1024*2) // twice the 32 KB default buffer size used by io.Copy
	},
}

// CopyWithBuffer is io.Copy using a pooled buffer instead of allocating a new
// one for every call.
func CopyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
	buff := IoBuffPool.Get().([]byte)
	defer IoBuffPool.Put(buff)
	written, err = io.CopyBuffer(dst, src, buff)
	if err != nil {
		return
	}
	return written, nil
}

// CopyWithBufferN is io.CopyN using the pooled buffer.
func CopyWithBufferN(dst io.Writer, src io.Reader, n int64) (written int64, err error) {
	written, err = CopyWithBuffer(dst, io.LimitReader(src, n))
	if written == n {
		return n, nil
	}
	if written < n && err == nil {
		// src stopped early; must have been EOF.
		err = io.EOF
	}
	return
}
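
// exampleCopyWithBufferUsage is a minimal usage sketch for CopyWithBufferN;
// the payload is illustrative only. The pooled 64 KB buffer is reused across
// copies instead of being reallocated for every call.
func exampleCopyWithBufferUsage() {
	src := bytes.NewReader([]byte("0123456789"))
	var dst bytes.Buffer

	written, err := CopyWithBufferN(&dst, src, 4)
	fmt.Println(written, err, dst.String()) // 4 <nil> 0123
}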