Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/internal/controller/loader_test.go
4095 views
1
package controller_test
2
3
import (
4
"errors"
5
"io"
6
"strings"
7
"testing"
8
9
"github.com/grafana/agent/pkg/cluster"
10
"github.com/grafana/agent/pkg/flow/internal/controller"
11
"github.com/grafana/agent/pkg/flow/internal/dag"
12
"github.com/grafana/agent/pkg/flow/logging"
13
"github.com/grafana/agent/pkg/river/ast"
14
"github.com/grafana/agent/pkg/river/diag"
15
"github.com/grafana/agent/pkg/river/parser"
16
"github.com/prometheus/client_golang/prometheus"
17
"github.com/stretchr/testify/require"
18
"go.opentelemetry.io/otel/trace"
19
)
20
21
func TestLoader(t *testing.T) {
22
testFile := `
23
testcomponents.tick "ticker" {
24
frequency = "1s"
25
}
26
27
testcomponents.passthrough "static" {
28
input = "hello, world!"
29
}
30
31
testcomponents.passthrough "ticker" {
32
input = testcomponents.tick.ticker.tick_time
33
}
34
35
testcomponents.passthrough "forwarded" {
36
input = testcomponents.passthrough.ticker.output
37
}
38
`
39
40
testConfig := `
41
logging {
42
level = "debug"
43
format = "logfmt"
44
}
45
46
tracing {
47
sampling_fraction = 1
48
}
49
`
50
51
// corresponds to testFile
52
testGraphDefinition := graphDefinition{
53
Nodes: []string{
54
"testcomponents.tick.ticker",
55
"testcomponents.passthrough.static",
56
"testcomponents.passthrough.ticker",
57
"testcomponents.passthrough.forwarded",
58
"logging",
59
"tracing",
60
},
61
OutEdges: []edge{
62
{From: "testcomponents.passthrough.ticker", To: "testcomponents.tick.ticker"},
63
{From: "testcomponents.passthrough.forwarded", To: "testcomponents.passthrough.ticker"},
64
},
65
}
66
67
newGlobals := func() controller.ComponentGlobals {
68
return controller.ComponentGlobals{
69
LogSink: noOpSink(),
70
Logger: logging.New(nil),
71
TraceProvider: trace.NewNoopTracerProvider(),
72
Clusterer: noOpClusterer(),
73
DataPath: t.TempDir(),
74
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
75
Registerer: prometheus.NewRegistry(),
76
}
77
}
78
79
t.Run("New Graph", func(t *testing.T) {
80
l := controller.NewLoader(newGlobals())
81
diags := applyFromContent(t, l, []byte(testFile), []byte(testConfig))
82
require.NoError(t, diags.ErrorOrNil())
83
requireGraph(t, l.Graph(), testGraphDefinition)
84
})
85
86
t.Run("New Graph No Config", func(t *testing.T) {
87
l := controller.NewLoader(newGlobals())
88
diags := applyFromContent(t, l, []byte(testFile), nil)
89
require.NoError(t, diags.ErrorOrNil())
90
requireGraph(t, l.Graph(), testGraphDefinition)
91
})
92
93
t.Run("Copy existing components and delete stale ones", func(t *testing.T) {
94
startFile := `
95
// Component that should be copied over to the new graph
96
testcomponents.tick "ticker" {
97
frequency = "1s"
98
}
99
100
// Component that will not exist in the new graph
101
testcomponents.tick "remove_me" {
102
frequency = "1m"
103
}
104
`
105
l := controller.NewLoader(newGlobals())
106
diags := applyFromContent(t, l, []byte(startFile), []byte(testConfig))
107
origGraph := l.Graph()
108
require.NoError(t, diags.ErrorOrNil())
109
110
diags = applyFromContent(t, l, []byte(testFile), []byte(testConfig))
111
require.NoError(t, diags.ErrorOrNil())
112
newGraph := l.Graph()
113
114
// Ensure that nodes were copied over and not recreated
115
require.Equal(t, origGraph.GetByID("testcomponents.tick.ticker"), newGraph.GetByID("testcomponents.tick.ticker"))
116
require.Nil(t, newGraph.GetByID("testcomponents.tick.remove_me")) // The new graph shouldn't have the old node
117
})
118
119
t.Run("Load with invalid components", func(t *testing.T) {
120
invalidFile := `
121
doesnotexist "bad_component" {
122
}
123
`
124
l := controller.NewLoader(newGlobals())
125
diags := applyFromContent(t, l, []byte(invalidFile), nil)
126
require.ErrorContains(t, diags.ErrorOrNil(), `Unrecognized component name "doesnotexist`)
127
})
128
129
t.Run("Partial load with invalid reference", func(t *testing.T) {
130
invalidFile := `
131
testcomponents.tick "ticker" {
132
frequency = "1s"
133
}
134
135
testcomponents.passthrough "valid" {
136
input = testcomponents.tick.ticker.tick_time
137
}
138
139
testcomponents.passthrough "invalid" {
140
input = testcomponents.tick.doesnotexist.tick_time
141
}
142
`
143
l := controller.NewLoader(newGlobals())
144
diags := applyFromContent(t, l, []byte(invalidFile), nil)
145
require.Error(t, diags.ErrorOrNil())
146
147
requireGraph(t, l.Graph(), graphDefinition{
148
Nodes: nil,
149
OutEdges: nil,
150
})
151
})
152
153
t.Run("File has cycles", func(t *testing.T) {
154
invalidFile := `
155
testcomponents.tick "ticker" {
156
frequency = "1s"
157
}
158
159
testcomponents.passthrough "static" {
160
input = testcomponents.passthrough.forwarded.output
161
}
162
163
testcomponents.passthrough "ticker" {
164
input = testcomponents.passthrough.static.output
165
}
166
167
testcomponents.passthrough "forwarded" {
168
input = testcomponents.passthrough.ticker.output
169
}
170
`
171
l := controller.NewLoader(newGlobals())
172
diags := applyFromContent(t, l, []byte(invalidFile), nil)
173
require.Error(t, diags.ErrorOrNil())
174
})
175
176
t.Run("Handling of singleton component labels", func(t *testing.T) {
177
invalidFile := `
178
testcomponents.tick {
179
}
180
testcomponents.singleton "first" {
181
}
182
`
183
l := controller.NewLoader(newGlobals())
184
diags := applyFromContent(t, l, []byte(invalidFile), nil)
185
require.ErrorContains(t, diags[0], `Component "testcomponents.tick" must have a label`)
186
require.ErrorContains(t, diags[1], `Component "testcomponents.singleton" does not support labels`)
187
})
188
}
189
190
// TestScopeWithFailingComponent is used to ensure that the scope is filled out, even if the component
191
// fails to properly start.
192
func TestScopeWithFailingComponent(t *testing.T) {
193
testFile := `
194
testcomponents.tick "ticker" {
195
frequenc = "1s"
196
}
197
198
testcomponents.passthrough "static" {
199
input = "hello, world!"
200
}
201
202
testcomponents.passthrough "ticker" {
203
input = testcomponents.tick.ticker.tick_time
204
}
205
206
testcomponents.passthrough "forwarded" {
207
input = testcomponents.passthrough.ticker.output
208
}
209
`
210
newGlobals := func() controller.ComponentGlobals {
211
return controller.ComponentGlobals{
212
LogSink: noOpSink(),
213
Logger: logging.New(nil),
214
TraceProvider: trace.NewNoopTracerProvider(),
215
DataPath: t.TempDir(),
216
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
217
Registerer: prometheus.NewRegistry(),
218
Clusterer: noOpClusterer(),
219
}
220
}
221
222
l := controller.NewLoader(newGlobals())
223
diags := applyFromContent(t, l, []byte(testFile), nil)
224
require.Error(t, diags.ErrorOrNil())
225
require.Len(t, diags, 1)
226
require.True(t, strings.Contains(diags.Error(), `unrecognized attribute name "frequenc"`))
227
}
228
229
// noOpSink returns a logging sink that writes to io.Discard using
// logging.DefaultSinkOptions, so log output produced during tests is dropped.
//
// It panics if the sink cannot be constructed: silently returning a nil sink
// would only surface later as an unrelated nil dereference inside a test.
func noOpSink() *logging.Sink {
	sink, err := logging.WriterSink(io.Discard, logging.DefaultSinkOptions)
	if err != nil {
		panic("noOpSink: creating discard sink: " + err.Error())
	}
	return sink
}
233
234
// noOpClusterer returns a Clusterer for tests whose Node is the value produced
// by cluster.NewLocalNode when given an empty string.
func noOpClusterer() *cluster.Clusterer {
	localNode := cluster.NewLocalNode("")
	clusterer := cluster.Clusterer{Node: localNode}
	return &clusterer
}
237
238
func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, configBytes []byte) diag.Diagnostics {
239
t.Helper()
240
241
var (
242
diags diag.Diagnostics
243
componentBlocks []*ast.BlockStmt
244
configBlocks []*ast.BlockStmt = nil
245
)
246
247
componentBlocks, diags = fileToBlock(t, componentBytes)
248
if diags.HasErrors() {
249
return diags
250
}
251
252
if string(configBytes) != "" {
253
configBlocks, diags = fileToBlock(t, configBytes)
254
if diags.HasErrors() {
255
return diags
256
}
257
}
258
259
applyDiags := l.Apply(nil, componentBlocks, configBlocks)
260
diags = append(diags, applyDiags...)
261
262
return diags
263
}
264
265
func fileToBlock(t *testing.T, bytes []byte) ([]*ast.BlockStmt, diag.Diagnostics) {
266
var diags diag.Diagnostics
267
file, err := parser.ParseFile(t.Name(), bytes)
268
269
var parseDiags diag.Diagnostics
270
if errors.As(err, &parseDiags); parseDiags.HasErrors() {
271
return nil, parseDiags
272
}
273
274
var blocks []*ast.BlockStmt
275
for _, stmt := range file.Body {
276
switch stmt := stmt.(type) {
277
case *ast.BlockStmt:
278
blocks = append(blocks, stmt)
279
default:
280
diags = append(diags, diag.Diagnostic{
281
Severity: diag.SeverityLevelError,
282
Message: "unexpected statement",
283
StartPos: ast.StartPos(stmt).Position(),
284
EndPos: ast.EndPos(stmt).Position(),
285
})
286
}
287
}
288
289
return blocks, diags
290
}
291
292
type (
	// graphDefinition is the expected shape of a component graph: the IDs of
	// its nodes and the directed edges between them.
	graphDefinition struct {
		Nodes    []string
		OutEdges []edge
	}

	// edge is a directed connection between two nodes, identified by node ID.
	edge struct {
		From string
		To   string
	}
)
298
299
func requireGraph(t *testing.T, g *dag.Graph, expect graphDefinition) {
300
t.Helper()
301
302
var (
303
actualNodes []string
304
actualEdges []edge
305
)
306
307
for _, n := range g.Nodes() {
308
actualNodes = append(actualNodes, n.NodeID())
309
}
310
require.ElementsMatch(t, expect.Nodes, actualNodes, "List of nodes do not match")
311
312
for _, e := range g.Edges() {
313
actualEdges = append(actualEdges, edge{
314
From: e.From.NodeID(),
315
To: e.To.NodeID(),
316
})
317
}
318
require.ElementsMatch(t, expect.OutEdges, actualEdges, "List of edges do not match")
319
}
320
321