Path: blob/main/component/otelcol/processor/tail_sampling/tail_sampling_test.go
4096 views
//go:build !race12package tail_sampling34import (5"context"6"testing"7"time"89"github.com/go-kit/log/level"10"github.com/grafana/agent/component/otelcol"11"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"12"github.com/grafana/agent/pkg/flow/componenttest"13"github.com/grafana/agent/pkg/river"14"github.com/grafana/agent/pkg/util"15"github.com/grafana/dskit/backoff"16"github.com/stretchr/testify/require"17"go.opentelemetry.io/collector/pdata/ptrace"18)1920func TestBadRiverConfig(t *testing.T) {21exampleBadRiverConfig := `22decision_wait = "10s"23num_traces = 024expected_new_traces_per_sec = 1025policy {26name = "test-policy-1"27type = "always_sample"28}29output {30// no-op: will be overridden by test code.31}32`3334var args Arguments35require.Error(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "num_traces must be greater than zero")36}3738func TestBadOtelConfig(t *testing.T) {39var exampleBadOtelConfig = `40decision_wait = "10s"41num_traces = 10042expected_new_traces_per_sec = 1043policy {44name = "test-policy-1"45type = "bad_type"46}47output {48// no-op: will be overridden by test code.49}50`5152ctx := componenttest.TestContext(t)53l := util.TestLogger(t)5455ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.tail_sampling")56require.NoError(t, err)5758var args Arguments59require.NoError(t, river.Unmarshal([]byte(exampleBadOtelConfig), &args))6061// Override our arguments so traces get forwarded to traceCh.62traceCh := make(chan ptrace.Traces)63args.Output = makeTracesOutput(traceCh)6465go func() {66err := ctrl.Run(ctx, args)67require.Error(t, err, "unknown sampling policy type bad_type")68}()6970require.Error(t, ctrl.WaitRunning(time.Second), "component never started")71}7273func TestBigConfig(t *testing.T) {74exampleBigConfig := `75decision_wait = "10s"76num_traces = 10077expected_new_traces_per_sec = 1078policy {79name = "test-policy-1"80type = "always_sample"81}82policy {83name = "test-policy-2"84type = "latency"85latency {86threshold_ms = 500087}88}89policy {90name = "test-policy-3"91type = "numeric_attribute"92numeric_attribute {93key = "key1"94min_value = 5095max_value = 10096}97}98policy {99name = "test-policy-4"100type = "probabilistic"101probabilistic {102sampling_percentage = 10103}104}105policy {106name = "test-policy-5"107type = "status_code"108status_code {109status_codes = ["ERROR", "UNSET"]110}111}112policy {113name = "test-policy-6"114type = "string_attribute"115string_attribute {116key = "key2"117values = ["value1", "value2"]118}119}120policy {121name = "test-policy-7"122type = "string_attribute"123string_attribute {124key = "key2"125values = ["value1", "val*"]126enabled_regex_matching = true127cache_max_size = 10128}129}130policy {131name = "test-policy-8"132type = "rate_limiting"133rate_limiting {134spans_per_second = 35135}136}137policy {138name = "test-policy-9"139type = "string_attribute"140string_attribute {141key = "http.url"142values = ["/health", "/metrics"]143enabled_regex_matching = true144invert_match = true145}146}147policy {148name = "test-policy-10"149type = "span_count"150span_count {151min_spans = 2152}153}154policy {155name = "test-policy-11"156type = "trace_state"157trace_state {158key = "key3"159values = ["value1", "value2"]160}161}162policy{163name = "and-policy-1"164type = "and"165and {166and_sub_policy {167name = "test-and-policy-1"168type = "numeric_attribute"169numeric_attribute {170key = "key1"171min_value = 50172max_value = 100173}174}175and_sub_policy {176name = "test-and-policy-2"177type = "string_attribute"178string_attribute {179key = "key1"180values = ["value1", "value2"]181}182}183}184}185policy{186name = "composite-policy-1"187type = "composite"188composite {189max_total_spans_per_second = 1000190policy_order = ["test-composite-policy-1", "test-composite-policy-2", "test-composite-policy-3"]191composite_sub_policy {192name = "test-composite-policy-1"193type = "numeric_attribute"194numeric_attribute {195key = "key1"196min_value = 50197max_value = 100198}199}200composite_sub_policy {201name = "test-composite-policy-2"202type = "string_attribute"203string_attribute {204key = "key1"205values = ["value1", "value2"]206}207}208composite_sub_policy {209name = "test-composite-policy-3"210type = "always_sample"211}212rate_allocation {213policy = "test-composite-policy-1"214percent = 50215}216rate_allocation {217policy = "test-composite-policy-2"218percent = 50219}220}221}222223output {224// no-op: will be overridden by test code.225}226`227228ctx := componenttest.TestContext(t)229l := util.TestLogger(t)230231ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.tail_sampling")232require.NoError(t, err)233234var args Arguments235require.NoError(t, river.Unmarshal([]byte(exampleBigConfig), &args))236237// Override our arguments so traces get forwarded to traceCh.238traceCh := make(chan ptrace.Traces)239args.Output = makeTracesOutput(traceCh)240241go func() {242err := ctrl.Run(ctx, args)243require.NoError(t, err)244}()245246require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")247require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")248}249250func TestTraceProcessing(t *testing.T) {251exampleSmallConfig := `252decision_wait = "1s"253num_traces = 1254expected_new_traces_per_sec = 1255policy {256name = "test-policy-1"257type = "always_sample"258}259output {260// no-op: will be overridden by test code.261}262`263ctx := componenttest.TestContext(t)264l := util.TestLogger(t)265266ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.tail_sampling")267require.NoError(t, err)268269var args Arguments270require.NoError(t, river.Unmarshal([]byte(exampleSmallConfig), &args))271272// Override our arguments so traces get forwarded to traceCh.273traceCh := make(chan ptrace.Traces)274args.Output = makeTracesOutput(traceCh)275276go func() {277err := ctrl.Run(ctx, args)278require.NoError(t, err)279}()280281require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")282require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")283284// Send traces in the background to our processor.285go func() {286exports := ctrl.Exports().(otelcol.ConsumerExports)287288exports.Input.Capabilities()289290bo := backoff.New(ctx, backoff.Config{291MinBackoff: 10 * time.Millisecond,292MaxBackoff: 100 * time.Millisecond,293})294for bo.Ongoing() {295err := exports.Input.ConsumeTraces(ctx, createTestTraces())296if err != nil {297level.Error(l).Log("msg", "failed to send traces", "err", err)298bo.Wait()299continue300}301302return303}304}()305306// Wait for our processor to finish and forward data to traceCh.307select {308case <-time.After(time.Second * 10):309require.FailNow(t, "failed waiting for traces")310case tr := <-traceCh:311require.Equal(t, 1, tr.SpanCount())312}313}314315// makeTracesOutput returns ConsumerArguments which will forward traces to the316// provided channel.317func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {318traceConsumer := fakeconsumer.Consumer{319ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {320select {321case <-ctx.Done():322return ctx.Err()323case ch <- t:324return nil325}326},327}328329return &otelcol.ConsumerArguments{330Traces: []otelcol.Consumer{&traceConsumer},331}332}333334func createTestTraces() ptrace.Traces {335// Matches format from the protobuf definition:336// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto337var bb = `{338"resource_spans": [{339"scope_spans": [{340"spans": [{341"name": "TestSpan"342}]343}]344}]345}`346347decoder := &ptrace.JSONUnmarshaler{}348data, err := decoder.UnmarshalTraces([]byte(bb))349if err != nil {350panic(err)351}352return data353}354355356