Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/servicegraphprocessor/store.go
4095 views
1
package servicegraphprocessor
2
3
import (
4
"container/list"
5
"errors"
6
"sync"
7
"time"
8
)
9
10
// errTooManyItems is returned by upsertEdge when a new edge would have to be
// created but the store already holds maxItems edges.
var errTooManyItems = errors.New("too many items")
13
14
// storeCallback is a function that operates on a single edge. The store uses
// it both for the per-upsert callback and for the eviction callback.
type storeCallback func(*edge)
15
16
type store struct {
17
l *list.List
18
mtx *sync.RWMutex
19
m map[string]*list.Element
20
21
evictCallback storeCallback
22
ttl time.Duration
23
maxItems int
24
}
25
26
func newStore(ttl time.Duration, maxItems int, evictCallback storeCallback) *store {
27
s := &store{
28
l: list.New(),
29
mtx: &sync.RWMutex{},
30
m: make(map[string]*list.Element),
31
32
evictCallback: evictCallback,
33
ttl: ttl,
34
maxItems: maxItems,
35
}
36
37
return s
38
}
39
40
// len reports how many edges are currently stored. It takes the read lock.
func (s *store) len() int {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	size := s.l.Len()
	return size
}
46
47
// shouldEvictHead checks if the oldest item (head of list) has expired and should be evicted.
48
// Returns true if the item has expired, false otherwise.
49
//
50
// Must be called under lock.
51
func (s *store) shouldEvictHead() bool {
52
h := s.l.Front()
53
if h == nil {
54
return false
55
}
56
ts := h.Value.(*edge).expiration
57
return ts < time.Now().Unix()
58
}
59
60
// evictHead removes the head from the store (and map).
61
// It also collects metrics for the evicted edge.
62
//
63
// Must be called under lock.
64
func (s *store) evictHead() {
65
front := s.l.Front().Value.(*edge)
66
s.evictEdge(front.key)
67
}
68
69
// evictEdge evicts and edge under lock
70
func (s *store) evictEdgeWithLock(key string) {
71
s.mtx.Lock()
72
defer s.mtx.Unlock()
73
74
s.evictEdge(key)
75
}
76
77
// evictEdge removes the edge from the store (and map).
78
// It also collects metrics for the evicted edge.
79
//
80
// Must be called under lock.
81
func (s *store) evictEdge(key string) {
82
ele := s.m[key]
83
if ele == nil { // it may already have been processed
84
return
85
}
86
87
edge := ele.Value.(*edge)
88
s.evictCallback(edge)
89
90
delete(s.m, key)
91
s.l.Remove(ele)
92
}
93
94
// Fetches an edge from the store.
95
// If the edge doesn't exist, it creates a new one with the default TTL.
96
func (s *store) upsertEdge(k string, cb storeCallback) (*edge, error) {
97
s.mtx.Lock()
98
defer s.mtx.Unlock()
99
100
if storedEdge, ok := s.m[k]; ok {
101
edge := storedEdge.Value.(*edge)
102
cb(edge)
103
return edge, nil
104
}
105
106
if s.l.Len() >= s.maxItems {
107
// todo: try to evict expired items
108
return nil, errTooManyItems
109
}
110
111
newEdge := newEdge(k, s.ttl)
112
ele := s.l.PushBack(newEdge)
113
s.m[k] = ele
114
cb(newEdge)
115
116
return newEdge, nil
117
}
118
119
// expire evicts all expired items in the store.
120
func (s *store) expire() {
121
s.mtx.RLock()
122
if !s.shouldEvictHead() {
123
s.mtx.RUnlock()
124
return
125
}
126
s.mtx.RUnlock()
127
128
s.mtx.Lock()
129
defer s.mtx.Unlock()
130
131
for s.shouldEvictHead() {
132
s.evictHead()
133
}
134
}
135
136