Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/pkg/mq/mq.go
1560 views
1
package mq
2
3
import (
4
"sync"
5
6
"github.com/alist-org/alist/v3/pkg/generic"
7
)
8
9
type Message[T any] struct {
10
Content T
11
}
12
13
type BasicConsumer[T any] func(Message[T])
14
type AllConsumer[T any] func([]Message[T])
15
16
type MQ[T any] interface {
17
Publish(Message[T])
18
Consume(BasicConsumer[T])
19
ConsumeAll(AllConsumer[T])
20
Clear()
21
Len() int
22
}
23
24
type inMemoryMQ[T any] struct {
25
queue generic.Queue[Message[T]]
26
sync.Mutex
27
}
28
29
func NewInMemoryMQ[T any]() MQ[T] {
30
return &inMemoryMQ[T]{queue: *generic.NewQueue[Message[T]]()}
31
}
32
33
func (mq *inMemoryMQ[T]) Publish(msg Message[T]) {
34
mq.Lock()
35
defer mq.Unlock()
36
mq.queue.Push(msg)
37
}
38
39
func (mq *inMemoryMQ[T]) Consume(consumer BasicConsumer[T]) {
40
mq.Lock()
41
defer mq.Unlock()
42
for !mq.queue.IsEmpty() {
43
consumer(mq.queue.Pop())
44
}
45
}
46
47
func (mq *inMemoryMQ[T]) ConsumeAll(consumer AllConsumer[T]) {
48
mq.Lock()
49
defer mq.Unlock()
50
consumer(mq.queue.PopAll())
51
}
52
53
func (mq *inMemoryMQ[T]) Clear() {
54
mq.Lock()
55
defer mq.Unlock()
56
mq.queue.Clear()
57
}
58
59
func (mq *inMemoryMQ[T]) Len() int {
60
mq.Lock()
61
defer mq.Unlock()
62
return mq.queue.Len()
63
}
64
65