Path: blob/main/pkg/traces/servicegraphprocessor/store.go
4095 views
package servicegraphprocessor

import (
	"container/list"
	"errors"
	"sync"
	"time"
)

var (
	// errTooManyItems is returned by upsertEdge when the store already holds
	// maxItems edges and no expired edge could be evicted to make room.
	errTooManyItems = errors.New("too many items")
)

// storeCallback is a function that is handed an edge held by the store.
type storeCallback func(e *edge)

// store is a bounded, mutex-guarded collection of edges.
//
// Edges are appended to the back of l when they are created, so the front of
// the list is always the edge that was inserted first. m indexes the list
// elements by edge key for direct lookup.
type store struct {
	l   *list.List
	mtx *sync.RWMutex
	m   map[string]*list.Element

	// evictCallback is invoked for every edge right before it is removed.
	evictCallback storeCallback
	// ttl is passed to newEdge for every edge created by upsertEdge.
	ttl time.Duration
	// maxItems is the maximum number of edges the store may hold.
	maxItems int
}

// newStore returns an empty store that creates edges with the given ttl,
// holds at most maxItems edges, and calls evictCallback on every evicted edge.
func newStore(ttl time.Duration, maxItems int, evictCallback storeCallback) *store {
	return &store{
		l:   list.New(),
		mtx: &sync.RWMutex{},
		m:   make(map[string]*list.Element),

		evictCallback: evictCallback,
		ttl:           ttl,
		maxItems:      maxItems,
	}
}

// len returns the number of edges currently held by the store.
func (s *store) len() int {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	return s.l.Len()
}

// shouldEvictHead checks if the oldest item (head of list) has expired and should be evicted.
// Returns true if the item has expired, false otherwise (including when the store is empty).
//
// Must be called under lock.
func (s *store) shouldEvictHead() bool {
	h := s.l.Front()
	if h == nil {
		return false
	}
	ts := h.Value.(*edge).expiration
	return ts < time.Now().Unix()
}

// evictHead removes the head from the store (and map).
// The eviction callback is invoked for the evicted edge.
// It is a no-op when the store is empty.
//
// Must be called under lock.
func (s *store) evictHead() {
	front := s.l.Front()
	if front == nil {
		return
	}
	s.evictEdge(front.Value.(*edge).key)
}

// evictEdgeWithLock evicts the edge stored under key while holding the write lock.
func (s *store) evictEdgeWithLock(key string) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	s.evictEdge(key)
}

// evictEdge removes the edge from the store (and map).
// The eviction callback is invoked for the edge before it is removed.
// Unknown keys are ignored.
//
// Must be called under lock.
func (s *store) evictEdge(key string) {
	ele := s.m[key]
	if ele == nil { // it may already have been processed
		return
	}

	s.evictCallback(ele.Value.(*edge))

	delete(s.m, key)
	s.l.Remove(ele)
}

// upsertEdge fetches the edge stored under k and passes it to cb.
// If the edge doesn't exist, it creates a new one with the store's TTL,
// appends it to the store and passes it to cb.
//
// When the store is full, edges at the head that have already expired are
// evicted first (invoking the eviction callback while the write lock is held,
// exactly as expire does). errTooManyItems is returned only if the store is
// still full afterwards; in that case no edge is created and cb is not called.
func (s *store) upsertEdge(k string, cb storeCallback) (*edge, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if ele, ok := s.m[k]; ok {
		e := ele.Value.(*edge)
		cb(e)
		return e, nil
	}

	if s.l.Len() >= s.maxItems {
		// Reclaim the slots of expired edges before rejecting the new one.
		for s.shouldEvictHead() {
			s.evictHead()
		}
		if s.l.Len() >= s.maxItems {
			return nil, errTooManyItems
		}
	}

	e := newEdge(k, s.ttl)
	s.m[k] = s.l.PushBack(e)
	cb(e)

	return e, nil
}

// expire evicts all expired items in the store.
func (s *store) expire() {
	// Peek under the read lock first so that the write lock is only taken
	// when there is something to evict.
	s.mtx.RLock()
	if !s.shouldEvictHead() {
		s.mtx.RUnlock()
		return
	}
	s.mtx.RUnlock()

	s.mtx.Lock()
	defer s.mtx.Unlock()

	// The head may have changed between the two locks; the loop condition
	// re-checks it under the write lock.
	for s.shouldEvictHead() {
		s.evictHead()
	}
}