Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/internal/traceutils/server.go
4096 views
1
package traceutils
2
3
import (
4
"context"
5
"fmt"
6
"math/rand"
7
"strings"
8
"testing"
9
"time"
10
11
"github.com/grafana/agent/pkg/util"
12
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/mocks"
13
"github.com/stretchr/testify/assert"
14
"go.opentelemetry.io/collector/component"
15
"go.opentelemetry.io/collector/config"
16
"go.opentelemetry.io/collector/confmap"
17
"go.opentelemetry.io/collector/consumer"
18
"go.opentelemetry.io/collector/pdata/ptrace"
19
"go.opentelemetry.io/collector/receiver/otlpreceiver"
20
"go.opentelemetry.io/collector/service/external/configunmarshaler"
21
"go.opentelemetry.io/collector/service/external/pipelines"
22
"go.opentelemetry.io/otel/metric"
23
"go.opentelemetry.io/otel/trace"
24
"go.uber.org/zap"
25
"gopkg.in/yaml.v3"
26
)
27
28
// Server is a Tracing testing server that invokes a function every time a span
29
// is received.
30
type Server struct {
31
pipelines *pipelines.Pipelines
32
}
33
34
// NewTestServer creates a new Server for testing, where received traces will
35
// call the callback function. The returned string is the address where traces
36
// can be sent using OTLP.
37
func NewTestServer(t *testing.T, callback func(ptrace.Traces)) string {
38
t.Helper()
39
40
srv, listenAddr, err := NewServerWithRandomPort(callback)
41
if err != nil {
42
t.Fatalf("failed to create OTLP server: %s", err)
43
}
44
t.Cleanup(func() {
45
err := srv.Stop()
46
assert.NoError(t, err)
47
})
48
49
return listenAddr
50
}
51
52
// NewServerWithRandomPort calls NewServer with a random port >49152 and
53
// <65535. It will try up to five times before failing.
54
func NewServerWithRandomPort(callback func(ptrace.Traces)) (srv *Server, addr string, err error) {
55
var lastError error
56
57
for i := 0; i < 5; i++ {
58
port := rand.Intn(65535-49152) + 49152
59
listenAddr := fmt.Sprintf("127.0.0.1:%d", port)
60
61
srv, err = NewServer(listenAddr, callback)
62
if err != nil {
63
lastError = err
64
continue
65
}
66
67
return srv, listenAddr, nil
68
}
69
70
return nil, "", fmt.Errorf("failed 5 times to create a server. last error: %w", lastError)
71
}
72
73
// NewServer creates an OTLP-accepting server that calls a function when a
74
// trace is received. This is primarily useful for testing.
75
func NewServer(addr string, callback func(ptrace.Traces)) (*Server, error) {
76
conf := util.Untab(fmt.Sprintf(`
77
processors:
78
func_processor:
79
receivers:
80
otlp:
81
protocols:
82
grpc:
83
endpoint: %s
84
exporters:
85
noop:
86
service:
87
pipelines:
88
traces:
89
receivers: [otlp]
90
processors: [func_processor]
91
exporters: [noop]
92
`, addr))
93
94
var cfg map[string]interface{}
95
if err := yaml.NewDecoder(strings.NewReader(conf)).Decode(&cfg); err != nil {
96
panic("could not decode config: " + err.Error())
97
}
98
99
extensionsFactory, err := component.MakeExtensionFactoryMap()
100
if err != nil {
101
return nil, fmt.Errorf("failed to make extension factory map: %w", err)
102
}
103
104
receiversFactory, err := component.MakeReceiverFactoryMap(otlpreceiver.NewFactory())
105
if err != nil {
106
return nil, fmt.Errorf("failed to make receiver factory map: %w", err)
107
}
108
109
exportersFactory, err := component.MakeExporterFactoryMap(newNoopExporterFactory())
110
if err != nil {
111
return nil, fmt.Errorf("failed to make exporter factory map: %w", err)
112
}
113
114
processorsFactory, err := component.MakeProcessorFactoryMap(
115
newFuncProcessorFactory(callback),
116
)
117
if err != nil {
118
return nil, fmt.Errorf("failed to make processor factory map: %w", err)
119
}
120
121
factories := component.Factories{
122
Extensions: extensionsFactory,
123
Receivers: receiversFactory,
124
Processors: processorsFactory,
125
Exporters: exportersFactory,
126
}
127
128
configMap := confmap.NewFromStringMap(cfg)
129
otelCfg, err := configunmarshaler.Unmarshal(configMap, factories)
130
if err != nil {
131
return nil, fmt.Errorf("failed to make otel config: %w", err)
132
}
133
134
var (
135
logger = zap.NewNop()
136
startInfo component.BuildInfo
137
)
138
139
settings := component.TelemetrySettings{
140
Logger: logger,
141
TracerProvider: trace.NewNoopTracerProvider(),
142
MeterProvider: metric.NewNoopMeterProvider(),
143
}
144
145
pipelines, err := pipelines.Build(context.Background(), pipelines.Settings{
146
Telemetry: settings,
147
BuildInfo: startInfo,
148
149
ReceiverFactories: factories.Receivers,
150
ReceiverConfigs: otelCfg.Receivers,
151
ProcessorFactories: factories.Processors,
152
ProcessorConfigs: otelCfg.Processors,
153
ExporterFactories: factories.Exporters,
154
ExporterConfigs: otelCfg.Exporters,
155
156
PipelineConfigs: otelCfg.Pipelines,
157
})
158
if err != nil {
159
return nil, fmt.Errorf("failed to build pipelines: %w", err)
160
}
161
162
h := &mocks.Host{}
163
h.On("GetExtensions").Return(nil)
164
if err := pipelines.StartAll(context.Background(), h); err != nil {
165
return nil, fmt.Errorf("failed to start receivers: %w", err)
166
}
167
168
return &Server{
169
pipelines: pipelines,
170
}, nil
171
}
172
173
// Stop stops the testing server.
174
func (s *Server) Stop() error {
175
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
176
defer cancel()
177
178
return s.pipelines.ShutdownAll(shutdownCtx)
179
}
180
181
func newFuncProcessorFactory(callback func(ptrace.Traces)) component.ProcessorFactory {
182
return component.NewProcessorFactory(
183
"func_processor",
184
func() config.Processor {
185
processorSettings := config.NewProcessorSettings(config.NewComponentIDWithName("func_processor", "func_processor"))
186
return &processorSettings
187
},
188
component.WithTracesProcessor(func(
189
_ context.Context,
190
_ component.ProcessorCreateSettings,
191
_ config.Processor,
192
next consumer.Traces,
193
) (component.TracesProcessor, error) {
194
195
return &funcProcessor{
196
Callback: callback,
197
Next: next,
198
}, nil
199
}, component.StabilityLevelUndefined),
200
)
201
}
202
203
type funcProcessor struct {
204
Callback func(ptrace.Traces)
205
Next consumer.Traces
206
}
207
208
func (p *funcProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
209
if p.Callback != nil {
210
p.Callback(td)
211
}
212
return p.Next.ConsumeTraces(ctx, td)
213
}
214
215
func (p *funcProcessor) Capabilities() consumer.Capabilities {
216
return consumer.Capabilities{MutatesData: true}
217
}
218
219
func (p *funcProcessor) Start(context.Context, component.Host) error { return nil }
220
func (p *funcProcessor) Shutdown(context.Context) error { return nil }
221
222
func newNoopExporterFactory() component.ExporterFactory {
223
return component.NewExporterFactory(
224
"noop",
225
func() config.Exporter {
226
exporterSettings := config.NewExporterSettings(config.NewComponentIDWithName("noop", "noop"))
227
return &exporterSettings
228
},
229
component.WithTracesExporter(func(
230
context.Context,
231
component.ExporterCreateSettings,
232
config.Exporter) (
233
component.TracesExporter,
234
error) {
235
236
return &noopExporter{}, nil
237
}, component.StabilityLevelUndefined),
238
)
239
}
240
241
type noopExporter struct{}
242
243
func (n noopExporter) Start(context.Context, component.Host) error { return nil }
244
245
func (n noopExporter) Shutdown(context.Context) error { return nil }
246
247
func (n noopExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{} }
248
249
func (n noopExporter) ConsumeTraces(context.Context, ptrace.Traces) error { return nil }
250
251