Path: blob/main/pkg/flow/internal/controller/value_cache.go
4095 views
package controller12import (3"sync"45"github.com/grafana/agent/component"6"github.com/grafana/agent/pkg/river/vm"7)89// valueCache caches component arguments and exports to expose as variables for10// River expressions.11//12// The current state of valueCache can then be built into a *vm.Scope for other13// components to be evaluated.14type valueCache struct {15mut sync.RWMutex16components map[string]ComponentID // NodeID -> ComponentID17args map[string]interface{} // NodeID -> component arguments value18exports map[string]interface{} // NodeID -> component exports value19moduleArguments map[string]any // key -> module arguments value20moduleExports map[string]any // name -> value for the value of module exports21moduleChangedIndex int // Everytime a change occurs this is incremented22}2324// newValueCache creates a new ValueCache.25func newValueCache() *valueCache {26return &valueCache{27components: make(map[string]ComponentID),28args: make(map[string]interface{}),29exports: make(map[string]interface{}),30moduleArguments: make(map[string]any),31moduleExports: make(map[string]any),32}33}3435// CacheArguments will cache the provided arguments by the given id. args may36// be nil to store an empty object.37func (vc *valueCache) CacheArguments(id ComponentID, args component.Arguments) {38vc.mut.Lock()39defer vc.mut.Unlock()4041nodeID := id.String()42vc.components[nodeID] = id4344var argsVal interface{} = make(map[string]interface{})45if args != nil {46argsVal = args47}48vc.args[nodeID] = argsVal49}5051// CacheExports will cache the provided exports using the given id. exports may52// be nil to store an empty object.53func (vc *valueCache) CacheExports(id ComponentID, exports component.Exports) {54vc.mut.Lock()55defer vc.mut.Unlock()5657nodeID := id.String()58vc.components[nodeID] = id5960var exportsVal interface{} = make(map[string]interface{})61if exports != nil {62exportsVal = exports63}64vc.exports[nodeID] = exportsVal65}6667// CacheModuleArgument will cache the provided exports using the given id.68func (vc *valueCache) CacheModuleArgument(key string, value any) {69vc.mut.Lock()70defer vc.mut.Unlock()7172if value == nil {73vc.moduleArguments[key] = nil74} else {75vc.moduleArguments[key] = value76}77}7879// CacheModuleExportValue saves the value to the map80func (vc *valueCache) CacheModuleExportValue(name string, value any) {81vc.mut.Lock()82defer vc.mut.Unlock()8384// Need to see if the module exports have changed.85v, found := vc.moduleExports[name]86if !found {87vc.moduleChangedIndex++88}89if v != value {90vc.moduleChangedIndex++91}9293vc.moduleExports[name] = value94}9596// CreateModuleExports creates a map for usage on OnExportsChanged97func (vc *valueCache) CreateModuleExports() map[string]any {98vc.mut.RLock()99defer vc.mut.RUnlock()100101exports := make(map[string]any)102for k, v := range vc.moduleExports {103exports[k] = v104}105return exports106}107108// ClearModuleExports empties the map and notifies that the exports have changed.109func (vc *valueCache) ClearModuleExports() {110vc.mut.Lock()111defer vc.mut.Unlock()112113vc.moduleChangedIndex++114vc.moduleExports = make(map[string]any)115}116117// ExportChangeIndex return the change index.118func (vc *valueCache) ExportChangeIndex() int {119vc.mut.RLock()120defer vc.mut.RUnlock()121122return vc.moduleChangedIndex123}124125// SyncIDs will remove any cached values for any Component ID which is not in126// ids. SyncIDs should be called with the current set of components after the127// graph is updated.128func (vc *valueCache) SyncIDs(ids []ComponentID) {129expectMap := make(map[string]ComponentID, len(ids))130for _, id := range ids {131expectMap[id.String()] = id132}133134vc.mut.Lock()135defer vc.mut.Unlock()136137for id := range vc.components {138if _, keep := expectMap[id]; keep {139continue140}141delete(vc.components, id)142delete(vc.args, id)143delete(vc.exports, id)144}145}146147// SyncModuleArgs will remove any cached values for any args no longer in the map.148func (vc *valueCache) SyncModuleArgs(args map[string]any) {149vc.mut.Lock()150defer vc.mut.Unlock()151152for id := range vc.moduleArguments {153if _, keep := args[id]; keep {154continue155}156delete(vc.moduleArguments, id)157}158}159160// BuildContext builds a vm.Scope based on the current set of cached values.161// The arguments and exports for the same ID are merged into one object.162func (vc *valueCache) BuildContext() *vm.Scope {163vc.mut.RLock()164defer vc.mut.RUnlock()165166scope := &vm.Scope{167Parent: nil,168Variables: make(map[string]interface{}),169}170171// First, partition components by River block name.172var componentsByBlockName = make(map[string][]ComponentID)173for _, id := range vc.components {174blockName := id[0]175componentsByBlockName[blockName] = append(componentsByBlockName[blockName], id)176}177178// Then, convert each partition into a single value.179for blockName, ids := range componentsByBlockName {180scope.Variables[blockName] = vc.buildValue(ids, 1)181}182183// Add module arguments to the scope.184if len(vc.moduleArguments) > 0 {185scope.Variables["argument"] = make(map[string]any)186}187for key, value := range vc.moduleArguments {188keyMap := make(map[string]any)189keyMap["value"] = value190191switch args := scope.Variables["argument"].(type) {192case map[string]any:193args[key] = keyMap194}195}196197return scope198}199200// buildValue recursively converts the set of user components into a single201// value. offset is used to determine which element in the userComponentName202// we're looking at.203func (vc *valueCache) buildValue(from []ComponentID, offset int) interface{} {204// We can't recurse anymore; return the node directly.205if len(from) == 1 && offset >= len(from[0]) {206name := from[0].String()207208// TODO(rfratto): should we allow arguments to be returned so users can209// reference arguments as well as exports?210exports, ok := vc.exports[name]211if !ok {212exports = make(map[string]interface{})213}214return exports215}216217attrs := make(map[string]interface{})218219// First, partition the components by their label.220var componentsByLabel = make(map[string][]ComponentID)221for _, id := range from {222blockName := id[offset]223componentsByLabel[blockName] = append(componentsByLabel[blockName], id)224}225226// Then, convert each partition into a single value.227for label, ids := range componentsByLabel {228attrs[label] = vc.buildValue(ids, offset+1)229}230return attrs231}232233234