Path: blob/main/internal/offline_download/tool/transfer.go
1562 views
package tool12import (3"context"4"fmt"5"github.com/alist-org/alist/v3/internal/driver"6"github.com/alist-org/alist/v3/internal/model"7"github.com/alist-org/alist/v3/internal/op"8"github.com/alist-org/alist/v3/internal/stream"9"github.com/alist-org/alist/v3/internal/task"10"github.com/alist-org/alist/v3/pkg/utils"11"github.com/pkg/errors"12log "github.com/sirupsen/logrus"13"github.com/xhofe/tache"14"net/http"15"os"16stdpath "path"17"path/filepath"18"time"19)2021type TransferTask struct {22task.TaskExtension23Status string `json:"-"` //don't save status to save space24SrcObjPath string `json:"src_obj_path"`25DstDirPath string `json:"dst_dir_path"`26SrcStorage driver.Driver `json:"-"`27DstStorage driver.Driver `json:"-"`28SrcStorageMp string `json:"src_storage_mp"`29DstStorageMp string `json:"dst_storage_mp"`30DeletePolicy DeletePolicy `json:"delete_policy"`31}3233func (t *TransferTask) Run() error {34t.ReinitCtx()35t.ClearEndTime()36t.SetStartTime(time.Now())37defer func() { t.SetEndTime(time.Now()) }()38if t.SrcStorage == nil {39return transferStdPath(t)40} else {41return transferObjPath(t)42}43}4445func (t *TransferTask) GetName() string {46return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)47}4849func (t *TransferTask) GetStatus() string {50return t.Status51}5253func (t *TransferTask) OnSucceeded() {54if t.DeletePolicy == DeleteOnUploadSucceed || t.DeletePolicy == DeleteAlways {55if t.SrcStorage == nil {56removeStdTemp(t)57} else {58removeObjTemp(t)59}60}61}6263func (t *TransferTask) OnFailed() {64if t.DeletePolicy == DeleteOnUploadFailed || t.DeletePolicy == DeleteAlways {65if t.SrcStorage == nil {66removeStdTemp(t)67} else {68removeObjTemp(t)69}70}71}7273var (74TransferTaskManager *tache.Manager[*TransferTask]75)7677func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {78dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)79if err != nil {80return errors.WithMessage(err, "failed get dst storage")81}82entries, err := os.ReadDir(tempDir)83if err != nil {84return err85}86taskCreator, _ := ctx.Value("user").(*model.User)87for _, entry := range entries {88t := &TransferTask{89TaskExtension: task.TaskExtension{90Creator: taskCreator,91},92SrcObjPath: stdpath.Join(tempDir, entry.Name()),93DstDirPath: dstDirActualPath,94DstStorage: dstStorage,95DstStorageMp: dstStorage.GetStorage().MountPath,96DeletePolicy: deletePolicy,97}98TransferTaskManager.Add(t)99}100return nil101}102103func transferStdPath(t *TransferTask) error {104t.Status = "getting src object"105info, err := os.Stat(t.SrcObjPath)106if err != nil {107return err108}109if info.IsDir() {110t.Status = "src object is dir, listing objs"111entries, err := os.ReadDir(t.SrcObjPath)112if err != nil {113return err114}115for _, entry := range entries {116srcRawPath := stdpath.Join(t.SrcObjPath, entry.Name())117dstObjPath := stdpath.Join(t.DstDirPath, info.Name())118t := &TransferTask{119TaskExtension: task.TaskExtension{120Creator: t.Creator,121},122SrcObjPath: srcRawPath,123DstDirPath: dstObjPath,124DstStorage: t.DstStorage,125SrcStorageMp: t.SrcStorageMp,126DstStorageMp: t.DstStorageMp,127DeletePolicy: t.DeletePolicy,128}129TransferTaskManager.Add(t)130}131t.Status = "src object is dir, added all transfer tasks of files"132return nil133}134return transferStdFile(t)135}136137func transferStdFile(t *TransferTask) error {138rc, err := os.Open(t.SrcObjPath)139if err != nil {140return errors.Wrapf(err, "failed to open file %s", t.SrcObjPath)141}142info, err := rc.Stat()143if err != nil {144return errors.Wrapf(err, "failed to get file %s", t.SrcObjPath)145}146mimetype := utils.GetMimeType(t.SrcObjPath)147s := &stream.FileStream{148Ctx: nil,149Obj: &model.Object{150Name: filepath.Base(t.SrcObjPath),151Size: info.Size(),152Modified: info.ModTime(),153IsFolder: false,154},155Reader: rc,156Mimetype: mimetype,157Closers: utils.NewClosers(rc),158}159t.SetTotalBytes(info.Size())160return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress)161}162163func removeStdTemp(t *TransferTask) {164info, err := os.Stat(t.SrcObjPath)165if err != nil || info.IsDir() {166return167}168if err := os.Remove(t.SrcObjPath); err != nil {169log.Errorf("failed to delete temp file %s, error: %s", t.SrcObjPath, err.Error())170}171}172173func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {174srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(tempDir)175if err != nil {176return errors.WithMessage(err, "failed get src storage")177}178dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)179if err != nil {180return errors.WithMessage(err, "failed get dst storage")181}182objs, err := op.List(ctx, srcStorage, srcObjActualPath, model.ListArgs{})183if err != nil {184return errors.WithMessagef(err, "failed list src [%s] objs", tempDir)185}186taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed187for _, obj := range objs {188t := &TransferTask{189TaskExtension: task.TaskExtension{190Creator: taskCreator,191},192SrcObjPath: stdpath.Join(srcObjActualPath, obj.GetName()),193DstDirPath: dstDirActualPath,194SrcStorage: srcStorage,195DstStorage: dstStorage,196SrcStorageMp: srcStorage.GetStorage().MountPath,197DstStorageMp: dstStorage.GetStorage().MountPath,198DeletePolicy: deletePolicy,199}200TransferTaskManager.Add(t)201}202return nil203}204205func transferObjPath(t *TransferTask) error {206t.Status = "getting src object"207srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)208if err != nil {209return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)210}211if srcObj.IsDir() {212t.Status = "src object is dir, listing objs"213objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.ListArgs{})214if err != nil {215return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcObjPath)216}217for _, obj := range objs {218if utils.IsCanceled(t.Ctx()) {219return nil220}221srcObjPath := stdpath.Join(t.SrcObjPath, obj.GetName())222dstObjPath := stdpath.Join(t.DstDirPath, srcObj.GetName())223TransferTaskManager.Add(&TransferTask{224TaskExtension: task.TaskExtension{225Creator: t.Creator,226},227SrcObjPath: srcObjPath,228DstDirPath: dstObjPath,229SrcStorage: t.SrcStorage,230DstStorage: t.DstStorage,231SrcStorageMp: t.SrcStorageMp,232DstStorageMp: t.DstStorageMp,233DeletePolicy: t.DeletePolicy,234})235}236t.Status = "src object is dir, added all transfer tasks of objs"237return nil238}239return transferObjFile(t)240}241242func transferObjFile(t *TransferTask) error {243srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)244if err != nil {245return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)246}247link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.LinkArgs{248Header: http.Header{},249})250if err != nil {251return errors.WithMessagef(err, "failed get [%s] link", t.SrcObjPath)252}253fs := stream.FileStream{254Obj: srcFile,255Ctx: t.Ctx(),256}257// any link provided is seekable258ss, err := stream.NewSeekableStream(fs, link)259if err != nil {260return errors.WithMessagef(err, "failed get [%s] stream", t.SrcObjPath)261}262t.SetTotalBytes(srcFile.GetSize())263return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, ss, t.SetProgress)264}265266func removeObjTemp(t *TransferTask) {267srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)268if err != nil || srcObj.IsDir() {269return270}271if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcObjPath); err != nil {272log.Errorf("failed to delete temp obj %s, error: %s", t.SrcObjPath, err.Error())273}274}275276277