Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/pkg/errgroup/errgroup.go
1560 views
1
package errgroup
2
3
import (
4
"context"
5
"fmt"
6
"sync"
7
"sync/atomic"
8
9
"github.com/avast/retry-go"
10
)
11
12
// token is an empty marker value. One token sits in Group.sem for every
// goroutine that currently occupies a concurrency slot: Go and TryGo send a
// token before starting a goroutine, and done receives one when it finishes.
type token struct{}
13
type Group struct {
14
cancel func(error)
15
ctx context.Context
16
opts []retry.Option
17
18
success uint64
19
20
wg sync.WaitGroup
21
sem chan token
22
}
23
24
// NewGroupWithContext returns a new Group together with a context derived
// from ctx. The derived context is the one passed to every task; it is
// cancelled, with the failing error as its cause, when a task started through
// the group still fails after retrying.
//
// limit is forwarded to SetLimit: a positive value bounds the number of
// concurrently running goroutines, any other value means no bound.
//
// retryOpts are passed to retry.Do for every task. retry.Context for the
// derived context is appended after them so that the retry loop is bound to
// the group's context.
func NewGroupWithContext(ctx context.Context, limit int, retryOpts ...retry.Option) (*Group, context.Context) {
	ctx, cancel := context.WithCancelCause(ctx)

	// Build a private slice rather than appending to retryOpts directly: if
	// the caller passed a slice with spare capacity (opts[:n]...), a plain
	// append would overwrite an element of the caller's backing array.
	opts := make([]retry.Option, 0, len(retryOpts)+1)
	opts = append(opts, retryOpts...)
	opts = append(opts, retry.Context(ctx))

	g := &Group{cancel: cancel, ctx: ctx, opts: opts}
	return g.SetLimit(limit), ctx
}
28
29
// done is deferred by every goroutine the group starts. It bumps the
// completion counter reported by Success, frees the goroutine's concurrency
// slot (if the group is limited) and signals the WaitGroup.
//
// Note that the counter is incremented for every finished task, whether or
// not the task returned an error.
func (g *Group) done() {
	// Count before wg.Done: once the WaitGroup is released, Wait may return,
	// and a caller reading Success right after Wait must already see this
	// task included.
	atomic.AddUint64(&g.success, 1)
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}
36
37
// Wait blocks until every goroutine started through the group has finished,
// then reports the cancellation cause of the group's context. The result is
// nil when the context has not been cancelled.
func (g *Group) Wait() error {
	g.wg.Wait()
	cause := context.Cause(g.ctx)
	return cause
}
41
42
func (g *Group) Go(f func(ctx context.Context) error) {
43
if g.sem != nil {
44
g.sem <- token{}
45
}
46
47
g.wg.Add(1)
48
go func() {
49
defer g.done()
50
if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil {
51
g.cancel(err)
52
}
53
}()
54
}
55
56
func (g *Group) TryGo(f func(ctx context.Context) error) bool {
57
if g.sem != nil {
58
select {
59
case g.sem <- token{}:
60
default:
61
return false
62
}
63
}
64
65
g.wg.Add(1)
66
go func() {
67
defer g.done()
68
if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil {
69
g.cancel(err)
70
}
71
}()
72
return true
73
}
74
75
// SetLimit sets the maximum number of goroutines the group runs at once and
// returns the group so calls can be chained. A value of n that is zero or
// negative removes the limit.
//
// SetLimit panics if any goroutine currently holds a concurrency slot.
func (g *Group) SetLimit(n int) *Group {
	if active := len(g.sem); active != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", active))
	}

	// A nil semaphore is how the rest of the type recognises "unlimited".
	var sem chan token
	if n > 0 {
		sem = make(chan token, n)
	}
	g.sem = sem
	return g
}
86
87
// Success returns the current value of the group's completion counter, read
// atomically. The counter is incremented by done once for every task
// goroutine that has finished, regardless of whether the task returned an
// error.
func (g *Group) Success() uint64 {
	finished := atomic.LoadUint64(&g.success)
	return finished
}
90
91
// Err reports, without waiting for running goroutines, the cancellation
// cause of the group's context. It returns nil while the context has not
// been cancelled.
func (g *Group) Err() error {
	cause := context.Cause(g.ctx)
	return cause
}
94
95