Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/internal/fs/archive.go
1560 views
1
package fs
2
3
import (
4
"context"
5
stderrors "errors"
6
"fmt"
7
"io"
8
"math/rand"
9
"mime"
10
"net/http"
11
"os"
12
stdpath "path"
13
"path/filepath"
14
"strconv"
15
"strings"
16
"time"
17
18
"github.com/alist-org/alist/v3/internal/conf"
19
"github.com/alist-org/alist/v3/internal/driver"
20
"github.com/alist-org/alist/v3/internal/errs"
21
"github.com/alist-org/alist/v3/internal/model"
22
"github.com/alist-org/alist/v3/internal/op"
23
"github.com/alist-org/alist/v3/internal/stream"
24
"github.com/alist-org/alist/v3/internal/task"
25
"github.com/pkg/errors"
26
log "github.com/sirupsen/logrus"
27
"github.com/xhofe/tache"
28
)
29
30
type ArchiveDownloadTask struct {
31
task.TaskExtension
32
model.ArchiveDecompressArgs
33
status string
34
SrcObjPath string
35
DstDirPath string
36
srcStorage driver.Driver
37
dstStorage driver.Driver
38
SrcStorageMp string
39
DstStorageMp string
40
}
41
42
func (t *ArchiveDownloadTask) GetName() string {
43
return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcObjPath,
44
t.InnerPath, t.DstStorageMp, t.DstDirPath, t.Password)
45
}
46
47
// GetStatus reports the phase the task is currently in, as last recorded in
// the status field.
func (t *ArchiveDownloadTask) GetStatus() string {
	return t.status
}
50
51
// Run resets the task's context and timing information, performs the
// download/decompress stage and, when that succeeds, queues the resulting
// upload task on ArchiveContentUploadTaskManager. The end time is recorded
// when Run returns, whether or not it succeeded.
func (t *ArchiveDownloadTask) Run() error {
	t.ReinitCtx()
	t.ClearEndTime()
	t.SetStartTime(time.Now())
	defer func() { t.SetEndTime(time.Now()) }()

	next, err := t.RunWithoutPushUploadTask()
	if err == nil {
		ArchiveContentUploadTaskManager.Add(next)
	}
	return err
}
63
64
func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) {
65
var err error
66
if t.srcStorage == nil {
67
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
68
}
69
srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.srcStorage, t.SrcObjPath, model.LinkArgs{
70
Header: http.Header{},
71
})
72
if err != nil {
73
return nil, err
74
}
75
defer func() {
76
var e error
77
for _, s := range ss {
78
e = stderrors.Join(e, s.Close())
79
}
80
if e != nil {
81
log.Errorf("failed to close file streamer, %v", e)
82
}
83
}()
84
var decompressUp model.UpdateProgress
85
if t.CacheFull {
86
var total, cur int64 = 0, 0
87
for _, s := range ss {
88
total += s.GetSize()
89
}
90
t.SetTotalBytes(total)
91
t.status = "getting src object"
92
for _, s := range ss {
93
if s.GetFile() == nil {
94
_, err = stream.CacheFullInTempFileAndUpdateProgress(s, func(p float64) {
95
t.SetProgress((float64(cur) + float64(s.GetSize())*p/100.0) / float64(total))
96
})
97
}
98
cur += s.GetSize()
99
if err != nil {
100
return nil, err
101
}
102
}
103
t.SetProgress(100.0)
104
decompressUp = func(_ float64) {}
105
} else {
106
decompressUp = t.SetProgress
107
}
108
t.status = "walking and decompressing"
109
dir, err := os.MkdirTemp(conf.Conf.TempDir, "dir-*")
110
if err != nil {
111
return nil, err
112
}
113
err = tool.Decompress(ss, dir, t.ArchiveInnerArgs, decompressUp)
114
if err != nil {
115
return nil, err
116
}
117
baseName := strings.TrimSuffix(srcObj.GetName(), stdpath.Ext(srcObj.GetName()))
118
uploadTask := &ArchiveContentUploadTask{
119
TaskExtension: task.TaskExtension{
120
Creator: t.GetCreator(),
121
},
122
ObjName: baseName,
123
InPlace: !t.PutIntoNewDir,
124
FilePath: dir,
125
DstDirPath: t.DstDirPath,
126
dstStorage: t.dstStorage,
127
DstStorageMp: t.DstStorageMp,
128
}
129
return uploadTask, nil
130
}
131
132
// ArchiveDownloadTaskManager is the task manager that archiveDecompress adds
// ArchiveDownloadTask instances to. It is not assigned in this file;
// presumably it is initialised elsewhere before use.
var ArchiveDownloadTaskManager *tache.Manager[*ArchiveDownloadTask]
133
134
type ArchiveContentUploadTask struct {
135
task.TaskExtension
136
status string
137
ObjName string
138
InPlace bool
139
FilePath string
140
DstDirPath string
141
dstStorage driver.Driver
142
DstStorageMp string
143
finalized bool
144
}
145
146
// GetName returns the display name of the task: the object name together with
// the destination mount path and directory.
func (t *ArchiveContentUploadTask) GetName() string {
	const format = "upload %s to [%s](%s)"
	return fmt.Sprintf(format, t.ObjName, t.DstStorageMp, t.DstDirPath)
}
149
150
// GetStatus reports the phase the upload task is currently in, as last
// recorded in the status field.
func (t *ArchiveContentUploadTask) GetStatus() string {
	return t.status
}
153
154
// Run resets the task's context and timing information and performs the
// upload. Every child task produced for a directory entry is queued on
// ArchiveContentUploadTaskManager. The end time is recorded when Run returns.
func (t *ArchiveContentUploadTask) Run() error {
	t.ReinitCtx()
	t.ClearEndTime()
	t.SetStartTime(time.Now())
	defer func() { t.SetEndTime(time.Now()) }()

	// Queuing never fails, so the callback always reports success.
	enqueue := func(child *ArchiveContentUploadTask) error {
		ArchiveContentUploadTaskManager.Add(child)
		return nil
	}
	return t.RunWithNextTaskCallback(enqueue)
}
164
165
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *ArchiveContentUploadTask) error) error {
166
var err error
167
if t.dstStorage == nil {
168
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
169
}
170
info, err := os.Stat(t.FilePath)
171
if err != nil {
172
return err
173
}
174
if info.IsDir() {
175
t.status = "src object is dir, listing objs"
176
nextDstPath := t.DstDirPath
177
if !t.InPlace {
178
nextDstPath = stdpath.Join(nextDstPath, t.ObjName)
179
err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstPath)
180
if err != nil {
181
return err
182
}
183
}
184
entries, err := os.ReadDir(t.FilePath)
185
if err != nil {
186
return err
187
}
188
var es error
189
for _, entry := range entries {
190
var nextFilePath string
191
if entry.IsDir() {
192
nextFilePath, err = moveToTempPath(stdpath.Join(t.FilePath, entry.Name()), "dir-")
193
} else {
194
nextFilePath, err = moveToTempPath(stdpath.Join(t.FilePath, entry.Name()), "file-")
195
}
196
if err != nil {
197
es = stderrors.Join(es, err)
198
continue
199
}
200
err = f(&ArchiveContentUploadTask{
201
TaskExtension: task.TaskExtension{
202
Creator: t.GetCreator(),
203
},
204
ObjName: entry.Name(),
205
InPlace: false,
206
FilePath: nextFilePath,
207
DstDirPath: nextDstPath,
208
dstStorage: t.dstStorage,
209
DstStorageMp: t.DstStorageMp,
210
})
211
if err != nil {
212
es = stderrors.Join(es, err)
213
}
214
}
215
if es != nil {
216
return es
217
}
218
} else {
219
t.SetTotalBytes(info.Size())
220
file, err := os.Open(t.FilePath)
221
if err != nil {
222
return err
223
}
224
fs := &stream.FileStream{
225
Obj: &model.Object{
226
Name: t.ObjName,
227
Size: info.Size(),
228
Modified: time.Now(),
229
},
230
Mimetype: mime.TypeByExtension(filepath.Ext(t.ObjName)),
231
WebPutAsTask: true,
232
Reader: file,
233
}
234
fs.Closers.Add(file)
235
t.status = "uploading"
236
err = op.Put(t.Ctx(), t.dstStorage, t.DstDirPath, fs, t.SetProgress, true)
237
if err != nil {
238
return err
239
}
240
}
241
t.deleteSrcFile()
242
return nil
243
}
244
245
// Cancel cancels the embedded task and, unless canceled tasks may be retried
// (conf.Conf.Tasks.AllowRetryCanceled), removes the local source as well.
func (t *ArchiveContentUploadTask) Cancel() {
	t.TaskExtension.Cancel()
	if conf.Conf.Tasks.AllowRetryCanceled {
		// Keep the local source so that a retry still has something to upload.
		return
	}
	t.deleteSrcFile()
}
251
252
// deleteSrcFile removes the local file or directory at FilePath the first time
// it is called and marks the task as finalized; later calls do nothing. The
// removal is best-effort: its error is ignored.
func (t *ArchiveContentUploadTask) deleteSrcFile() {
	if t.finalized {
		return
	}
	_ = os.RemoveAll(t.FilePath)
	t.finalized = true
}
258
259
func moveToTempPath(path, prefix string) (string, error) {
260
newPath, err := genTempFileName(prefix)
261
if err != nil {
262
return "", err
263
}
264
err = os.Rename(path, newPath)
265
if err != nil {
266
return "", err
267
}
268
return newPath, nil
269
}
270
271
func genTempFileName(prefix string) (string, error) {
272
retry := 0
273
for retry < 10000 {
274
newPath := stdpath.Join(conf.Conf.TempDir, prefix+strconv.FormatUint(uint64(rand.Uint32()), 10))
275
if _, err := os.Stat(newPath); err != nil {
276
if os.IsNotExist(err) {
277
return newPath, nil
278
} else {
279
return "", err
280
}
281
}
282
retry++
283
}
284
return "", errors.New("failed to generate temp-file name: too many retries")
285
}
286
287
type archiveContentUploadTaskManagerType struct {
288
*tache.Manager[*ArchiveContentUploadTask]
289
}
290
291
// Remove deletes the local source of the task with the given id and then
// removes the task from the underlying manager. Unknown ids are ignored.
func (m *archiveContentUploadTaskManagerType) Remove(id string) {
	tsk, found := m.GetByID(id)
	if !found {
		return
	}
	tsk.deleteSrcFile()
	m.Manager.Remove(id)
}
297
298
// RemoveAll removes every task known to the manager through Remove, so each
// task's local source is deleted too.
func (m *archiveContentUploadTaskManagerType) RemoveAll() {
	for _, tsk := range m.GetAll() {
		m.Remove(tsk.GetID())
	}
}
304
305
// RemoveByState removes, through Remove, every task that GetByState returns
// for the given states, so each task's local source is deleted too.
func (m *archiveContentUploadTaskManagerType) RemoveByState(state ...tache.State) {
	for _, tsk := range m.GetByState(state...) {
		m.Remove(tsk.GetID())
	}
}
311
312
// RemoveByCondition removes, through Remove, every task that GetByCondition
// returns for condition, so each task's local source is deleted too.
func (m *archiveContentUploadTaskManagerType) RemoveByCondition(condition func(task *ArchiveContentUploadTask) bool) {
	for _, tsk := range m.GetByCondition(condition) {
		m.Remove(tsk.GetID())
	}
}
318
319
var ArchiveContentUploadTaskManager = &archiveContentUploadTaskManagerType{
320
Manager: nil,
321
}
322
323
// archiveMeta resolves path to its storage and the path inside that storage,
// then returns the result of op.GetArchiveMeta for it. A failed resolution is
// returned wrapped with "failed get storage".
func archiveMeta(ctx context.Context, path string, args model.ArchiveMetaArgs) (*model.ArchiveMetaProvider, error) {
	drv, innerPath, resolveErr := op.GetStorageAndActualPath(path)
	if resolveErr != nil {
		return nil, errors.WithMessage(resolveErr, "failed get storage")
	}
	return op.GetArchiveMeta(ctx, drv, innerPath, args)
}
330
331
// archiveList resolves path to its storage and the path inside that storage,
// then returns the result of op.ListArchive for it. A failed resolution is
// returned wrapped with "failed get storage".
func archiveList(ctx context.Context, path string, args model.ArchiveListArgs) ([]model.Obj, error) {
	drv, innerPath, resolveErr := op.GetStorageAndActualPath(path)
	if resolveErr != nil {
		return nil, errors.WithMessage(resolveErr, "failed get storage")
	}
	return op.ListArchive(ctx, drv, innerPath, args)
}
338
339
func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args model.ArchiveDecompressArgs, lazyCache ...bool) (task.TaskExtensionInfo, error) {
340
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath)
341
if err != nil {
342
return nil, errors.WithMessage(err, "failed get src storage")
343
}
344
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
345
if err != nil {
346
return nil, errors.WithMessage(err, "failed get dst storage")
347
}
348
if srcStorage.GetStorage() == dstStorage.GetStorage() {
349
err = op.ArchiveDecompress(ctx, srcStorage, srcObjActualPath, dstDirActualPath, args, lazyCache...)
350
if !errors.Is(err, errs.NotImplement) {
351
return nil, err
352
}
353
}
354
taskCreator, _ := ctx.Value("user").(*model.User)
355
tsk := &ArchiveDownloadTask{
356
TaskExtension: task.TaskExtension{
357
Creator: taskCreator,
358
},
359
ArchiveDecompressArgs: args,
360
srcStorage: srcStorage,
361
dstStorage: dstStorage,
362
SrcObjPath: srcObjActualPath,
363
DstDirPath: dstDirActualPath,
364
SrcStorageMp: srcStorage.GetStorage().MountPath,
365
DstStorageMp: dstStorage.GetStorage().MountPath,
366
}
367
if ctx.Value(conf.NoTaskKey) != nil {
368
uploadTask, err := tsk.RunWithoutPushUploadTask()
369
if err != nil {
370
return nil, errors.WithMessagef(err, "failed download [%s]", srcObjPath)
371
}
372
defer uploadTask.deleteSrcFile()
373
var callback func(t *ArchiveContentUploadTask) error
374
callback = func(t *ArchiveContentUploadTask) error {
375
e := t.RunWithNextTaskCallback(callback)
376
t.deleteSrcFile()
377
return e
378
}
379
return nil, uploadTask.RunWithNextTaskCallback(callback)
380
} else {
381
ArchiveDownloadTaskManager.Add(tsk)
382
return tsk, nil
383
}
384
}
385
386
// archiveDriverExtract resolves path to its storage and the path inside that
// storage, then returns the result of op.DriverExtract for it. A failed
// resolution is returned wrapped with "failed get storage".
func archiveDriverExtract(ctx context.Context, path string, args model.ArchiveInnerArgs) (*model.Link, model.Obj, error) {
	drv, innerPath, resolveErr := op.GetStorageAndActualPath(path)
	if resolveErr != nil {
		return nil, nil, errors.WithMessage(resolveErr, "failed get storage")
	}
	return op.DriverExtract(ctx, drv, innerPath, args)
}
393
394
// archiveInternalExtract resolves path to its storage and the path inside that
// storage, then returns the result of op.InternalExtract for it: a reader and
// an int64 (presumably the extracted size). A failed resolution is returned
// wrapped with "failed get storage".
func archiveInternalExtract(ctx context.Context, path string, args model.ArchiveInnerArgs) (io.ReadCloser, int64, error) {
	drv, innerPath, resolveErr := op.GetStorageAndActualPath(path)
	if resolveErr != nil {
		return nil, 0, errors.WithMessage(resolveErr, "failed get storage")
	}
	return op.InternalExtract(ctx, drv, innerPath, args)
}
401
402