Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/processor/tail_sampling/tail_sampling_test.go
4096 views
1
//go:build !race
2
3
package tail_sampling
4
5
import (
6
"context"
7
"testing"
8
"time"
9
10
"github.com/go-kit/log/level"
11
"github.com/grafana/agent/component/otelcol"
12
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
13
"github.com/grafana/agent/pkg/flow/componenttest"
14
"github.com/grafana/agent/pkg/river"
15
"github.com/grafana/agent/pkg/util"
16
"github.com/grafana/dskit/backoff"
17
"github.com/stretchr/testify/require"
18
"go.opentelemetry.io/collector/pdata/ptrace"
19
)
20
21
func TestBadRiverConfig(t *testing.T) {
22
exampleBadRiverConfig := `
23
decision_wait = "10s"
24
num_traces = 0
25
expected_new_traces_per_sec = 10
26
policy {
27
name = "test-policy-1"
28
type = "always_sample"
29
}
30
output {
31
// no-op: will be overridden by test code.
32
}
33
`
34
35
var args Arguments
36
require.Error(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "num_traces must be greater than zero")
37
}
38
39
func TestBadOtelConfig(t *testing.T) {
40
var exampleBadOtelConfig = `
41
decision_wait = "10s"
42
num_traces = 100
43
expected_new_traces_per_sec = 10
44
policy {
45
name = "test-policy-1"
46
type = "bad_type"
47
}
48
output {
49
// no-op: will be overridden by test code.
50
}
51
`
52
53
ctx := componenttest.TestContext(t)
54
l := util.TestLogger(t)
55
56
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.tail_sampling")
57
require.NoError(t, err)
58
59
var args Arguments
60
require.NoError(t, river.Unmarshal([]byte(exampleBadOtelConfig), &args))
61
62
// Override our arguments so traces get forwarded to traceCh.
63
traceCh := make(chan ptrace.Traces)
64
args.Output = makeTracesOutput(traceCh)
65
66
go func() {
67
err := ctrl.Run(ctx, args)
68
require.Error(t, err, "unknown sampling policy type bad_type")
69
}()
70
71
require.Error(t, ctrl.WaitRunning(time.Second), "component never started")
72
}
73
74
func TestBigConfig(t *testing.T) {
75
exampleBigConfig := `
76
decision_wait = "10s"
77
num_traces = 100
78
expected_new_traces_per_sec = 10
79
policy {
80
name = "test-policy-1"
81
type = "always_sample"
82
}
83
policy {
84
name = "test-policy-2"
85
type = "latency"
86
latency {
87
threshold_ms = 5000
88
}
89
}
90
policy {
91
name = "test-policy-3"
92
type = "numeric_attribute"
93
numeric_attribute {
94
key = "key1"
95
min_value = 50
96
max_value = 100
97
}
98
}
99
policy {
100
name = "test-policy-4"
101
type = "probabilistic"
102
probabilistic {
103
sampling_percentage = 10
104
}
105
}
106
policy {
107
name = "test-policy-5"
108
type = "status_code"
109
status_code {
110
status_codes = ["ERROR", "UNSET"]
111
}
112
}
113
policy {
114
name = "test-policy-6"
115
type = "string_attribute"
116
string_attribute {
117
key = "key2"
118
values = ["value1", "value2"]
119
}
120
}
121
policy {
122
name = "test-policy-7"
123
type = "string_attribute"
124
string_attribute {
125
key = "key2"
126
values = ["value1", "val*"]
127
enabled_regex_matching = true
128
cache_max_size = 10
129
}
130
}
131
policy {
132
name = "test-policy-8"
133
type = "rate_limiting"
134
rate_limiting {
135
spans_per_second = 35
136
}
137
}
138
policy {
139
name = "test-policy-9"
140
type = "string_attribute"
141
string_attribute {
142
key = "http.url"
143
values = ["/health", "/metrics"]
144
enabled_regex_matching = true
145
invert_match = true
146
}
147
}
148
policy {
149
name = "test-policy-10"
150
type = "span_count"
151
span_count {
152
min_spans = 2
153
}
154
}
155
policy {
156
name = "test-policy-11"
157
type = "trace_state"
158
trace_state {
159
key = "key3"
160
values = ["value1", "value2"]
161
}
162
}
163
policy{
164
name = "and-policy-1"
165
type = "and"
166
and {
167
and_sub_policy {
168
name = "test-and-policy-1"
169
type = "numeric_attribute"
170
numeric_attribute {
171
key = "key1"
172
min_value = 50
173
max_value = 100
174
}
175
}
176
and_sub_policy {
177
name = "test-and-policy-2"
178
type = "string_attribute"
179
string_attribute {
180
key = "key1"
181
values = ["value1", "value2"]
182
}
183
}
184
}
185
}
186
policy{
187
name = "composite-policy-1"
188
type = "composite"
189
composite {
190
max_total_spans_per_second = 1000
191
policy_order = ["test-composite-policy-1", "test-composite-policy-2", "test-composite-policy-3"]
192
composite_sub_policy {
193
name = "test-composite-policy-1"
194
type = "numeric_attribute"
195
numeric_attribute {
196
key = "key1"
197
min_value = 50
198
max_value = 100
199
}
200
}
201
composite_sub_policy {
202
name = "test-composite-policy-2"
203
type = "string_attribute"
204
string_attribute {
205
key = "key1"
206
values = ["value1", "value2"]
207
}
208
}
209
composite_sub_policy {
210
name = "test-composite-policy-3"
211
type = "always_sample"
212
}
213
rate_allocation {
214
policy = "test-composite-policy-1"
215
percent = 50
216
}
217
rate_allocation {
218
policy = "test-composite-policy-2"
219
percent = 50
220
}
221
}
222
}
223
224
output {
225
// no-op: will be overridden by test code.
226
}
227
`
228
229
ctx := componenttest.TestContext(t)
230
l := util.TestLogger(t)
231
232
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.tail_sampling")
233
require.NoError(t, err)
234
235
var args Arguments
236
require.NoError(t, river.Unmarshal([]byte(exampleBigConfig), &args))
237
238
// Override our arguments so traces get forwarded to traceCh.
239
traceCh := make(chan ptrace.Traces)
240
args.Output = makeTracesOutput(traceCh)
241
242
go func() {
243
err := ctrl.Run(ctx, args)
244
require.NoError(t, err)
245
}()
246
247
require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
248
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")
249
}
250
251
func TestTraceProcessing(t *testing.T) {
252
exampleSmallConfig := `
253
decision_wait = "1s"
254
num_traces = 1
255
expected_new_traces_per_sec = 1
256
policy {
257
name = "test-policy-1"
258
type = "always_sample"
259
}
260
output {
261
// no-op: will be overridden by test code.
262
}
263
`
264
ctx := componenttest.TestContext(t)
265
l := util.TestLogger(t)
266
267
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.tail_sampling")
268
require.NoError(t, err)
269
270
var args Arguments
271
require.NoError(t, river.Unmarshal([]byte(exampleSmallConfig), &args))
272
273
// Override our arguments so traces get forwarded to traceCh.
274
traceCh := make(chan ptrace.Traces)
275
args.Output = makeTracesOutput(traceCh)
276
277
go func() {
278
err := ctrl.Run(ctx, args)
279
require.NoError(t, err)
280
}()
281
282
require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
283
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")
284
285
// Send traces in the background to our processor.
286
go func() {
287
exports := ctrl.Exports().(otelcol.ConsumerExports)
288
289
exports.Input.Capabilities()
290
291
bo := backoff.New(ctx, backoff.Config{
292
MinBackoff: 10 * time.Millisecond,
293
MaxBackoff: 100 * time.Millisecond,
294
})
295
for bo.Ongoing() {
296
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
297
if err != nil {
298
level.Error(l).Log("msg", "failed to send traces", "err", err)
299
bo.Wait()
300
continue
301
}
302
303
return
304
}
305
}()
306
307
// Wait for our processor to finish and forward data to traceCh.
308
select {
309
case <-time.After(time.Second * 10):
310
require.FailNow(t, "failed waiting for traces")
311
case tr := <-traceCh:
312
require.Equal(t, 1, tr.SpanCount())
313
}
314
}
315
316
// makeTracesOutput returns ConsumerArguments which will forward traces to the
317
// provided channel.
318
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
319
traceConsumer := fakeconsumer.Consumer{
320
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
321
select {
322
case <-ctx.Done():
323
return ctx.Err()
324
case ch <- t:
325
return nil
326
}
327
},
328
}
329
330
return &otelcol.ConsumerArguments{
331
Traces: []otelcol.Consumer{&traceConsumer},
332
}
333
}
334
335
// createTestTraces decodes a fixed JSON document containing one span named
// "TestSpan" into ptrace.Traces. It panics if the document cannot be decoded.
func createTestTraces() ptrace.Traces {
	// The layout follows the protobuf definition:
	// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
	const payload = `{
		"resource_spans": [{
			"scope_spans": [{
				"spans": [{
					"name": "TestSpan"
				}]
			}]
		}]
	}`

	var unmarshaler ptrace.JSONUnmarshaler
	traces, err := unmarshaler.UnmarshalTraces([]byte(payload))
	if err != nil {
		panic(err)
	}
	return traces
}
355
356