Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/controller/value_cache.go
4095 views
1
package controller
2
3
import (
4
"sync"
5
6
"github.com/grafana/agent/component"
7
"github.com/grafana/agent/pkg/river/vm"
8
)
9
10
// valueCache caches component arguments and exports to expose as variables for
11
// River expressions.
12
//
13
// The current state of valueCache can then be built into a *vm.Scope for other
14
// components to be evaluated.
15
type valueCache struct {
16
mut sync.RWMutex
17
components map[string]ComponentID // NodeID -> ComponentID
18
args map[string]interface{} // NodeID -> component arguments value
19
exports map[string]interface{} // NodeID -> component exports value
20
moduleArguments map[string]any // key -> module arguments value
21
moduleExports map[string]any // name -> value for the value of module exports
22
moduleChangedIndex int // Everytime a change occurs this is incremented
23
}
24
25
// newValueCache creates a new ValueCache.
26
func newValueCache() *valueCache {
27
return &valueCache{
28
components: make(map[string]ComponentID),
29
args: make(map[string]interface{}),
30
exports: make(map[string]interface{}),
31
moduleArguments: make(map[string]any),
32
moduleExports: make(map[string]any),
33
}
34
}
35
36
// CacheArguments will cache the provided arguments by the given id. args may
37
// be nil to store an empty object.
38
func (vc *valueCache) CacheArguments(id ComponentID, args component.Arguments) {
39
vc.mut.Lock()
40
defer vc.mut.Unlock()
41
42
nodeID := id.String()
43
vc.components[nodeID] = id
44
45
var argsVal interface{} = make(map[string]interface{})
46
if args != nil {
47
argsVal = args
48
}
49
vc.args[nodeID] = argsVal
50
}
51
52
// CacheExports will cache the provided exports using the given id. exports may
53
// be nil to store an empty object.
54
func (vc *valueCache) CacheExports(id ComponentID, exports component.Exports) {
55
vc.mut.Lock()
56
defer vc.mut.Unlock()
57
58
nodeID := id.String()
59
vc.components[nodeID] = id
60
61
var exportsVal interface{} = make(map[string]interface{})
62
if exports != nil {
63
exportsVal = exports
64
}
65
vc.exports[nodeID] = exportsVal
66
}
67
68
// CacheModuleArgument will cache the provided exports using the given id.
69
func (vc *valueCache) CacheModuleArgument(key string, value any) {
70
vc.mut.Lock()
71
defer vc.mut.Unlock()
72
73
if value == nil {
74
vc.moduleArguments[key] = nil
75
} else {
76
vc.moduleArguments[key] = value
77
}
78
}
79
80
// CacheModuleExportValue saves the value to the map
81
func (vc *valueCache) CacheModuleExportValue(name string, value any) {
82
vc.mut.Lock()
83
defer vc.mut.Unlock()
84
85
// Need to see if the module exports have changed.
86
v, found := vc.moduleExports[name]
87
if !found {
88
vc.moduleChangedIndex++
89
}
90
if v != value {
91
vc.moduleChangedIndex++
92
}
93
94
vc.moduleExports[name] = value
95
}
96
97
// CreateModuleExports creates a map for usage on OnExportsChanged
98
func (vc *valueCache) CreateModuleExports() map[string]any {
99
vc.mut.RLock()
100
defer vc.mut.RUnlock()
101
102
exports := make(map[string]any)
103
for k, v := range vc.moduleExports {
104
exports[k] = v
105
}
106
return exports
107
}
108
109
// ClearModuleExports empties the map and notifies that the exports have changed.
110
func (vc *valueCache) ClearModuleExports() {
111
vc.mut.Lock()
112
defer vc.mut.Unlock()
113
114
vc.moduleChangedIndex++
115
vc.moduleExports = make(map[string]any)
116
}
117
118
// ExportChangeIndex return the change index.
119
func (vc *valueCache) ExportChangeIndex() int {
120
vc.mut.RLock()
121
defer vc.mut.RUnlock()
122
123
return vc.moduleChangedIndex
124
}
125
126
// SyncIDs will remove any cached values for any Component ID which is not in
127
// ids. SyncIDs should be called with the current set of components after the
128
// graph is updated.
129
func (vc *valueCache) SyncIDs(ids []ComponentID) {
130
expectMap := make(map[string]ComponentID, len(ids))
131
for _, id := range ids {
132
expectMap[id.String()] = id
133
}
134
135
vc.mut.Lock()
136
defer vc.mut.Unlock()
137
138
for id := range vc.components {
139
if _, keep := expectMap[id]; keep {
140
continue
141
}
142
delete(vc.components, id)
143
delete(vc.args, id)
144
delete(vc.exports, id)
145
}
146
}
147
148
// SyncModuleArgs will remove any cached values for any args no longer in the map.
149
func (vc *valueCache) SyncModuleArgs(args map[string]any) {
150
vc.mut.Lock()
151
defer vc.mut.Unlock()
152
153
for id := range vc.moduleArguments {
154
if _, keep := args[id]; keep {
155
continue
156
}
157
delete(vc.moduleArguments, id)
158
}
159
}
160
161
// BuildContext builds a vm.Scope based on the current set of cached values.
162
// The arguments and exports for the same ID are merged into one object.
163
func (vc *valueCache) BuildContext() *vm.Scope {
164
vc.mut.RLock()
165
defer vc.mut.RUnlock()
166
167
scope := &vm.Scope{
168
Parent: nil,
169
Variables: make(map[string]interface{}),
170
}
171
172
// First, partition components by River block name.
173
var componentsByBlockName = make(map[string][]ComponentID)
174
for _, id := range vc.components {
175
blockName := id[0]
176
componentsByBlockName[blockName] = append(componentsByBlockName[blockName], id)
177
}
178
179
// Then, convert each partition into a single value.
180
for blockName, ids := range componentsByBlockName {
181
scope.Variables[blockName] = vc.buildValue(ids, 1)
182
}
183
184
// Add module arguments to the scope.
185
if len(vc.moduleArguments) > 0 {
186
scope.Variables["argument"] = make(map[string]any)
187
}
188
for key, value := range vc.moduleArguments {
189
keyMap := make(map[string]any)
190
keyMap["value"] = value
191
192
switch args := scope.Variables["argument"].(type) {
193
case map[string]any:
194
args[key] = keyMap
195
}
196
}
197
198
return scope
199
}
200
201
// buildValue recursively converts the set of user components into a single
202
// value. offset is used to determine which element in the userComponentName
203
// we're looking at.
204
func (vc *valueCache) buildValue(from []ComponentID, offset int) interface{} {
205
// We can't recurse anymore; return the node directly.
206
if len(from) == 1 && offset >= len(from[0]) {
207
name := from[0].String()
208
209
// TODO(rfratto): should we allow arguments to be returned so users can
210
// reference arguments as well as exports?
211
exports, ok := vc.exports[name]
212
if !ok {
213
exports = make(map[string]interface{})
214
}
215
return exports
216
}
217
218
attrs := make(map[string]interface{})
219
220
// First, partition the components by their label.
221
var componentsByLabel = make(map[string][]ComponentID)
222
for _, id := range from {
223
blockName := id[offset]
224
componentsByLabel[blockName] = append(componentsByLabel[blockName], id)
225
}
226
227
// Then, convert each partition into a single value.
228
for label, ids := range componentsByLabel {
229
attrs[label] = vc.buildValue(ids, offset+1)
230
}
231
return attrs
232
}
233
234