Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/dag/walk.go
4095 views
1
package dag
2
3
// WalkFunc is a function that gets invoked when walking a Graph. Walking will
4
// stop if WalkFunc returns a non-nil error.
5
type WalkFunc func(n Node) error
6
7
// Walk performs a depth-first walk of outgoing edges for all nodes in start,
8
// invoking the provided fn for each node. Walk returns the error returned by
9
// fn.
10
//
11
// Nodes unreachable from start will not be passed to fn.
12
func Walk(g *Graph, start []Node, fn WalkFunc) error {
13
var (
14
visited = make(nodeSet)
15
unchecked = make([]Node, 0, len(start))
16
)
17
18
// Prefill the set of unchecked nodes with our start set.
19
unchecked = append(unchecked, start...)
20
21
// Iterate through unchecked nodes, visiting each in turn and adding outgoing
22
// edges to the unchecked list until all reachable nodes have been processed.
23
for len(unchecked) > 0 {
24
check := unchecked[len(unchecked)-1]
25
unchecked = unchecked[:len(unchecked)-1]
26
27
if visited.Has(check) {
28
continue
29
}
30
visited.Add(check)
31
32
if err := fn(check); err != nil {
33
return err
34
}
35
36
for n := range g.outEdges[check] {
37
unchecked = append(unchecked, n)
38
}
39
}
40
41
return nil
42
}
43
44
// WalkReverse performs a depth-first walk of incoming edges for all nodes in
45
// start, invoking the provided fn for each node. Walk returns the error
46
// returned by fn.
47
//
48
// Nodes unreachable from start will not be passed to fn.
49
func WalkReverse(g *Graph, start []Node, fn WalkFunc) error {
50
var (
51
visited = make(nodeSet)
52
unchecked = make([]Node, 0, len(start))
53
)
54
55
// Prefill the set of unchecked nodes with our start set.
56
unchecked = append(unchecked, start...)
57
58
// Iterate through unchecked nodes, visiting each in turn and adding incoming
59
// edges to the unchecked list until all reachable nodes have been processed.
60
for len(unchecked) > 0 {
61
check := unchecked[len(unchecked)-1]
62
unchecked = unchecked[:len(unchecked)-1]
63
64
if visited.Has(check) {
65
continue
66
}
67
visited.Add(check)
68
69
if err := fn(check); err != nil {
70
return err
71
}
72
73
for n := range g.inEdges[check] {
74
unchecked = append(unchecked, n)
75
}
76
}
77
78
return nil
79
}
80
81
// WalkTopological performs a topological walk of all nodes in start in
82
// dependency order: a node will not be visited until its outgoing edges are
83
// visited first.
84
//
85
// Nodes will not be passed to fn if they are not reachable from start or if
86
// not all of their outgoing edges are reachable from start.
87
func WalkTopological(g *Graph, start []Node, fn WalkFunc) error {
88
// NOTE(rfratto): WalkTopological is an implementation of Kahn's algorithm
89
// which leaves g unmodified.
90
91
var (
92
visited = make(nodeSet)
93
unchecked = make([]Node, 0, len(start))
94
95
remainingDeps = make(map[Node]int)
96
)
97
98
// Pre-fill the set of nodes to check from the start list.
99
unchecked = append(unchecked, start...)
100
101
for len(unchecked) > 0 {
102
check := unchecked[len(unchecked)-1]
103
unchecked = unchecked[:len(unchecked)-1]
104
105
if visited.Has(check) {
106
continue
107
}
108
visited.Add(check)
109
110
if err := fn(check); err != nil {
111
return err
112
}
113
114
// Iterate through the incoming edges to check and queue nodes if we're the
115
// last edge to be walked.
116
for n := range g.inEdges[check] {
117
// remainingDeps starts with the number of edges, and we subtract one for
118
// each outgoing edge that's visited.
119
if _, ok := remainingDeps[n]; !ok {
120
remainingDeps[n] = len(g.outEdges[n])
121
}
122
remainingDeps[n]--
123
124
// Only enqueue the incoming edge once all of its outgoing edges have
125
// been consumed. This prevents it from being visited before its
126
// dependencies.
127
if remainingDeps[n] == 0 {
128
unchecked = append(unchecked, n)
129
}
130
}
131
}
132
133
return nil
134
}
135
136