// Source: GitHub repository alist-org/alist
// Path: blob/main/pkg/singleflight/singleflight.go
1
// Copyright 2013 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4
5
// Package singleflight provides a duplicate function call suppression
6
// mechanism.
7
package singleflight
8
9
import (
10
"bytes"
11
"errors"
12
"fmt"
13
"runtime"
14
"runtime/debug"
15
"sync"
16
)
17
18
var (
	// errGoexit is the sentinel recorded on a call when the user-supplied
	// function ended its goroutine by calling runtime.Goexit.
	errGoexit = errors.New("runtime.Goexit was called")
)
21
22
// A panicError is an arbitrary value recovered from a panic, paired with
// the stack trace captured during the execution of the given function.
type panicError struct {
	value any    // the value handed to panic
	stack []byte // stack trace with its leading "goroutine N [status]:" line removed
}

// Error implements the error interface. The message is the recovered
// value, followed by a blank line and the captured stack trace.
func (p *panicError) Error() string {
	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}

// Unwrap returns the recovered value when that value is itself an error,
// so that errors.Is and errors.As can inspect the cause of a re-raised
// panic. It returns nil for any other kind of panic value.
func (p *panicError) Unwrap() error {
	err, ok := p.value.(error)
	if !ok {
		return nil
	}
	return err
}

// newPanicError captures the current stack trace and wraps it together
// with the recovered value v in a *panicError, returned as an error.
func newPanicError(v any) error {
	stack := debug.Stack()

	// The first line of the stack trace is of the form "goroutine N [status]:"
	// but by the time the panic reaches Do the goroutine may no longer exist
	// and its status will have changed. Trim out the misleading line.
	if line := bytes.IndexByte(stack, '\n'); line >= 0 {
		stack = stack[line+1:]
	}
	return &panicError{value: v, stack: stack}
}
45
46
// call is an in-flight or completed singleflight.Do call
47
type call[T any] struct {
48
wg sync.WaitGroup
49
50
// These fields are written once before the WaitGroup is done
51
// and are only read after the WaitGroup is done.
52
val T
53
err error
54
55
// forgotten indicates whether Forget was called with this call's key
56
// while the call was still in flight.
57
forgotten bool
58
59
// These fields are read and written with the singleflight
60
// mutex held before the WaitGroup is done, and are read but
61
// not written after the WaitGroup is done.
62
dups int
63
chans []chan<- Result[T]
64
}
65
66
// Group represents a class of work and forms a namespace in
67
// which units of work can be executed with duplicate suppression.
68
type Group[T any] struct {
69
mu sync.Mutex // protects m
70
m map[string]*call[T] // lazily initialized
71
}
72
73
// Result bundles the values returned by Do so that they can be delivered
// over a channel.
type Result[T any] struct {
	// Val is the value produced by the function.
	Val T
	// Err is the error produced by the function.
	Err error
	// Shared reports whether the result was given to multiple callers.
	Shared bool
}
80
81
// Do executes and returns the results of the given function, making
82
// sure that only one execution is in-flight for a given key at a
83
// time. If a duplicate comes in, the duplicate caller waits for the
84
// original to complete and receives the same results.
85
// The return value shared indicates whether v was given to multiple callers.
86
func (g *Group[T]) Do(key string, fn func() (T, error)) (v T, err error, shared bool) {
87
g.mu.Lock()
88
if g.m == nil {
89
g.m = make(map[string]*call[T])
90
}
91
if c, ok := g.m[key]; ok {
92
c.dups++
93
g.mu.Unlock()
94
c.wg.Wait()
95
96
if e, ok := c.err.(*panicError); ok {
97
panic(e)
98
} else if c.err == errGoexit {
99
runtime.Goexit()
100
}
101
return c.val, c.err, true
102
}
103
c := new(call[T])
104
c.wg.Add(1)
105
g.m[key] = c
106
g.mu.Unlock()
107
108
g.doCall(c, key, fn)
109
return c.val, c.err, c.dups > 0
110
}
111
112
// DoChan is like Do but returns a channel that will receive the
113
// results when they are ready.
114
//
115
// The returned channel will not be closed.
116
func (g *Group[T]) DoChan(key string, fn func() (T, error)) <-chan Result[T] {
117
ch := make(chan Result[T], 1)
118
g.mu.Lock()
119
if g.m == nil {
120
g.m = make(map[string]*call[T])
121
}
122
if c, ok := g.m[key]; ok {
123
c.dups++
124
c.chans = append(c.chans, ch)
125
g.mu.Unlock()
126
return ch
127
}
128
c := &call[T]{chans: []chan<- Result[T]{ch}}
129
c.wg.Add(1)
130
g.m[key] = c
131
g.mu.Unlock()
132
133
go g.doCall(c, key, fn)
134
135
return ch
136
}
137
138
// doCall handles the single call for a key.
139
func (g *Group[T]) doCall(c *call[T], key string, fn func() (T, error)) {
140
normalReturn := false
141
recovered := false
142
143
// use double-defer to distinguish panic from runtime.Goexit,
144
// more details see https://golang.org/cl/134395
145
defer func() {
146
// the given function invoked runtime.Goexit
147
if !normalReturn && !recovered {
148
c.err = errGoexit
149
}
150
151
c.wg.Done()
152
g.mu.Lock()
153
defer g.mu.Unlock()
154
if !c.forgotten {
155
delete(g.m, key)
156
}
157
158
if e, ok := c.err.(*panicError); ok {
159
// In order to prevent the waiting channels from being blocked forever,
160
// needs to ensure that this panic cannot be recovered.
161
if len(c.chans) > 0 {
162
go panic(e)
163
select {} // Keep this goroutine around so that it will appear in the crash dump.
164
} else {
165
panic(e)
166
}
167
} else if c.err == errGoexit {
168
// Already in the process of goexit, no need to call again
169
} else {
170
// Normal return
171
for _, ch := range c.chans {
172
ch <- Result[T]{c.val, c.err, c.dups > 0}
173
}
174
}
175
}()
176
177
func() {
178
defer func() {
179
if !normalReturn {
180
// Ideally, we would wait to take a stack trace until we've determined
181
// whether this is a panic or a runtime.Goexit.
182
//
183
// Unfortunately, the only way we can distinguish the two is to see
184
// whether the recover stopped the goroutine from terminating, and by
185
// the time we know that, the part of the stack trace relevant to the
186
// panic has been discarded.
187
if r := recover(); r != nil {
188
c.err = newPanicError(r)
189
}
190
}
191
}()
192
193
c.val, c.err = fn()
194
normalReturn = true
195
}()
196
197
if !normalReturn {
198
recovered = true
199
}
200
}
201
202
// Forget tells the singleflight to forget about a key. Future calls
203
// to Do for this key will call the function rather than waiting for
204
// an earlier call to complete.
205
func (g *Group[T]) Forget(key string) {
206
g.mu.Lock()
207
if c, ok := g.m[key]; ok {
208
c.forgotten = true
209
}
210
delete(g.m, key)
211
g.mu.Unlock()
212
}
213
214