Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/controller/component.go
4096 views
1
package controller
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"net"
8
"net/http"
9
"path"
10
"path/filepath"
11
"reflect"
12
"strings"
13
"sync"
14
"time"
15
16
"github.com/go-kit/log/level"
17
"github.com/grafana/agent/component"
18
"github.com/grafana/agent/pkg/cluster"
19
"github.com/grafana/agent/pkg/flow/logging"
20
"github.com/grafana/agent/pkg/river/ast"
21
"github.com/grafana/agent/pkg/river/vm"
22
"github.com/prometheus/client_golang/prometheus"
23
"go.opentelemetry.io/otel/trace"
24
"go.uber.org/atomic"
25
)
26
27
// ComponentID is a fully-qualified name of a component. Each element in
28
// ComponentID corresponds to a fragment of the period-delimited string;
29
// "remote.http.example" is ComponentID{"remote", "http", "example"}.
30
type ComponentID []string
31
32
// BlockComponentID returns the ComponentID specified by an River block.
33
func BlockComponentID(b *ast.BlockStmt) ComponentID {
34
id := make(ComponentID, 0, len(b.Name)+1) // add 1 for the optional label
35
id = append(id, b.Name...)
36
if b.Label != "" {
37
id = append(id, b.Label)
38
}
39
return id
40
}
41
42
// String returns the string representation of a component ID.
43
func (id ComponentID) String() string {
44
return strings.Join(id, ".")
45
}
46
47
// Equals returns true if id == other.
48
func (id ComponentID) Equals(other ComponentID) bool {
49
if len(id) != len(other) {
50
return false
51
}
52
for i := 0; i < len(id); i++ {
53
if id[i] != other[i] {
54
return false
55
}
56
}
57
return true
58
}
59
60
// DialFunc is a function to establish a network connection.
61
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)
62
63
// ComponentGlobals are used by ComponentNodes to build managed components. All
64
// ComponentNodes should use the same ComponentGlobals.
65
type ComponentGlobals struct {
66
LogSink *logging.Sink // Sink used for Logging.
67
Logger *logging.Logger // Logger shared between all managed components.
68
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
69
Clusterer *cluster.Clusterer // Clusterer shared between all managed components.
70
DataPath string // Shared directory where component data may be stored
71
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
72
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
73
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
74
HTTPPathPrefix string // HTTP prefix for components.
75
HTTPListenAddr string // Base address for server
76
DialFunc DialFunc // Function to connect to HTTPListenAddr.
77
ControllerID string // ID of controller.
78
}
79
80
// ComponentNode is a controller node which manages a user-defined component.
81
//
82
// ComponentNode manages the underlying component and caches its current
83
// arguments and exports. ComponentNode manages the arguments for the component
84
// from a River block.
85
type ComponentNode struct {
86
id ComponentID
87
label string
88
componentName string
89
nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called.
90
reg component.Registration
91
managedOpts component.Options
92
register *wrappedRegisterer
93
exportsType reflect.Type
94
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
95
96
mut sync.RWMutex
97
block *ast.BlockStmt // Current River block to derive args from
98
eval *vm.Evaluator
99
managed component.Component // Inner managed component
100
args component.Arguments // Evaluated arguments for the managed component
101
102
doingEval atomic.Bool
103
104
// NOTE(rfratto): health and exports have their own mutex because they may be
105
// set asynchronously while mut is still being held (i.e., when calling Evaluate
106
// and the managed component immediately creates new exports)
107
108
healthMut sync.RWMutex
109
evalHealth component.Health // Health of the last evaluate
110
runHealth component.Health // Health of running the component
111
112
exportsMut sync.RWMutex
113
exports component.Exports // Evaluated exports for the managed component
114
}
115
116
var _ BlockNode = (*ComponentNode)(nil)
117
118
// NewComponentNode creates a new ComponentNode from an initial ast.BlockStmt.
119
// The underlying managed component isn't created until Evaluate is called.
120
func NewComponentNode(globals ComponentGlobals, b *ast.BlockStmt) *ComponentNode {
121
var (
122
id = BlockComponentID(b)
123
nodeID = id.String()
124
)
125
126
reg, ok := component.Get(ComponentID(b.Name).String())
127
if !ok {
128
// NOTE(rfratto): It's normally not possible to get to this point; the
129
// blocks should have been validated by the graph loader in advance to
130
// guarantee that b is an expected component.
131
panic("NewComponentNode: could not find registration for component " + nodeID)
132
}
133
134
initHealth := component.Health{
135
Health: component.HealthTypeUnknown,
136
Message: "component created",
137
UpdateTime: time.Now(),
138
}
139
140
cn := &ComponentNode{
141
id: id,
142
label: b.Label,
143
nodeID: nodeID,
144
componentName: strings.Join(b.Name, "."),
145
reg: reg,
146
exportsType: getExportsType(reg),
147
OnComponentUpdate: globals.OnComponentUpdate,
148
149
block: b,
150
eval: vm.New(b.Body),
151
152
// Prepopulate arguments and exports with their zero values.
153
args: reg.Args,
154
exports: reg.Exports,
155
156
evalHealth: initHealth,
157
runHealth: initHealth,
158
}
159
cn.managedOpts = getManagedOptions(globals, cn)
160
161
return cn
162
}
163
164
func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Options {
165
// Make sure the prefix is always absolute.
166
prefix := globals.HTTPPathPrefix
167
if !strings.HasPrefix(prefix, "/") {
168
prefix = "/" + prefix
169
}
170
171
// We need to generate a globally unique component ID to give to the
172
// component and for use with telemetry data which doesn't support
173
// reconstructing the global ID. For everything else (HTTP, data), we can
174
// just use the controller-local ID as those values are guaranteed to be
175
// globally unique.
176
globalID := cn.nodeID
177
if globals.ControllerID != "" {
178
globalID = path.Join(globals.ControllerID, cn.nodeID)
179
}
180
181
wrapped := newWrappedRegisterer()
182
cn.register = wrapped
183
return component.Options{
184
ID: globalID,
185
Logger: logging.New(logging.LoggerSink(globals.Logger), logging.WithComponentID(cn.nodeID)),
186
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{
187
"component_id": globalID,
188
}, wrapped),
189
Tracer: wrapTracer(globals.TraceProvider, globalID),
190
Clusterer: globals.Clusterer,
191
192
DataPath: filepath.Join(globals.DataPath, cn.nodeID),
193
HTTPListenAddr: globals.HTTPListenAddr,
194
DialFunc: globals.DialFunc,
195
HTTPPath: path.Join(prefix, cn.nodeID) + "/",
196
197
OnStateChange: cn.setExports,
198
}
199
}
200
201
func getExportsType(reg component.Registration) reflect.Type {
202
if reg.Exports != nil {
203
return reflect.TypeOf(reg.Exports)
204
}
205
return nil
206
}
207
208
// ID returns the component ID of the managed component from its River block.
209
func (cn *ComponentNode) ID() ComponentID { return cn.id }
210
211
// Label returns the label for the block or "" if none was specified.
212
func (cn *ComponentNode) Label() string { return cn.label }
213
214
// ComponentName returns the component's type, i.e. `local.file.test` returns `local.file`.
215
func (cn *ComponentNode) ComponentName() string { return cn.componentName }
216
217
// NodeID implements dag.Node and returns the unique ID for this node. The
218
// NodeID is the string representation of the component's ID from its River
219
// block.
220
func (cn *ComponentNode) NodeID() string { return cn.nodeID }
221
222
// UpdateBlock updates the River block used to construct arguments for the
223
// managed component. The new block isn't used until the next time Evaluate is
224
// invoked.
225
//
226
// UpdateBlock will panic if the block does not match the component ID of the
227
// ComponentNode.
228
func (cn *ComponentNode) UpdateBlock(b *ast.BlockStmt) {
229
if !BlockComponentID(b).Equals(cn.id) {
230
panic("UpdateBlock called with an River block with a different component ID")
231
}
232
233
cn.mut.Lock()
234
defer cn.mut.Unlock()
235
cn.block = b
236
cn.eval = vm.New(b.Body)
237
}
238
239
// Evaluate implements BlockNode and updates the arguments for the managed component
240
// by re-evaluating its River block with the provided scope. The managed component
241
// will be built the first time Evaluate is called.
242
//
243
// Evaluate will return an error if the River block cannot be evaluated or if
244
// decoding to arguments fails.
245
func (cn *ComponentNode) Evaluate(scope *vm.Scope) error {
246
err := cn.evaluate(scope)
247
248
switch err {
249
case nil:
250
cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated")
251
default:
252
msg := fmt.Sprintf("component evaluation failed: %s", err)
253
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
254
}
255
256
return err
257
}
258
259
// Reevaluate calls Update on the managed component with its last used
260
// arguments.Reevaluate does not build the component if it is not already built
261
// and does not re-evaluate the River block itself.
262
// Its only use case is for components opting-in to clustering where calling
263
// Update with the same Arguments may result in different functionality.
264
func (cn *ComponentNode) Reevaluate() error {
265
cn.mut.Lock()
266
defer cn.mut.Unlock()
267
268
cn.doingEval.Store(true)
269
defer cn.doingEval.Store(false)
270
271
if cn.managed == nil {
272
// We haven't built the managed component successfully yet.
273
return nil
274
}
275
276
// Update the existing managed component with the same arguments.
277
err := cn.managed.Update(cn.args)
278
279
switch err {
280
case nil:
281
cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated")
282
return nil
283
default:
284
msg := fmt.Sprintf("component evaluation failed: %s", err)
285
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
286
return err
287
}
288
}
289
290
func (cn *ComponentNode) evaluate(scope *vm.Scope) error {
291
cn.mut.Lock()
292
defer cn.mut.Unlock()
293
294
cn.doingEval.Store(true)
295
defer cn.doingEval.Store(false)
296
297
argsPointer := cn.reg.CloneArguments()
298
if err := cn.eval.Evaluate(scope, argsPointer); err != nil {
299
return fmt.Errorf("decoding River: %w", err)
300
}
301
302
// args is always a pointer to the args type, so we want to deference it since
303
// components expect a non-pointer.
304
argsCopyValue := reflect.ValueOf(argsPointer).Elem().Interface()
305
306
if cn.managed == nil {
307
// We haven't built the managed component successfully yet.
308
managed, err := cn.reg.Build(cn.managedOpts, argsCopyValue)
309
if err != nil {
310
return fmt.Errorf("building component: %w", err)
311
}
312
cn.managed = managed
313
cn.args = argsCopyValue
314
315
return nil
316
}
317
318
if reflect.DeepEqual(cn.args, argsCopyValue) {
319
// Ignore components which haven't changed. This reduces the cost of
320
// calling evaluate for components where evaluation is expensive (e.g., if
321
// re-evaluating requires re-starting some internal logic).
322
return nil
323
}
324
325
// Update the existing managed component
326
if err := cn.managed.Update(argsCopyValue); err != nil {
327
return fmt.Errorf("updating component: %w", err)
328
}
329
330
cn.args = argsCopyValue
331
return nil
332
}
333
334
// Run runs the managed component in the calling goroutine until ctx is
335
// canceled. Evaluate must have been called at least once without retuning an
336
// error before calling Run.
337
//
338
// Run will immediately return ErrUnevaluated if Evaluate has never been called
339
// successfully. Otherwise, Run will return nil.
340
func (cn *ComponentNode) Run(ctx context.Context) error {
341
cn.mut.RLock()
342
managed := cn.managed
343
cn.mut.RUnlock()
344
345
if managed == nil {
346
return ErrUnevaluated
347
}
348
349
cn.setRunHealth(component.HealthTypeHealthy, "started component")
350
err := cn.managed.Run(ctx)
351
352
var exitMsg string
353
logger := cn.managedOpts.Logger
354
if err != nil {
355
level.Error(logger).Log("msg", "component exited with error", "err", err)
356
exitMsg = fmt.Sprintf("component shut down with error: %s", err)
357
} else {
358
level.Info(logger).Log("msg", "component exited")
359
exitMsg = "component shut down normally"
360
}
361
362
cn.setRunHealth(component.HealthTypeExited, exitMsg)
363
return err
364
}
365
366
// ErrUnevaluated is returned if ComponentNode.Run is called before a managed
367
// component is built.
368
var ErrUnevaluated = errors.New("managed component not built")
369
370
// Arguments returns the current arguments of the managed component.
371
func (cn *ComponentNode) Arguments() component.Arguments {
372
cn.mut.RLock()
373
defer cn.mut.RUnlock()
374
return cn.args
375
}
376
377
// Block implements BlockNode and returns the current block of the managed component.
378
func (cn *ComponentNode) Block() *ast.BlockStmt {
379
cn.mut.RLock()
380
defer cn.mut.RUnlock()
381
return cn.block
382
}
383
384
// Exports returns the current set of exports from the managed component.
385
// Exports returns nil if the managed component does not have exports.
386
func (cn *ComponentNode) Exports() component.Exports {
387
cn.exportsMut.RLock()
388
defer cn.exportsMut.RUnlock()
389
return cn.exports
390
}
391
392
// setExports is called whenever the managed component updates. e must be the
393
// same type as the registered exports type of the managed component.
394
func (cn *ComponentNode) setExports(e component.Exports) {
395
if cn.exportsType == nil {
396
panic(fmt.Sprintf("Component %s called OnStateChange but never registered an Exports type", cn.nodeID))
397
}
398
if reflect.TypeOf(e) != cn.exportsType {
399
panic(fmt.Sprintf("Component %s changed Exports types from %T to %T", cn.nodeID, cn.reg.Exports, e))
400
}
401
402
// Some components may aggressively reexport values even though no exposed
403
// state has changed. This may be done for components which always supply
404
// exports whenever their arguments are evaluated without tracking internal
405
// state to see if anything actually changed.
406
//
407
// To avoid needlessly reevaluating components we'll ignore unchanged
408
// exports.
409
var changed bool
410
411
cn.exportsMut.Lock()
412
if !reflect.DeepEqual(cn.exports, e) {
413
changed = true
414
cn.exports = e
415
}
416
cn.exportsMut.Unlock()
417
418
if cn.doingEval.Load() {
419
// Optimization edge case: some components supply exports when they're
420
// being evaluated.
421
//
422
// Since components that are being evaluated will always cause their
423
// dependencies to also be evaluated, there's no reason to call
424
// onExportsChange here.
425
return
426
}
427
428
if changed {
429
// Inform the controller that we have new exports.
430
cn.OnComponentUpdate(cn)
431
}
432
}
433
434
// CurrentHealth returns the current health of the ComponentNode.
435
//
436
// The health of a ComponentNode is determined by combining:
437
//
438
// 1. Health from the call to Run().
439
// 2. Health from the last call to Evaluate().
440
// 3. Health reported from the component.
441
func (cn *ComponentNode) CurrentHealth() component.Health {
442
cn.healthMut.RLock()
443
defer cn.healthMut.RUnlock()
444
445
var (
446
runHealth = cn.runHealth
447
evalHealth = cn.evalHealth
448
)
449
450
if hc, ok := cn.managed.(component.HealthComponent); ok {
451
componentHealth := hc.CurrentHealth()
452
return component.LeastHealthy(runHealth, evalHealth, componentHealth)
453
}
454
455
return component.LeastHealthy(runHealth, evalHealth)
456
}
457
458
// DebugInfo returns debugging information from the managed component (if any).
459
func (cn *ComponentNode) DebugInfo() interface{} {
460
cn.mut.RLock()
461
defer cn.mut.RUnlock()
462
463
if dc, ok := cn.managed.(component.DebugComponent); ok {
464
return dc.DebugInfo()
465
}
466
return nil
467
}
468
469
// setEvalHealth sets the internal health from a call to Evaluate. See Health
470
// for information on how overall health is calculated.
471
func (cn *ComponentNode) setEvalHealth(t component.HealthType, msg string) {
472
cn.healthMut.Lock()
473
defer cn.healthMut.Unlock()
474
475
cn.evalHealth = component.Health{
476
Health: t,
477
Message: msg,
478
UpdateTime: time.Now(),
479
}
480
}
481
482
// setRunHealth sets the internal health from a call to Run. See Health for
483
// information on how overall health is calculated.
484
func (cn *ComponentNode) setRunHealth(t component.HealthType, msg string) {
485
cn.healthMut.Lock()
486
defer cn.healthMut.Unlock()
487
488
cn.runHealth = component.Health{
489
Health: t,
490
Message: msg,
491
UpdateTime: time.Now(),
492
}
493
}
494
495
// HTTPHandler returns an http handler for a component IF it implements HTTPComponent.
496
// otherwise it will return nil.
497
func (cn *ComponentNode) HTTPHandler() http.Handler {
498
handler, ok := cn.managed.(component.HTTPComponent)
499
if !ok {
500
return nil
501
}
502
return handler.Handler()
503
}
504
505