Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/internal/offline_download/tool/transfer.go
1562 views
1
package tool
2
3
import (
4
"context"
5
"fmt"
6
"github.com/alist-org/alist/v3/internal/driver"
7
"github.com/alist-org/alist/v3/internal/model"
8
"github.com/alist-org/alist/v3/internal/op"
9
"github.com/alist-org/alist/v3/internal/stream"
10
"github.com/alist-org/alist/v3/internal/task"
11
"github.com/alist-org/alist/v3/pkg/utils"
12
"github.com/pkg/errors"
13
log "github.com/sirupsen/logrus"
14
"github.com/xhofe/tache"
15
"net/http"
16
"os"
17
stdpath "path"
18
"path/filepath"
19
"time"
20
)
21
22
type TransferTask struct {
23
task.TaskExtension
24
Status string `json:"-"` //don't save status to save space
25
SrcObjPath string `json:"src_obj_path"`
26
DstDirPath string `json:"dst_dir_path"`
27
SrcStorage driver.Driver `json:"-"`
28
DstStorage driver.Driver `json:"-"`
29
SrcStorageMp string `json:"src_storage_mp"`
30
DstStorageMp string `json:"dst_storage_mp"`
31
DeletePolicy DeletePolicy `json:"delete_policy"`
32
}
33
34
func (t *TransferTask) Run() error {
35
t.ReinitCtx()
36
t.ClearEndTime()
37
t.SetStartTime(time.Now())
38
defer func() { t.SetEndTime(time.Now()) }()
39
if t.SrcStorage == nil {
40
return transferStdPath(t)
41
} else {
42
return transferObjPath(t)
43
}
44
}
45
46
func (t *TransferTask) GetName() string {
47
return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
48
}
49
50
func (t *TransferTask) GetStatus() string {
51
return t.Status
52
}
53
54
func (t *TransferTask) OnSucceeded() {
55
if t.DeletePolicy == DeleteOnUploadSucceed || t.DeletePolicy == DeleteAlways {
56
if t.SrcStorage == nil {
57
removeStdTemp(t)
58
} else {
59
removeObjTemp(t)
60
}
61
}
62
}
63
64
func (t *TransferTask) OnFailed() {
65
if t.DeletePolicy == DeleteOnUploadFailed || t.DeletePolicy == DeleteAlways {
66
if t.SrcStorage == nil {
67
removeStdTemp(t)
68
} else {
69
removeObjTemp(t)
70
}
71
}
72
}
73
74
var (
75
TransferTaskManager *tache.Manager[*TransferTask]
76
)
77
78
func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {
79
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
80
if err != nil {
81
return errors.WithMessage(err, "failed get dst storage")
82
}
83
entries, err := os.ReadDir(tempDir)
84
if err != nil {
85
return err
86
}
87
taskCreator, _ := ctx.Value("user").(*model.User)
88
for _, entry := range entries {
89
t := &TransferTask{
90
TaskExtension: task.TaskExtension{
91
Creator: taskCreator,
92
},
93
SrcObjPath: stdpath.Join(tempDir, entry.Name()),
94
DstDirPath: dstDirActualPath,
95
DstStorage: dstStorage,
96
DstStorageMp: dstStorage.GetStorage().MountPath,
97
DeletePolicy: deletePolicy,
98
}
99
TransferTaskManager.Add(t)
100
}
101
return nil
102
}
103
104
func transferStdPath(t *TransferTask) error {
105
t.Status = "getting src object"
106
info, err := os.Stat(t.SrcObjPath)
107
if err != nil {
108
return err
109
}
110
if info.IsDir() {
111
t.Status = "src object is dir, listing objs"
112
entries, err := os.ReadDir(t.SrcObjPath)
113
if err != nil {
114
return err
115
}
116
for _, entry := range entries {
117
srcRawPath := stdpath.Join(t.SrcObjPath, entry.Name())
118
dstObjPath := stdpath.Join(t.DstDirPath, info.Name())
119
t := &TransferTask{
120
TaskExtension: task.TaskExtension{
121
Creator: t.Creator,
122
},
123
SrcObjPath: srcRawPath,
124
DstDirPath: dstObjPath,
125
DstStorage: t.DstStorage,
126
SrcStorageMp: t.SrcStorageMp,
127
DstStorageMp: t.DstStorageMp,
128
DeletePolicy: t.DeletePolicy,
129
}
130
TransferTaskManager.Add(t)
131
}
132
t.Status = "src object is dir, added all transfer tasks of files"
133
return nil
134
}
135
return transferStdFile(t)
136
}
137
138
func transferStdFile(t *TransferTask) error {
139
rc, err := os.Open(t.SrcObjPath)
140
if err != nil {
141
return errors.Wrapf(err, "failed to open file %s", t.SrcObjPath)
142
}
143
info, err := rc.Stat()
144
if err != nil {
145
return errors.Wrapf(err, "failed to get file %s", t.SrcObjPath)
146
}
147
mimetype := utils.GetMimeType(t.SrcObjPath)
148
s := &stream.FileStream{
149
Ctx: nil,
150
Obj: &model.Object{
151
Name: filepath.Base(t.SrcObjPath),
152
Size: info.Size(),
153
Modified: info.ModTime(),
154
IsFolder: false,
155
},
156
Reader: rc,
157
Mimetype: mimetype,
158
Closers: utils.NewClosers(rc),
159
}
160
t.SetTotalBytes(info.Size())
161
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress)
162
}
163
164
func removeStdTemp(t *TransferTask) {
165
info, err := os.Stat(t.SrcObjPath)
166
if err != nil || info.IsDir() {
167
return
168
}
169
if err := os.Remove(t.SrcObjPath); err != nil {
170
log.Errorf("failed to delete temp file %s, error: %s", t.SrcObjPath, err.Error())
171
}
172
}
173
174
func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {
175
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(tempDir)
176
if err != nil {
177
return errors.WithMessage(err, "failed get src storage")
178
}
179
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
180
if err != nil {
181
return errors.WithMessage(err, "failed get dst storage")
182
}
183
objs, err := op.List(ctx, srcStorage, srcObjActualPath, model.ListArgs{})
184
if err != nil {
185
return errors.WithMessagef(err, "failed list src [%s] objs", tempDir)
186
}
187
taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed
188
for _, obj := range objs {
189
t := &TransferTask{
190
TaskExtension: task.TaskExtension{
191
Creator: taskCreator,
192
},
193
SrcObjPath: stdpath.Join(srcObjActualPath, obj.GetName()),
194
DstDirPath: dstDirActualPath,
195
SrcStorage: srcStorage,
196
DstStorage: dstStorage,
197
SrcStorageMp: srcStorage.GetStorage().MountPath,
198
DstStorageMp: dstStorage.GetStorage().MountPath,
199
DeletePolicy: deletePolicy,
200
}
201
TransferTaskManager.Add(t)
202
}
203
return nil
204
}
205
206
func transferObjPath(t *TransferTask) error {
207
t.Status = "getting src object"
208
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
209
if err != nil {
210
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)
211
}
212
if srcObj.IsDir() {
213
t.Status = "src object is dir, listing objs"
214
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.ListArgs{})
215
if err != nil {
216
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcObjPath)
217
}
218
for _, obj := range objs {
219
if utils.IsCanceled(t.Ctx()) {
220
return nil
221
}
222
srcObjPath := stdpath.Join(t.SrcObjPath, obj.GetName())
223
dstObjPath := stdpath.Join(t.DstDirPath, srcObj.GetName())
224
TransferTaskManager.Add(&TransferTask{
225
TaskExtension: task.TaskExtension{
226
Creator: t.Creator,
227
},
228
SrcObjPath: srcObjPath,
229
DstDirPath: dstObjPath,
230
SrcStorage: t.SrcStorage,
231
DstStorage: t.DstStorage,
232
SrcStorageMp: t.SrcStorageMp,
233
DstStorageMp: t.DstStorageMp,
234
DeletePolicy: t.DeletePolicy,
235
})
236
}
237
t.Status = "src object is dir, added all transfer tasks of objs"
238
return nil
239
}
240
return transferObjFile(t)
241
}
242
243
func transferObjFile(t *TransferTask) error {
244
srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
245
if err != nil {
246
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)
247
}
248
link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.LinkArgs{
249
Header: http.Header{},
250
})
251
if err != nil {
252
return errors.WithMessagef(err, "failed get [%s] link", t.SrcObjPath)
253
}
254
fs := stream.FileStream{
255
Obj: srcFile,
256
Ctx: t.Ctx(),
257
}
258
// any link provided is seekable
259
ss, err := stream.NewSeekableStream(fs, link)
260
if err != nil {
261
return errors.WithMessagef(err, "failed get [%s] stream", t.SrcObjPath)
262
}
263
t.SetTotalBytes(srcFile.GetSize())
264
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, ss, t.SetProgress)
265
}
266
267
func removeObjTemp(t *TransferTask) {
268
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
269
if err != nil || srcObj.IsDir() {
270
return
271
}
272
if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcObjPath); err != nil {
273
log.Errorf("failed to delete temp obj %s, error: %s", t.SrcObjPath, err.Error())
274
}
275
}
276
277