Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/componenttest/componenttest.go
4094 views
1
// Package componenttest provides utilities for testing Flow components.
2
package componenttest
3
4
import (
5
"context"
6
"fmt"
7
"os"
8
"reflect"
9
"sync"
10
"time"
11
12
"github.com/prometheus/client_golang/prometheus"
13
"go.opentelemetry.io/otel/trace"
14
"go.uber.org/atomic"
15
16
"github.com/go-kit/log"
17
"github.com/grafana/agent/component"
18
"github.com/grafana/agent/pkg/flow/logging"
19
)
20
21
// A Controller is a testing controller which controls a single component.
22
type Controller struct {
23
reg component.Registration
24
log log.Logger
25
26
onRun sync.Once
27
running chan struct{}
28
runError atomic.Error
29
30
innerMut sync.Mutex
31
inner component.Component
32
33
exportsMut sync.Mutex
34
exports component.Exports
35
exportsCh chan struct{}
36
}
37
38
// NewControllerFromID returns a new testing Controller for the component with
39
// the provided name.
40
func NewControllerFromID(l log.Logger, componentName string) (*Controller, error) {
41
reg, ok := component.Get(componentName)
42
if !ok {
43
return nil, fmt.Errorf("no such component %q", componentName)
44
}
45
return NewControllerFromReg(l, reg), nil
46
}
47
48
// NewControllerFromReg registers a new testing Controller for a component with
49
// the given registration. This can be used for testing fake components which
50
// aren't really registered.
51
func NewControllerFromReg(l log.Logger, reg component.Registration) *Controller {
52
if l == nil {
53
l = log.NewNopLogger()
54
}
55
56
return &Controller{
57
reg: reg,
58
log: l,
59
60
running: make(chan struct{}, 1),
61
exportsCh: make(chan struct{}, 1),
62
}
63
}
64
65
func (c *Controller) onStateChange(e component.Exports) {
66
c.exportsMut.Lock()
67
changed := !reflect.DeepEqual(c.exports, e)
68
c.exports = e
69
c.exportsMut.Unlock()
70
71
if !changed {
72
return
73
}
74
75
select {
76
case c.exportsCh <- struct{}{}:
77
default:
78
}
79
}
80
81
// WaitRunning blocks until the Controller is running up to the provided
82
// timeout.
83
func (c *Controller) WaitRunning(timeout time.Duration) error {
84
select {
85
case <-time.After(timeout):
86
return fmt.Errorf("timed out waiting for the controller to start running")
87
case <-c.running:
88
if err := c.runError.Load(); err != nil {
89
return fmt.Errorf("component failed to start: %w", err)
90
}
91
return nil
92
}
93
}
94
95
// WaitExports blocks until new Exports are available up to the provided
96
// timeout.
97
func (c *Controller) WaitExports(timeout time.Duration) error {
98
select {
99
case <-time.After(timeout):
100
return fmt.Errorf("timed out waiting for exports")
101
case <-c.exportsCh:
102
return nil
103
}
104
}
105
106
// Exports gets the most recent exports for a component.
107
func (c *Controller) Exports() component.Exports {
108
c.exportsMut.Lock()
109
defer c.exportsMut.Unlock()
110
return c.exports
111
}
112
113
// Run starts the controller, building and running the component. Run blocks
114
// until ctx is canceled, the component exits, or if there was an error.
115
//
116
// Run may only be called once per Controller.
117
func (c *Controller) Run(ctx context.Context, args component.Arguments) error {
118
dataPath, err := os.MkdirTemp("", "controller-*")
119
if err != nil {
120
return err
121
}
122
defer func() {
123
_ = os.RemoveAll(dataPath)
124
}()
125
126
run, err := c.buildComponent(dataPath, args)
127
128
// We close c.running before checking the error, since the component will
129
// never run if we return an error anyway.
130
c.onRun.Do(func() {
131
c.runError.Store(err)
132
close(c.running)
133
})
134
135
if err != nil {
136
return err
137
}
138
return run.Run(ctx)
139
}
140
141
func (c *Controller) buildComponent(dataPath string, args component.Arguments) (component.Component, error) {
142
c.innerMut.Lock()
143
defer c.innerMut.Unlock()
144
145
writerAdapter := log.NewStdlibAdapter(c.log)
146
sink, err := logging.WriterSink(writerAdapter, logging.SinkOptions{
147
Level: logging.LevelDebug,
148
Format: logging.FormatLogfmt,
149
})
150
if err != nil {
151
return nil, err
152
}
153
154
opts := component.Options{
155
ID: c.reg.Name + ".test",
156
Logger: logging.New(sink),
157
Tracer: trace.NewNoopTracerProvider(),
158
DataPath: dataPath,
159
OnStateChange: c.onStateChange,
160
Registerer: prometheus.NewRegistry(),
161
}
162
163
inner, err := c.reg.Build(opts, args)
164
if err != nil {
165
return nil, err
166
}
167
168
c.inner = inner
169
return inner, nil
170
}
171
172
// Update updates the running component. Should only be called after Run.
173
func (c *Controller) Update(args component.Arguments) error {
174
c.innerMut.Lock()
175
defer c.innerMut.Unlock()
176
177
if c.inner == nil {
178
return fmt.Errorf("component is not running")
179
}
180
return c.inner.Update(args)
181
}
182
183