Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/flow.go
4094 views
1
// Package flow implements the Flow component graph system. Flow configuration
2
// files are parsed from River, which contain a listing of components to run.
3
//
4
// # Components
5
//
6
// Each component has a set of arguments (River attributes and blocks) and
7
// optionally a set of exported fields. Components can reference the exports of
8
// other components using River expressions.
9
//
10
// See the top-level component package for more information on components, and
11
// subpackages for defined components.
12
//
13
// # Component Health
14
//
15
// A component will have various health states during its lifetime:
16
//
17
// 1. Unknown: The initial health state for new components.
18
// 2. Healthy: A healthy component.
19
// 3. Unhealthy: An unhealthy component.
20
// 4. Exited: A component which is no longer running.
21
//
22
// Health states are paired with a time for when the health state was generated
23
// and a message providing more detail for the health state.
24
//
25
// Components can report their own health states. The health state reported by
26
// a component is merged with the Flow-level health of that component: an error
27
// when evaluating the configuration for a component will always be reported as
28
// unhealthy until the next successful evaluation.
29
//
30
// # Component Evaluation
31
//
32
// The process of converting the River block associated with a component into
33
// the appropriate Go struct is called "component evaluation."
34
//
35
// Components are only evaluated after all components they reference have been
36
// evaluated; cyclic dependencies are invalid.
37
//
38
// If a component updates its Exports at runtime, other components which directly
39
// or indirectly reference the updated component will have their Arguments
40
// re-evaluated.
41
//
42
// The arguments and exports for a component will be left in their last valid
43
// state if a component shuts down or is given an invalid config. This prevents
44
// a domino effect of a single failed component taking down other components
45
// which are otherwise healthy.
46
package flow
47
48
import (
49
"context"
50
"encoding/json"
51
"net"
52
"sync"
53
"time"
54
55
"github.com/go-kit/log/level"
56
"github.com/grafana/agent/pkg/cluster"
57
"github.com/grafana/agent/pkg/flow/internal/controller"
58
"github.com/grafana/agent/pkg/flow/internal/dag"
59
"github.com/grafana/agent/pkg/flow/logging"
60
"github.com/grafana/agent/pkg/flow/tracing"
61
"github.com/prometheus/client_golang/prometheus"
62
"go.uber.org/atomic"
63
)
64
65
// Options holds static options for a flow controller.
66
type Options struct {
67
// ControllerID is an identifier used to represent the controller.
68
// ControllerID is used to generate a globally unique display name for
69
// components in a binary where multiple controllers are used.
70
//
71
// If running multiple Flow controllers, each controller must have a
72
// different value for ControllerID to be able to differentiate between
73
// components in telemetry data.
74
ControllerID string
75
76
// LogSink to use for controller logs and components. A no-op logger will be
77
// created if this is nil.
78
LogSink *logging.Sink
79
80
// Tracer for components to use. A no-op tracer will be created if this is
81
// nil.
82
Tracer *tracing.Tracer
83
84
// Clusterer for implementing distributed behavior among components running
85
// on different nodes.
86
Clusterer *cluster.Clusterer
87
88
// Directory where components can write data. Constructed components will be
89
// given a subdirectory of DataPath using the local ID of the component.
90
//
91
// If running multiple Flow controllers, each controller must have a
92
// different value for DataPath to prevent components from colliding.
93
DataPath string
94
95
// Reg is the prometheus register to use
96
Reg prometheus.Registerer
97
98
// HTTPPathPrefix is the path prefix given to managed components. May be
99
// empty. When provided, it should be an absolute path.
100
//
101
// Components will be given a path relative to HTTPPathPrefix using their
102
// local ID.
103
//
104
// If running multiple Flow controllers, each controller must have a
105
// different value for HTTPPathPrefix to prevent components from colliding.
106
HTTPPathPrefix string
107
108
// HTTPListenAddr is the base address (host:port) where component APIs are
109
// exposed to other components.
110
HTTPListenAddr string
111
112
// OnExportsChange is called when the exports of the controller change.
113
// Exports are controlled by "export" configuration blocks. If
114
// OnExportsChange is nil, export configuration blocks are not allowed in the
115
// loaded config file.
116
OnExportsChange func(exports map[string]any)
117
118
// DialFunc is a function to use for components to properly connect to
119
// HTTPListenAddr. If nil, DialFunc defaults to (&net.Dialer{}).DialContext.
120
DialFunc func(ctx context.Context, network, address string) (net.Conn, error)
121
}
122
123
// Flow is the Flow system.
124
type Flow struct {
125
log *logging.Logger
126
tracer *tracing.Tracer
127
clusterer *cluster.Clusterer
128
opts Options
129
130
updateQueue *controller.Queue
131
sched *controller.Scheduler
132
loader *controller.Loader
133
134
loadFinished chan struct{}
135
136
loadMut sync.RWMutex
137
loadedOnce atomic.Bool
138
}
139
140
// New creates and starts a new Flow controller. Call Close to stop
141
// the controller.
142
func New(o Options) *Flow {
143
var (
144
log = logging.New(o.LogSink)
145
tracer = o.Tracer
146
clusterer = o.Clusterer
147
)
148
149
if tracer == nil {
150
var err error
151
tracer, err = tracing.New(tracing.DefaultOptions)
152
if err != nil {
153
// This shouldn't happen unless there's a bug
154
panic(err)
155
}
156
}
157
158
dialFunc := o.DialFunc
159
if dialFunc == nil {
160
dialFunc = (&net.Dialer{}).DialContext
161
}
162
163
var (
164
queue = controller.NewQueue()
165
sched = controller.NewScheduler()
166
loader = controller.NewLoader(controller.ComponentGlobals{
167
LogSink: o.LogSink,
168
Logger: log,
169
TraceProvider: tracer,
170
Clusterer: clusterer,
171
DataPath: o.DataPath,
172
OnComponentUpdate: func(cn *controller.ComponentNode) {
173
// Changed components should be queued for reevaluation.
174
queue.Enqueue(cn)
175
},
176
OnExportsChange: o.OnExportsChange,
177
Registerer: o.Reg,
178
HTTPPathPrefix: o.HTTPPathPrefix,
179
HTTPListenAddr: o.HTTPListenAddr,
180
DialFunc: dialFunc,
181
ControllerID: o.ControllerID,
182
})
183
)
184
return &Flow{
185
log: log,
186
tracer: tracer,
187
opts: o,
188
189
clusterer: clusterer,
190
updateQueue: queue,
191
sched: sched,
192
loader: loader,
193
194
loadFinished: make(chan struct{}, 1),
195
}
196
}
197
198
// Run starts the Flow controller, blocking until the provided context is
199
// canceled. Run must only be called once.
200
func (c *Flow) Run(ctx context.Context) {
201
defer c.sched.Close()
202
defer level.Debug(c.log).Log("msg", "flow controller exiting")
203
204
for {
205
select {
206
case <-ctx.Done():
207
return
208
209
case <-c.updateQueue.Chan():
210
// We need to pop _everything_ from the queue and evaluate each of them.
211
// If we only pop a single element, other components may sit waiting for
212
// evaluation forever.
213
for {
214
updated := c.updateQueue.TryDequeue()
215
if updated == nil {
216
break
217
}
218
219
level.Debug(c.log).Log("msg", "handling component with updated state", "node_id", updated.NodeID())
220
c.loader.EvaluateDependencies(updated)
221
}
222
223
case <-c.loadFinished:
224
level.Info(c.log).Log("msg", "scheduling loaded components")
225
226
components := c.loader.Components()
227
runnables := make([]controller.RunnableNode, 0, len(components))
228
for _, uc := range components {
229
runnables = append(runnables, uc)
230
}
231
err := c.sched.Synchronize(runnables)
232
if err != nil {
233
level.Error(c.log).Log("msg", "failed to load components", "err", err)
234
}
235
}
236
}
237
}
238
239
// LoadFile synchronizes the state of the controller with the current config
240
// file. Components in the graph will be marked as unhealthy if there was an
241
// error encountered during Load.
242
//
243
// The controller will only start running components after Load is called once
244
// without any configuration errors.
245
func (c *Flow) LoadFile(file *File, args map[string]any) error {
246
c.loadMut.Lock()
247
defer c.loadMut.Unlock()
248
249
diags := c.loader.Apply(args, file.Components, file.ConfigBlocks)
250
if !c.loadedOnce.Load() && diags.HasErrors() {
251
// The first call to Load should not run any components if there were
252
// errors in the configuration file.
253
return diags
254
}
255
c.loadedOnce.Store(true)
256
257
select {
258
case c.loadFinished <- struct{}{}:
259
default:
260
// A refresh is already scheduled
261
}
262
return diags.ErrorOrNil()
263
}
264
265
// Ready returns whether the Flow controller has finished its initial load.
266
func (c *Flow) Ready() bool {
267
return c.loadedOnce.Load()
268
}
269
270
// ComponentInfos returns the component infos.
271
func (c *Flow) ComponentInfos() []*ComponentInfo {
272
c.loadMut.RLock()
273
defer c.loadMut.RUnlock()
274
275
cns := c.loader.Components()
276
infos := make([]*ComponentInfo, len(cns))
277
edges := c.loader.OriginalGraph().Edges()
278
for i, com := range cns {
279
nn := newFromNode(com, edges)
280
infos[i] = nn
281
}
282
return infos
283
}
284
285
func newFromNode(cn *controller.ComponentNode, edges []dag.Edge) *ComponentInfo {
286
references := make([]string, 0)
287
referencedBy := make([]string, 0)
288
for _, e := range edges {
289
// Skip over any edge which isn't between two component nodes. This is a
290
// temporary workaround needed until there's the concept of configuration
291
// blocks from the API.
292
//
293
// Without this change, the graph fails to render when a configuration
294
// block is referenced in the graph.
295
//
296
// TODO(rfratto): add support for config block nodes in the API and UI.
297
if !isComponentNode(e.From) || !isComponentNode(e.To) {
298
continue
299
}
300
301
if e.From.NodeID() == cn.NodeID() {
302
references = append(references, e.To.NodeID())
303
} else if e.To.NodeID() == cn.NodeID() {
304
referencedBy = append(referencedBy, e.From.NodeID())
305
}
306
}
307
h := cn.CurrentHealth()
308
ci := &ComponentInfo{
309
Label: cn.Label(),
310
ID: cn.NodeID(),
311
Name: cn.ComponentName(),
312
Type: "block",
313
References: references,
314
ReferencedBy: referencedBy,
315
Health: &ComponentHealth{
316
State: h.Health.String(),
317
Message: h.Message,
318
UpdatedTime: h.UpdateTime,
319
},
320
}
321
return ci
322
}
323
324
// isComponentNode reports whether n is a *controller.ComponentNode rather
// than some other kind of graph node.
func isComponentNode(n dag.Node) bool {
	switch n.(type) {
	case *controller.ComponentNode:
		return true
	default:
		return false
	}
}
328
329
// ComponentInfo represents a component in flow.
330
type ComponentInfo struct {
331
Name string `json:"name,omitempty"`
332
Type string `json:"type,omitempty"`
333
ID string `json:"id,omitempty"`
334
Label string `json:"label,omitempty"`
335
References []string `json:"referencesTo"`
336
ReferencedBy []string `json:"referencedBy"`
337
Health *ComponentHealth `json:"health"`
338
Original string `json:"original"`
339
Arguments json.RawMessage `json:"arguments,omitempty"`
340
Exports json.RawMessage `json:"exports,omitempty"`
341
DebugInfo json.RawMessage `json:"debugInfo,omitempty"`
342
}
343
344
// ComponentHealth is the API/JSON form of a component's health report.
type ComponentHealth struct {
	// State is the string form of the component's health state.
	State string `json:"state"`
	// Message gives more detail about the health state.
	Message string `json:"message"`
	// UpdatedTime is when the health state was generated.
	UpdatedTime time.Time `json:"updatedTime"`
}
350
351