Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/controller/loader.go
4095 views
1
package controller
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"sync"
8
"time"
9
10
"github.com/go-kit/log"
11
"github.com/go-kit/log/level"
12
"github.com/grafana/agent/component"
13
"github.com/grafana/agent/pkg/flow/internal/dag"
14
"github.com/grafana/agent/pkg/flow/logging"
15
"github.com/grafana/agent/pkg/river/ast"
16
"github.com/grafana/agent/pkg/river/diag"
17
"github.com/hashicorp/go-multierror"
18
"github.com/rfratto/ckit"
19
"github.com/rfratto/ckit/peer"
20
"go.opentelemetry.io/otel/attribute"
21
"go.opentelemetry.io/otel/codes"
22
"go.opentelemetry.io/otel/trace"
23
24
_ "github.com/grafana/agent/pkg/flow/internal/testcomponents" // Include test components
25
)
26
27
// The Loader builds and evaluates ComponentNodes from River blocks.
28
type Loader struct {
29
log *logging.Logger
30
tracer trace.TracerProvider
31
globals ComponentGlobals
32
33
mut sync.RWMutex
34
graph *dag.Graph
35
originalGraph *dag.Graph
36
components []*ComponentNode
37
cache *valueCache
38
blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing
39
cm *controllerMetrics
40
moduleExportIndex int
41
}
42
43
// NewLoader creates a new Loader. Components built by the Loader will be built
44
// with co for their options.
45
func NewLoader(globals ComponentGlobals) *Loader {
46
l := &Loader{
47
log: globals.Logger,
48
tracer: globals.TraceProvider,
49
globals: globals,
50
51
graph: &dag.Graph{},
52
originalGraph: &dag.Graph{},
53
cache: newValueCache(),
54
cm: newControllerMetrics(globals.Registerer),
55
}
56
cc := newControllerCollector(l)
57
if globals.Registerer != nil {
58
globals.Registerer.MustRegister(cc)
59
}
60
61
globals.Clusterer.Node.Observe(ckit.FuncObserver(func(peers []peer.Peer) (reregister bool) {
62
tracer := l.tracer.Tracer("")
63
spanCtx, span := tracer.Start(context.Background(), "ClusterStateChange", trace.WithSpanKind(trace.SpanKindInternal))
64
defer span.End()
65
for _, cmp := range l.Components() {
66
if cc, ok := cmp.managed.(component.ClusteredComponent); ok {
67
if cc.ClusterUpdatesRegistration() {
68
_, span := tracer.Start(spanCtx, "ClusteredComponentReevaluation", trace.WithSpanKind(trace.SpanKindInternal))
69
span.SetAttributes(attribute.String("node_id", cmp.NodeID()))
70
defer span.End()
71
72
err := cmp.Reevaluate()
73
if err != nil {
74
level.Error(globals.Logger).Log("msg", "failed to reevaluate component", "componentID", cmp.NodeID(), "err", err)
75
}
76
}
77
}
78
}
79
return true
80
}))
81
82
return l
83
}
84
85
// Apply loads a new set of components into the Loader. Apply will drop any
86
// previously loaded component which is not described in the set of River
87
// blocks.
88
//
89
// Apply will reuse existing components if there is an existing component which
90
// matches the component ID specified by any of the provided River blocks.
91
// Reused components will be updated to point at the new River block.
92
//
93
// Apply will perform an evaluation of all loaded components before returning.
94
// The provided parentContext can be used to provide global variables and
95
// functions to components. A child context will be constructed from the parent
96
// to expose values of other components.
97
func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) diag.Diagnostics {
98
start := time.Now()
99
l.mut.Lock()
100
defer l.mut.Unlock()
101
l.cm.controllerEvaluation.Set(1)
102
defer l.cm.controllerEvaluation.Set(0)
103
104
for key, value := range args {
105
l.cache.CacheModuleArgument(key, value)
106
}
107
l.cache.SyncModuleArgs(args)
108
109
newGraph, diags := l.loadNewGraph(args, componentBlocks, configBlocks)
110
if diags.HasErrors() {
111
return diags
112
}
113
114
var (
115
components = make([]*ComponentNode, 0, len(componentBlocks))
116
componentIDs = make([]ComponentID, 0, len(componentBlocks))
117
)
118
119
tracer := l.tracer.Tracer("")
120
spanCtx, span := tracer.Start(context.Background(), "GraphEvaluate", trace.WithSpanKind(trace.SpanKindInternal))
121
defer span.End()
122
123
logger := log.With(l.log, "trace_id", span.SpanContext().TraceID())
124
level.Info(logger).Log("msg", "starting complete graph evaluation")
125
defer func() {
126
span.SetStatus(codes.Ok, "")
127
128
duration := time.Since(start)
129
level.Info(logger).Log("msg", "finished complete graph evaluation", "duration", duration)
130
l.cm.componentEvaluationTime.Observe(duration.Seconds())
131
}()
132
133
l.cache.ClearModuleExports()
134
// Evaluate all the components.
135
_ = dag.WalkTopological(&newGraph, newGraph.Leaves(), func(n dag.Node) error {
136
_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
137
span.SetAttributes(attribute.String("node_id", n.NodeID()))
138
defer span.End()
139
140
start := time.Now()
141
defer func() {
142
level.Info(logger).Log("msg", "finished node evaluation", "node_id", n.NodeID(), "duration", time.Since(start))
143
}()
144
145
var err error
146
147
switch c := n.(type) {
148
case *ComponentNode:
149
components = append(components, c)
150
componentIDs = append(componentIDs, c.ID())
151
152
if err = l.evaluate(logger, c); err != nil {
153
var evalDiags diag.Diagnostics
154
if errors.As(err, &evalDiags) {
155
diags = append(diags, evalDiags...)
156
} else {
157
diags.Add(diag.Diagnostic{
158
Severity: diag.SeverityLevelError,
159
Message: fmt.Sprintf("Failed to build component: %s", err),
160
StartPos: ast.StartPos(c.Block()).Position(),
161
EndPos: ast.EndPos(c.Block()).Position(),
162
})
163
}
164
}
165
case BlockNode:
166
if err = l.evaluate(logger, c); err != nil {
167
diags.Add(diag.Diagnostic{
168
Severity: diag.SeverityLevelError,
169
Message: fmt.Sprintf("Failed to evaluate node for config block: %s", err),
170
StartPos: ast.StartPos(c.Block()).Position(),
171
EndPos: ast.EndPos(c.Block()).Position(),
172
})
173
}
174
if exp, ok := n.(*ExportConfigNode); ok {
175
l.cache.CacheModuleExportValue(exp.Label(), exp.Value())
176
}
177
}
178
179
// We only use the error for updating the span status; we don't return the
180
// error because we want to evaluate as many nodes as we can.
181
if err != nil {
182
span.SetStatus(codes.Error, err.Error())
183
} else {
184
span.SetStatus(codes.Ok, "")
185
}
186
return nil
187
})
188
189
l.components = components
190
l.graph = &newGraph
191
l.cache.SyncIDs(componentIDs)
192
l.blocks = componentBlocks
193
l.cm.componentEvaluationTime.Observe(time.Since(start).Seconds())
194
if l.globals.OnExportsChange != nil && l.cache.ExportChangeIndex() != l.moduleExportIndex {
195
l.moduleExportIndex = l.cache.ExportChangeIndex()
196
l.globals.OnExportsChange(l.cache.CreateModuleExports())
197
}
198
return diags
199
}
200
201
// loadNewGraph creates a new graph from the provided blocks and validates it.
202
func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) (dag.Graph, diag.Diagnostics) {
203
var g dag.Graph
204
// Fill our graph with config blocks.
205
diags := l.populateConfigBlockNodes(args, &g, configBlocks)
206
207
// Fill our graph with components.
208
componentNodeDiags := l.populateComponentNodes(&g, componentBlocks)
209
diags = append(diags, componentNodeDiags...)
210
211
// Write up the edges of the graph
212
wireDiags := l.wireGraphEdges(&g)
213
diags = append(diags, wireDiags...)
214
215
// Validate graph to detect cycles
216
err := dag.Validate(&g)
217
if err != nil {
218
diags = append(diags, multierrToDiags(err)...)
219
return g, diags
220
}
221
222
// Copy the original graph, this is so we can have access to the original graph for things like displaying a UI or
223
// debug information.
224
l.originalGraph = g.Clone()
225
// Perform a transitive reduction of the graph to clean it up.
226
dag.Reduce(&g)
227
228
return g, diags
229
}
230
231
// populateConfigBlockNodes adds any config blocks to the graph.
232
func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, configBlocks []*ast.BlockStmt) diag.Diagnostics {
233
var (
234
diags diag.Diagnostics
235
nodeMap = NewConfigNodeMap()
236
)
237
238
for _, block := range configBlocks {
239
node, newConfigNodeDiags := NewConfigNode(block, l.globals)
240
diags = append(diags, newConfigNodeDiags...)
241
242
if g.GetByID(node.NodeID()) != nil {
243
diags.Add(diag.Diagnostic{
244
Severity: diag.SeverityLevelError,
245
Message: fmt.Sprintf("%q block already declared", node.NodeID()),
246
StartPos: ast.StartPos(block).Position(),
247
EndPos: ast.EndPos(block).Position(),
248
})
249
250
continue
251
}
252
253
nodeMapDiags := nodeMap.Append(node)
254
diags = append(diags, nodeMapDiags...)
255
if diags.HasErrors() {
256
continue
257
}
258
259
g.Add(node)
260
}
261
262
validateDiags := nodeMap.Validate(l.isModule(), args)
263
diags = append(diags, validateDiags...)
264
265
// If a logging config block is not provided, we create an empty node which uses defaults.
266
if nodeMap.logging == nil && !l.isModule() {
267
c := NewDefaultLoggingConfigNode(l.globals)
268
g.Add(c)
269
}
270
271
// If a tracing config block is not provided, we create an empty node which uses defaults.
272
if nodeMap.tracing == nil && !l.isModule() {
273
c := NewDefaulTracingConfigNode(l.globals)
274
g.Add(c)
275
}
276
277
return diags
278
}
279
280
// populateComponentNodes adds any components to the graph.
281
func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.BlockStmt) diag.Diagnostics {
282
var (
283
diags diag.Diagnostics
284
blockMap = make(map[string]*ast.BlockStmt, len(componentBlocks))
285
)
286
for _, block := range componentBlocks {
287
var c *ComponentNode
288
id := BlockComponentID(block).String()
289
290
if orig, redefined := blockMap[id]; redefined {
291
diags.Add(diag.Diagnostic{
292
Severity: diag.SeverityLevelError,
293
Message: fmt.Sprintf("Component %s already declared at %s", id, ast.StartPos(orig).Position()),
294
StartPos: block.NamePos.Position(),
295
EndPos: block.NamePos.Add(len(id) - 1).Position(),
296
})
297
continue
298
}
299
blockMap[id] = block
300
301
if exist := l.graph.GetByID(id); exist != nil {
302
// Re-use the existing component and update its block
303
c = exist.(*ComponentNode)
304
c.UpdateBlock(block)
305
} else {
306
componentName := block.GetBlockName()
307
registration, exists := component.Get(componentName)
308
if !exists {
309
diags.Add(diag.Diagnostic{
310
Severity: diag.SeverityLevelError,
311
Message: fmt.Sprintf("Unrecognized component name %q", componentName),
312
StartPos: block.NamePos.Position(),
313
EndPos: block.NamePos.Add(len(componentName) - 1).Position(),
314
})
315
continue
316
}
317
318
if registration.Singleton && block.Label != "" {
319
diags.Add(diag.Diagnostic{
320
Severity: diag.SeverityLevelError,
321
Message: fmt.Sprintf("Component %q does not support labels", componentName),
322
StartPos: block.LabelPos.Position(),
323
EndPos: block.LabelPos.Add(len(block.Label) + 1).Position(),
324
})
325
continue
326
}
327
328
if !registration.Singleton && block.Label == "" {
329
diags.Add(diag.Diagnostic{
330
Severity: diag.SeverityLevelError,
331
Message: fmt.Sprintf("Component %q must have a label", componentName),
332
StartPos: block.NamePos.Position(),
333
EndPos: block.NamePos.Add(len(componentName) - 1).Position(),
334
})
335
continue
336
}
337
338
if registration.Singleton && l.isModule() {
339
diags.Add(diag.Diagnostic{
340
Severity: diag.SeverityLevelError,
341
Message: fmt.Sprintf("Component %q is a singleton and unsupported inside a module", componentName),
342
StartPos: block.NamePos.Position(),
343
EndPos: block.NamePos.Add(len(componentName) - 1).Position(),
344
})
345
continue
346
}
347
348
// Create a new component
349
c = NewComponentNode(l.globals, block)
350
}
351
352
g.Add(c)
353
}
354
355
return diags
356
}
357
358
// Wire up all the related nodes
359
func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {
360
var diags diag.Diagnostics
361
362
for _, n := range g.Nodes() {
363
refs, nodeDiags := ComponentReferences(n, g)
364
for _, ref := range refs {
365
g.AddEdge(dag.Edge{From: n, To: ref.Target})
366
}
367
diags = append(diags, nodeDiags...)
368
}
369
370
return diags
371
}
372
373
// Variables returns the Variables the Loader exposes for other Flow components
374
// to reference.
375
func (l *Loader) Variables() map[string]interface{} {
376
return l.cache.BuildContext().Variables
377
}
378
379
// Components returns the current set of loaded components.
380
func (l *Loader) Components() []*ComponentNode {
381
l.mut.RLock()
382
defer l.mut.RUnlock()
383
return l.components
384
}
385
386
// Graph returns a copy of the DAG managed by the Loader.
387
func (l *Loader) Graph() *dag.Graph {
388
l.mut.RLock()
389
defer l.mut.RUnlock()
390
return l.graph.Clone()
391
}
392
393
// OriginalGraph returns a copy of the graph before Reduce was called. This can be used if you want to show a UI of the
394
// original graph before the reduce function was called.
395
func (l *Loader) OriginalGraph() *dag.Graph {
396
l.mut.RLock()
397
defer l.mut.RUnlock()
398
return l.originalGraph.Clone()
399
}
400
401
// EvaluateDependencies re-evaluates components which depend directly or
402
// indirectly on c. EvaluateDependencies should be called whenever a component
403
// updates its exports.
404
//
405
// The provided parentContext can be used to provide global variables and
406
// functions to components. A child context will be constructed from the parent
407
// to expose values of other components.
408
func (l *Loader) EvaluateDependencies(c *ComponentNode) {
409
tracer := l.tracer.Tracer("")
410
411
l.mut.RLock()
412
defer l.mut.RUnlock()
413
414
l.cm.controllerEvaluation.Set(1)
415
defer l.cm.controllerEvaluation.Set(0)
416
start := time.Now()
417
418
spanCtx, span := tracer.Start(context.Background(), "GraphEvaluatePartial", trace.WithSpanKind(trace.SpanKindInternal))
419
span.SetAttributes(attribute.String("initiator", c.NodeID()))
420
defer span.End()
421
422
logger := log.With(l.log, "trace_id", span.SpanContext().TraceID())
423
level.Info(logger).Log("msg", "starting partial graph evaluation")
424
defer func() {
425
span.SetStatus(codes.Ok, "")
426
427
duration := time.Since(start)
428
level.Info(logger).Log("msg", "finished partial graph evaluation", "duration", duration)
429
l.cm.componentEvaluationTime.Observe(duration.Seconds())
430
}()
431
432
// Make sure we're in-sync with the current exports of c.
433
l.cache.CacheExports(c.ID(), c.Exports())
434
435
_ = dag.WalkReverse(l.graph, []dag.Node{c}, func(n dag.Node) error {
436
if n == c {
437
// Skip over the starting component; the starting component passed to
438
// EvaluateDependencies had its exports changed and none of its input
439
// arguments will need re-evaluation.
440
return nil
441
}
442
443
_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
444
span.SetAttributes(attribute.String("node_id", n.NodeID()))
445
defer span.End()
446
447
var err error
448
449
switch n := n.(type) {
450
case BlockNode:
451
err = l.evaluate(logger, n)
452
if exp, ok := n.(*ExportConfigNode); ok {
453
l.cache.CacheModuleExportValue(exp.Label(), exp.Value())
454
}
455
}
456
457
// We only use the error for updating the span status; we don't return the
458
// error because we want to evaluate as many nodes as we can.
459
if err != nil {
460
span.SetStatus(codes.Error, err.Error())
461
} else {
462
span.SetStatus(codes.Ok, "")
463
}
464
return nil
465
})
466
467
if l.globals.OnExportsChange != nil && l.cache.ExportChangeIndex() != l.moduleExportIndex {
468
l.globals.OnExportsChange(l.cache.CreateModuleExports())
469
l.moduleExportIndex = l.cache.ExportChangeIndex()
470
}
471
}
472
473
// evaluate constructs the final context for the BlockNode and
474
// evaluates it. mut must be held when calling evaluate.
475
func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error {
476
ectx := l.cache.BuildContext()
477
err := bn.Evaluate(ectx)
478
479
switch c := bn.(type) {
480
case *ComponentNode:
481
// Always update the cache both the arguments and exports, since both might
482
// change when a component gets re-evaluated. We also want to cache the arguments and exports in case of an error
483
l.cache.CacheArguments(c.ID(), c.Arguments())
484
l.cache.CacheExports(c.ID(), c.Exports())
485
case *ArgumentConfigNode:
486
if _, found := l.cache.moduleArguments[c.Label()]; !found {
487
if c.Optional() {
488
l.cache.CacheModuleArgument(c.Label(), c.Default())
489
} else {
490
err = fmt.Errorf("missing required argument %q to module", c.Label())
491
}
492
}
493
}
494
495
if err != nil {
496
level.Error(logger).Log("msg", "failed to evaluate config", "node", bn.NodeID(), "err", err)
497
return err
498
}
499
return nil
500
}
501
502
func multierrToDiags(errors error) diag.Diagnostics {
503
var diags diag.Diagnostics
504
for _, err := range errors.(*multierror.Error).Errors {
505
// TODO(rfratto): should this include position information?
506
diags.Add(diag.Diagnostic{
507
Severity: diag.SeverityLevelError,
508
Message: err.Error(),
509
})
510
}
511
return diags
512
}
513
514
// If the definition of a module ever changes, update this.
515
func (l *Loader) isModule() bool {
516
// Either 1 of these checks is technically sufficient but let's be extra careful.
517
return l.globals.OnExportsChange != nil && l.globals.ControllerID != ""
518
}
519
520