Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/internal/search/build.go
1560 views
1
package search
2
3
import (
4
"context"
5
"path"
6
"path/filepath"
7
"strings"
8
"sync"
9
"sync/atomic"
10
"time"
11
12
"github.com/alist-org/alist/v3/internal/conf"
13
"github.com/alist-org/alist/v3/internal/errs"
14
"github.com/alist-org/alist/v3/internal/fs"
15
"github.com/alist-org/alist/v3/internal/model"
16
"github.com/alist-org/alist/v3/internal/op"
17
"github.com/alist-org/alist/v3/internal/search/searcher"
18
"github.com/alist-org/alist/v3/internal/setting"
19
"github.com/alist-org/alist/v3/pkg/mq"
20
"github.com/alist-org/alist/v3/pkg/utils"
21
mapset "github.com/deckarep/golang-set/v2"
22
log "github.com/sirupsen/logrus"
23
)
24
25
var (
	// Quit holds the quit channel of the index build that is currently in
	// progress, or nil when no build is running. BuildIndex claims it with
	// CompareAndSwap(nil, &quit) and resets it to nil when its consumer
	// goroutine exits.
	Quit = atomic.Pointer[chan struct{}]{}
)
28
29
func Running() bool {
30
return Quit.Load() != nil
31
}
32
33
func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth int, count bool) error {
34
var (
35
err error
36
objCount uint64 = 0
37
fi model.Obj
38
)
39
log.Infof("build index for: %+v", indexPaths)
40
log.Infof("ignore paths: %+v", ignorePaths)
41
quit := make(chan struct{}, 1)
42
if !Quit.CompareAndSwap(nil, &quit) {
43
// other goroutine is running
44
return errs.BuildIndexIsRunning
45
}
46
var (
47
indexMQ = mq.NewInMemoryMQ[ObjWithParent]()
48
running = atomic.Bool{} // current goroutine running
49
wg = &sync.WaitGroup{}
50
)
51
running.Store(true)
52
wg.Add(1)
53
go func() {
54
ticker := time.NewTicker(time.Second)
55
defer func() {
56
Quit.Store(nil)
57
wg.Done()
58
// notify walk to exit when StopIndex api called
59
running.Store(false)
60
ticker.Stop()
61
}()
62
tickCount := 0
63
for {
64
select {
65
case <-ticker.C:
66
tickCount += 1
67
if indexMQ.Len() < 1000 && tickCount != 5 {
68
continue
69
} else if tickCount >= 5 {
70
tickCount = 0
71
}
72
log.Infof("index obj count: %d", objCount)
73
indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) {
74
if len(messages) != 0 {
75
log.Debugf("current index: %s", messages[len(messages)-1].Content.Parent)
76
}
77
if err = BatchIndex(ctx, utils.MustSliceConvert(messages,
78
func(src mq.Message[ObjWithParent]) ObjWithParent {
79
return src.Content
80
})); err != nil {
81
log.Errorf("build index in batch error: %+v", err)
82
} else {
83
objCount = objCount + uint64(len(messages))
84
}
85
if count {
86
WriteProgress(&model.IndexProgress{
87
ObjCount: objCount,
88
IsDone: false,
89
LastDoneTime: nil,
90
})
91
}
92
})
93
94
case <-quit:
95
log.Debugf("build index for %+v received quit", indexPaths)
96
eMsg := ""
97
now := time.Now()
98
originErr := err
99
indexMQ.ConsumeAll(func(messages []mq.Message[ObjWithParent]) {
100
if err = BatchIndex(ctx, utils.MustSliceConvert(messages,
101
func(src mq.Message[ObjWithParent]) ObjWithParent {
102
return src.Content
103
})); err != nil {
104
log.Errorf("build index in batch error: %+v", err)
105
} else {
106
objCount = objCount + uint64(len(messages))
107
}
108
if originErr != nil {
109
log.Errorf("build index error: %+v", originErr)
110
eMsg = originErr.Error()
111
} else {
112
log.Infof("success build index, count: %d", objCount)
113
}
114
if count {
115
WriteProgress(&model.IndexProgress{
116
ObjCount: objCount,
117
IsDone: true,
118
LastDoneTime: &now,
119
Error: eMsg,
120
})
121
}
122
})
123
log.Debugf("build index for %+v quit success", indexPaths)
124
return
125
}
126
}
127
}()
128
defer func() {
129
if !running.Load() || Quit.Load() != &quit {
130
log.Debugf("build index for %+v stopped by StopIndex", indexPaths)
131
return
132
}
133
select {
134
// avoid goroutine leak
135
case quit <- struct{}{}:
136
default:
137
}
138
wg.Wait()
139
}()
140
admin, err := op.GetAdmin()
141
if err != nil {
142
return err
143
}
144
if count {
145
WriteProgress(&model.IndexProgress{
146
ObjCount: 0,
147
IsDone: false,
148
})
149
}
150
for _, indexPath := range indexPaths {
151
walkFn := func(indexPath string, info model.Obj) error {
152
if !running.Load() {
153
return filepath.SkipDir
154
}
155
for _, avoidPath := range ignorePaths {
156
if strings.HasPrefix(indexPath, avoidPath) {
157
return filepath.SkipDir
158
}
159
}
160
if storage, _, err := op.GetStorageAndActualPath(indexPath); err == nil {
161
if storage.GetStorage().DisableIndex {
162
return filepath.SkipDir
163
}
164
}
165
// ignore root
166
if indexPath == "/" {
167
return nil
168
}
169
indexMQ.Publish(mq.Message[ObjWithParent]{
170
Content: ObjWithParent{
171
Obj: info,
172
Parent: path.Dir(indexPath),
173
},
174
})
175
return nil
176
}
177
fi, err = fs.Get(ctx, indexPath, &fs.GetArgs{})
178
if err != nil {
179
return err
180
}
181
// TODO: run walkFS concurrently
182
err = fs.WalkFS(context.WithValue(ctx, "user", admin), maxDepth, indexPath, fi, walkFn)
183
if err != nil {
184
return err
185
}
186
}
187
return nil
188
}
189
190
func Del(ctx context.Context, prefix string) error {
191
return instance.Del(ctx, prefix)
192
}
193
194
func Clear(ctx context.Context) error {
195
return instance.Clear(ctx)
196
}
197
198
// Config returns the configuration of the active searcher. The ctx parameter
// is accepted but not used.
//
// NOTE(review): instance is used without a nil check here, whereas Update
// guards against instance == nil — confirm callers only reach this once the
// searcher has been initialised.
func Config(ctx context.Context) searcher.Config {
	return instance.Config()
}
201
202
func Update(parent string, objs []model.Obj) {
203
if instance == nil || !instance.Config().AutoUpdate || !setting.GetBool(conf.AutoUpdateIndex) || Running() {
204
return
205
}
206
if isIgnorePath(parent) {
207
return
208
}
209
ctx := context.Background()
210
// only update when index have built
211
progress, err := Progress()
212
if err != nil {
213
log.Errorf("update search index error while get progress: %+v", err)
214
return
215
}
216
if !progress.IsDone {
217
return
218
}
219
nodes, err := instance.Get(ctx, parent)
220
if err != nil {
221
log.Errorf("update search index error while get nodes: %+v", err)
222
return
223
}
224
now := mapset.NewSet[string]()
225
for i := range objs {
226
now.Add(objs[i].GetName())
227
}
228
old := mapset.NewSet[string]()
229
for i := range nodes {
230
old.Add(nodes[i].Name)
231
}
232
// delete data that no longer exists
233
toDelete := old.Difference(now)
234
toAdd := now.Difference(old)
235
for i := range nodes {
236
if toDelete.Contains(nodes[i].Name) && !op.HasStorage(path.Join(parent, nodes[i].Name)) {
237
log.Debugf("delete index: %s", path.Join(parent, nodes[i].Name))
238
err = instance.Del(ctx, path.Join(parent, nodes[i].Name))
239
if err != nil {
240
log.Errorf("update search index error while del old node: %+v", err)
241
return
242
}
243
}
244
}
245
for i := range objs {
246
if toAdd.Contains(objs[i].GetName()) {
247
if !objs[i].IsDir() {
248
log.Debugf("add index: %s", path.Join(parent, objs[i].GetName()))
249
err = Index(ctx, parent, objs[i])
250
if err != nil {
251
log.Errorf("update search index error while index new node: %+v", err)
252
return
253
}
254
} else {
255
// build index if it's a folder
256
dir := path.Join(parent, objs[i].GetName())
257
err = BuildIndex(ctx,
258
[]string{dir},
259
conf.SlicesMap[conf.IgnorePaths],
260
setting.GetInt(conf.MaxIndexDepth, 20)-strings.Count(dir, "/"), false)
261
if err != nil {
262
log.Errorf("update search index error while build index: %+v", err)
263
return
264
}
265
}
266
}
267
}
268
}
269
270
func init() {
271
op.RegisterObjsUpdateHook(Update)
272
}
273
274