Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
snail007
GitHub Repository: snail007/goproxy
Path: blob/master/utils/map.go
686 views
1
package utils
2
3
import (
4
"encoding/json"
5
"sync"
6
)
7
8
var SHARD_COUNT = 32
9
10
// A "thread" safe map of type string:Anything.
11
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
12
type ConcurrentMap []*ConcurrentMapShared
13
14
// A "thread" safe string to anything map.
15
type ConcurrentMapShared struct {
16
items map[string]interface{}
17
sync.RWMutex // Read Write mutex, guards access to internal map.
18
}
19
20
// Creates a new concurrent map.
21
func NewConcurrentMap() ConcurrentMap {
22
m := make(ConcurrentMap, SHARD_COUNT)
23
for i := 0; i < SHARD_COUNT; i++ {
24
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
25
}
26
return m
27
}
28
29
// GetShard returns shard under given key
30
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
31
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
32
}
33
34
func (m ConcurrentMap) MSet(data map[string]interface{}) {
35
for key, value := range data {
36
shard := m.GetShard(key)
37
shard.Lock()
38
shard.items[key] = value
39
shard.Unlock()
40
}
41
}
42
43
// Sets the given value under the specified key.
44
func (m ConcurrentMap) Set(key string, value interface{}) {
45
// Get map shard.
46
shard := m.GetShard(key)
47
shard.Lock()
48
shard.items[key] = value
49
shard.Unlock()
50
}
51
52
// Callback to return new element to be inserted into the map
53
// It is called while lock is held, therefore it MUST NOT
54
// try to access other keys in same map, as it can lead to deadlock since
55
// Go sync.RWLock is not reentrant
56
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
57
58
// Insert or Update - updates existing element or inserts a new one using UpsertCb
59
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
60
shard := m.GetShard(key)
61
shard.Lock()
62
v, ok := shard.items[key]
63
res = cb(ok, v, value)
64
shard.items[key] = res
65
shard.Unlock()
66
return res
67
}
68
69
// Sets the given value under the specified key if no value was associated with it.
70
func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
71
// Get map shard.
72
shard := m.GetShard(key)
73
shard.Lock()
74
_, ok := shard.items[key]
75
if !ok {
76
shard.items[key] = value
77
}
78
shard.Unlock()
79
return !ok
80
}
81
82
// Get retrieves an element from map under given key.
83
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
84
// Get shard
85
shard := m.GetShard(key)
86
shard.RLock()
87
// Get item from shard.
88
val, ok := shard.items[key]
89
shard.RUnlock()
90
return val, ok
91
}
92
93
// Count returns the number of elements within the map.
94
func (m ConcurrentMap) Count() int {
95
count := 0
96
for i := 0; i < SHARD_COUNT; i++ {
97
shard := m[i]
98
shard.RLock()
99
count += len(shard.items)
100
shard.RUnlock()
101
}
102
return count
103
}
104
105
// Looks up an item under specified key
106
func (m ConcurrentMap) Has(key string) bool {
107
// Get shard
108
shard := m.GetShard(key)
109
shard.RLock()
110
// See if element is within shard.
111
_, ok := shard.items[key]
112
shard.RUnlock()
113
return ok
114
}
115
116
// Remove removes an element from the map.
117
func (m ConcurrentMap) Remove(key string) {
118
// Try to get shard.
119
shard := m.GetShard(key)
120
shard.Lock()
121
delete(shard.items, key)
122
shard.Unlock()
123
}
124
125
// Pop removes an element from the map and returns it
126
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
127
// Try to get shard.
128
shard := m.GetShard(key)
129
shard.Lock()
130
v, exists = shard.items[key]
131
delete(shard.items, key)
132
shard.Unlock()
133
return v, exists
134
}
135
136
// IsEmpty checks if map is empty.
137
func (m ConcurrentMap) IsEmpty() bool {
138
return m.Count() == 0
139
}
140
141
// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
142
type Tuple struct {
143
Key string
144
Val interface{}
145
}
146
147
// Iter returns an iterator which could be used in a for range loop.
148
//
149
// Deprecated: using IterBuffered() will get a better performence
150
func (m ConcurrentMap) Iter() <-chan Tuple {
151
chans := snapshot(m)
152
ch := make(chan Tuple)
153
go fanIn(chans, ch)
154
return ch
155
}
156
157
// IterBuffered returns a buffered iterator which could be used in a for range loop.
158
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
159
chans := snapshot(m)
160
total := 0
161
for _, c := range chans {
162
total += cap(c)
163
}
164
ch := make(chan Tuple, total)
165
go fanIn(chans, ch)
166
return ch
167
}
168
169
// Returns a array of channels that contains elements in each shard,
170
// which likely takes a snapshot of `m`.
171
// It returns once the size of each buffered channel is determined,
172
// before all the channels are populated using goroutines.
173
func snapshot(m ConcurrentMap) (chans []chan Tuple) {
174
chans = make([]chan Tuple, SHARD_COUNT)
175
wg := sync.WaitGroup{}
176
wg.Add(SHARD_COUNT)
177
// Foreach shard.
178
for index, shard := range m {
179
go func(index int, shard *ConcurrentMapShared) {
180
// Foreach key, value pair.
181
shard.RLock()
182
chans[index] = make(chan Tuple, len(shard.items))
183
wg.Done()
184
for key, val := range shard.items {
185
chans[index] <- Tuple{key, val}
186
}
187
shard.RUnlock()
188
close(chans[index])
189
}(index, shard)
190
}
191
wg.Wait()
192
return chans
193
}
194
195
// fanIn reads elements from channels `chans` into channel `out`
196
func fanIn(chans []chan Tuple, out chan Tuple) {
197
wg := sync.WaitGroup{}
198
wg.Add(len(chans))
199
for _, ch := range chans {
200
go func(ch chan Tuple) {
201
for t := range ch {
202
out <- t
203
}
204
wg.Done()
205
}(ch)
206
}
207
wg.Wait()
208
close(out)
209
}
210
211
// Items returns all items as map[string]interface{}
212
func (m ConcurrentMap) Items() map[string]interface{} {
213
tmp := make(map[string]interface{})
214
215
// Insert items to temporary map.
216
for item := range m.IterBuffered() {
217
tmp[item.Key] = item.Val
218
}
219
220
return tmp
221
}
222
223
// Iterator callback,called for every key,value found in
224
// maps. RLock is held for all calls for a given shard
225
// therefore callback sess consistent view of a shard,
226
// but not across the shards
227
type IterCb func(key string, v interface{})
228
229
// Callback based iterator, cheapest way to read
230
// all elements in a map.
231
func (m ConcurrentMap) IterCb(fn IterCb) {
232
for idx := range m {
233
shard := (m)[idx]
234
shard.RLock()
235
for key, value := range shard.items {
236
fn(key, value)
237
}
238
shard.RUnlock()
239
}
240
}
241
242
// Keys returns all keys as []string
243
func (m ConcurrentMap) Keys() []string {
244
count := m.Count()
245
ch := make(chan string, count)
246
go func() {
247
// Foreach shard.
248
wg := sync.WaitGroup{}
249
wg.Add(SHARD_COUNT)
250
for _, shard := range m {
251
go func(shard *ConcurrentMapShared) {
252
// Foreach key, value pair.
253
shard.RLock()
254
for key := range shard.items {
255
ch <- key
256
}
257
shard.RUnlock()
258
wg.Done()
259
}(shard)
260
}
261
wg.Wait()
262
close(ch)
263
}()
264
265
// Generate keys
266
keys := make([]string, 0, count)
267
for k := range ch {
268
keys = append(keys, k)
269
}
270
return keys
271
}
272
273
//Reviles ConcurrentMap "private" variables to json marshal.
274
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
275
// Create a temporary map, which will hold all item spread across shards.
276
tmp := make(map[string]interface{})
277
278
// Insert items to temporary map.
279
for item := range m.IterBuffered() {
280
tmp[item.Key] = item.Val
281
}
282
return json.Marshal(tmp)
283
}
284
285
func fnv32(key string) uint32 {
286
hash := uint32(2166136261)
287
const prime32 = uint32(16777619)
288
for i := 0; i < len(key); i++ {
289
hash *= prime32
290
hash ^= uint32(key[i])
291
}
292
return hash
293
}
294
295
// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal
296
// will probably won't know which to type to unmarshal into, in such case
297
// we'll end up with a value of type map[string]interface{}, In most cases this isn't
298
// out value type, this is why we've decided to remove this functionality.
299
300
// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
301
// // Reverse process of Marshal.
302
303
// tmp := make(map[string]interface{})
304
305
// // Unmarshal into a single map.
306
// if err := json.Unmarshal(b, &tmp); err != nil {
307
// return nil
308
// }
309
310
// // foreach key,value pair in temporary map insert into our concurrent map.
311
// for key, val := range tmp {
312
// m.Set(key, val)
313
// }
314
// return nil
315
// }
316
317