package utils12import (3"encoding/json"4"sync"5)67var SHARD_COUNT = 3289// A "thread" safe map of type string:Anything.10// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.11type ConcurrentMap []*ConcurrentMapShared1213// A "thread" safe string to anything map.14type ConcurrentMapShared struct {15items map[string]interface{}16sync.RWMutex // Read Write mutex, guards access to internal map.17}1819// Creates a new concurrent map.20func NewConcurrentMap() ConcurrentMap {21m := make(ConcurrentMap, SHARD_COUNT)22for i := 0; i < SHARD_COUNT; i++ {23m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}24}25return m26}2728// GetShard returns shard under given key29func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {30return m[uint(fnv32(key))%uint(SHARD_COUNT)]31}3233func (m ConcurrentMap) MSet(data map[string]interface{}) {34for key, value := range data {35shard := m.GetShard(key)36shard.Lock()37shard.items[key] = value38shard.Unlock()39}40}4142// Sets the given value under the specified key.43func (m ConcurrentMap) Set(key string, value interface{}) {44// Get map shard.45shard := m.GetShard(key)46shard.Lock()47shard.items[key] = value48shard.Unlock()49}5051// Callback to return new element to be inserted into the map52// It is called while lock is held, therefore it MUST NOT53// try to access other keys in same map, as it can lead to deadlock since54// Go sync.RWLock is not reentrant55type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}5657// Insert or Update - updates existing element or inserts a new one using UpsertCb58func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {59shard := m.GetShard(key)60shard.Lock()61v, ok := shard.items[key]62res = cb(ok, v, value)63shard.items[key] = res64shard.Unlock()65return res66}6768// Sets the given value under the specified key if no value was associated with it.69func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {70// Get map shard.71shard := m.GetShard(key)72shard.Lock()73_, ok := shard.items[key]74if !ok {75shard.items[key] = value76}77shard.Unlock()78return !ok79}8081// Get retrieves an element from map under given key.82func (m ConcurrentMap) Get(key string) (interface{}, bool) {83// Get shard84shard := m.GetShard(key)85shard.RLock()86// Get item from shard.87val, ok := shard.items[key]88shard.RUnlock()89return val, ok90}9192// Count returns the number of elements within the map.93func (m ConcurrentMap) Count() int {94count := 095for i := 0; i < SHARD_COUNT; i++ {96shard := m[i]97shard.RLock()98count += len(shard.items)99shard.RUnlock()100}101return count102}103104// Looks up an item under specified key105func (m ConcurrentMap) Has(key string) bool {106// Get shard107shard := m.GetShard(key)108shard.RLock()109// See if element is within shard.110_, ok := shard.items[key]111shard.RUnlock()112return ok113}114115// Remove removes an element from the map.116func (m ConcurrentMap) Remove(key string) {117// Try to get shard.118shard := m.GetShard(key)119shard.Lock()120delete(shard.items, key)121shard.Unlock()122}123124// Pop removes an element from the map and returns it125func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) {126// Try to get shard.127shard := m.GetShard(key)128shard.Lock()129v, exists = shard.items[key]130delete(shard.items, key)131shard.Unlock()132return v, exists133}134135// IsEmpty checks if map is empty.136func (m ConcurrentMap) IsEmpty() bool {137return m.Count() == 0138}139140// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,141type Tuple struct {142Key string143Val interface{}144}145146// Iter returns an iterator which could be used in a for range loop.147//148// Deprecated: using IterBuffered() will get a better performence149func (m ConcurrentMap) Iter() <-chan Tuple {150chans := snapshot(m)151ch := make(chan Tuple)152go fanIn(chans, ch)153return ch154}155156// IterBuffered returns a buffered iterator which could be used in a for range loop.157func (m ConcurrentMap) IterBuffered() <-chan Tuple {158chans := snapshot(m)159total := 0160for _, c := range chans {161total += cap(c)162}163ch := make(chan Tuple, total)164go fanIn(chans, ch)165return ch166}167168// Returns a array of channels that contains elements in each shard,169// which likely takes a snapshot of `m`.170// It returns once the size of each buffered channel is determined,171// before all the channels are populated using goroutines.172func snapshot(m ConcurrentMap) (chans []chan Tuple) {173chans = make([]chan Tuple, SHARD_COUNT)174wg := sync.WaitGroup{}175wg.Add(SHARD_COUNT)176// Foreach shard.177for index, shard := range m {178go func(index int, shard *ConcurrentMapShared) {179// Foreach key, value pair.180shard.RLock()181chans[index] = make(chan Tuple, len(shard.items))182wg.Done()183for key, val := range shard.items {184chans[index] <- Tuple{key, val}185}186shard.RUnlock()187close(chans[index])188}(index, shard)189}190wg.Wait()191return chans192}193194// fanIn reads elements from channels `chans` into channel `out`195func fanIn(chans []chan Tuple, out chan Tuple) {196wg := sync.WaitGroup{}197wg.Add(len(chans))198for _, ch := range chans {199go func(ch chan Tuple) {200for t := range ch {201out <- t202}203wg.Done()204}(ch)205}206wg.Wait()207close(out)208}209210// Items returns all items as map[string]interface{}211func (m ConcurrentMap) Items() map[string]interface{} {212tmp := make(map[string]interface{})213214// Insert items to temporary map.215for item := range m.IterBuffered() {216tmp[item.Key] = item.Val217}218219return tmp220}221222// Iterator callback,called for every key,value found in223// maps. RLock is held for all calls for a given shard224// therefore callback sess consistent view of a shard,225// but not across the shards226type IterCb func(key string, v interface{})227228// Callback based iterator, cheapest way to read229// all elements in a map.230func (m ConcurrentMap) IterCb(fn IterCb) {231for idx := range m {232shard := (m)[idx]233shard.RLock()234for key, value := range shard.items {235fn(key, value)236}237shard.RUnlock()238}239}240241// Keys returns all keys as []string242func (m ConcurrentMap) Keys() []string {243count := m.Count()244ch := make(chan string, count)245go func() {246// Foreach shard.247wg := sync.WaitGroup{}248wg.Add(SHARD_COUNT)249for _, shard := range m {250go func(shard *ConcurrentMapShared) {251// Foreach key, value pair.252shard.RLock()253for key := range shard.items {254ch <- key255}256shard.RUnlock()257wg.Done()258}(shard)259}260wg.Wait()261close(ch)262}()263264// Generate keys265keys := make([]string, 0, count)266for k := range ch {267keys = append(keys, k)268}269return keys270}271272//Reviles ConcurrentMap "private" variables to json marshal.273func (m ConcurrentMap) MarshalJSON() ([]byte, error) {274// Create a temporary map, which will hold all item spread across shards.275tmp := make(map[string]interface{})276277// Insert items to temporary map.278for item := range m.IterBuffered() {279tmp[item.Key] = item.Val280}281return json.Marshal(tmp)282}283284func fnv32(key string) uint32 {285hash := uint32(2166136261)286const prime32 = uint32(16777619)287for i := 0; i < len(key); i++ {288hash *= prime32289hash ^= uint32(key[i])290}291return hash292}293294// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal295// will probably won't know which to type to unmarshal into, in such case296// we'll end up with a value of type map[string]interface{}, In most cases this isn't297// out value type, this is why we've decided to remove this functionality.298299// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {300// // Reverse process of Marshal.301302// tmp := make(map[string]interface{})303304// // Unmarshal into a single map.305// if err := json.Unmarshal(b, &tmp); err != nil {306// return nil307// }308309// // foreach key,value pair in temporary map insert into our concurrent map.310// for key, val := range tmp {311// m.Set(key, val)312// }313// return nil314// }315316317