Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/pkg/task/manager.go
1560 views
1
package task
2
3
import (
4
"github.com/alist-org/alist/v3/pkg/generic_sync"
5
"github.com/alist-org/alist/v3/pkg/utils"
6
"github.com/pkg/errors"
7
log "github.com/sirupsen/logrus"
8
)
9
10
type Manager[K comparable] struct {
11
curID K
12
workerC chan struct{}
13
updateID func(*K)
14
tasks generic_sync.MapOf[K, *Task[K]]
15
}
16
17
func (tm *Manager[K]) Submit(task *Task[K]) K {
18
if tm.updateID != nil {
19
tm.updateID(&tm.curID)
20
task.ID = tm.curID
21
}
22
tm.tasks.Store(task.ID, task)
23
tm.do(task)
24
return task.ID
25
}
26
27
func (tm *Manager[K]) do(task *Task[K]) {
28
go func() {
29
log.Debugf("task [%s] waiting for worker", task.Name)
30
select {
31
case <-tm.workerC:
32
log.Debugf("task [%s] starting", task.Name)
33
task.run()
34
log.Debugf("task [%s] ended", task.Name)
35
case <-task.Ctx.Done():
36
log.Debugf("task [%s] canceled", task.Name)
37
return
38
}
39
// return worker
40
tm.workerC <- struct{}{}
41
}()
42
}
43
44
func (tm *Manager[K]) GetAll() []*Task[K] {
45
return tm.tasks.Values()
46
}
47
48
func (tm *Manager[K]) Get(tid K) (*Task[K], bool) {
49
return tm.tasks.Load(tid)
50
}
51
52
func (tm *Manager[K]) MustGet(tid K) *Task[K] {
53
task, _ := tm.Get(tid)
54
return task
55
}
56
57
func (tm *Manager[K]) Retry(tid K) error {
58
t, ok := tm.Get(tid)
59
if !ok {
60
return errors.WithStack(ErrTaskNotFound)
61
}
62
tm.do(t)
63
return nil
64
}
65
66
func (tm *Manager[K]) Cancel(tid K) error {
67
t, ok := tm.Get(tid)
68
if !ok {
69
return errors.WithStack(ErrTaskNotFound)
70
}
71
t.Cancel()
72
return nil
73
}
74
75
func (tm *Manager[K]) Remove(tid K) error {
76
t, ok := tm.Get(tid)
77
if !ok {
78
return errors.WithStack(ErrTaskNotFound)
79
}
80
if !t.Done() {
81
return errors.WithStack(ErrTaskRunning)
82
}
83
tm.tasks.Delete(tid)
84
return nil
85
}
86
87
// RemoveAll removes all tasks from the manager, this maybe shouldn't be used
88
// because the task maybe still running.
89
func (tm *Manager[K]) RemoveAll() {
90
tm.tasks.Clear()
91
}
92
93
func (tm *Manager[K]) RemoveByStates(states ...string) {
94
tasks := tm.GetAll()
95
for _, task := range tasks {
96
if utils.SliceContains(states, task.GetState()) {
97
_ = tm.Remove(task.ID)
98
}
99
}
100
}
101
102
func (tm *Manager[K]) GetByStates(states ...string) []*Task[K] {
103
var tasks []*Task[K]
104
tm.tasks.Range(func(key K, value *Task[K]) bool {
105
if utils.SliceContains(states, value.GetState()) {
106
tasks = append(tasks, value)
107
}
108
return true
109
})
110
return tasks
111
}
112
113
func (tm *Manager[K]) ListUndone() []*Task[K] {
114
return tm.GetByStates(PENDING, RUNNING, CANCELING)
115
}
116
117
func (tm *Manager[K]) ListDone() []*Task[K] {
118
return tm.GetByStates(SUCCEEDED, CANCELED, ERRORED)
119
}
120
121
func (tm *Manager[K]) ClearDone() {
122
tm.RemoveByStates(SUCCEEDED, CANCELED, ERRORED)
123
}
124
125
func (tm *Manager[K]) ClearSucceeded() {
126
tm.RemoveByStates(SUCCEEDED)
127
}
128
129
func (tm *Manager[K]) RawTasks() *generic_sync.MapOf[K, *Task[K]] {
130
return &tm.tasks
131
}
132
133
func NewTaskManager[K comparable](maxWorker int, updateID ...func(*K)) *Manager[K] {
134
tm := &Manager[K]{
135
tasks: generic_sync.MapOf[K, *Task[K]]{},
136
workerC: make(chan struct{}, maxWorker),
137
}
138
for i := 0; i < maxWorker; i++ {
139
tm.workerC <- struct{}{}
140
}
141
if len(updateID) > 0 {
142
tm.updateID = updateID[0]
143
}
144
return tm
145
}
146
147