Path: blob/main/component/otelcol/processor/memorylimiter/memorylimiter_test.go
4096 views
package memorylimiter_test12import (3"context"4"testing"5"time"67"github.com/go-kit/log/level"8"github.com/grafana/agent/component/otelcol"9"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"10"github.com/grafana/agent/component/otelcol/processor/memorylimiter"11"github.com/grafana/agent/pkg/flow/componenttest"12"github.com/grafana/agent/pkg/river"13"github.com/grafana/agent/pkg/util"14"github.com/grafana/dskit/backoff"15"github.com/stretchr/testify/require"16"go.opentelemetry.io/collector/pdata/ptrace"17)1819// Test performs a basic integration test which runs the20// otelcol.processor.memory_limiter component and ensures that it can accept,21// process, and forward data.22func Test(t *testing.T) {23ctx := componenttest.TestContext(t)24l := util.TestLogger(t)2526ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.memory_limiter")27require.NoError(t, err)2829cfg := `30check_interval = "10ms"31limit = "20MiB"3233output {34// no-op: will be overridden by test code.35}36`37var args memorylimiter.Arguments38require.NoError(t, river.Unmarshal([]byte(cfg), &args))3940// Override our arguments so traces get forwarded to traceCh.41traceCh := make(chan ptrace.Traces)42args.Output = makeTracesOutput(traceCh)4344go func() {45err := ctrl.Run(ctx, args)46require.NoError(t, err)47}()4849require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")50require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")5152// Send traces in the background to our processor.53go func() {54exports := ctrl.Exports().(otelcol.ConsumerExports)5556bo := backoff.New(ctx, backoff.Config{57MinBackoff: 10 * time.Millisecond,58MaxBackoff: 100 * time.Millisecond,59})60for bo.Ongoing() {61err := exports.Input.ConsumeTraces(ctx, createTestTraces())62if err != nil {63level.Error(l).Log("msg", "failed to send traces", "err", err)64bo.Wait()65continue66}6768return69}70}()7172// Wait for our processor to finish and forward data to traceCh.73select {74case <-time.After(time.Second):75require.FailNow(t, "failed waiting for traces")76case tr := <-traceCh:77require.Equal(t, 1, tr.SpanCount())78}79}8081// makeTracesOutput returns ConsumerArguments which will forward traces to the82// provided channel.83func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {84traceConsumer := fakeconsumer.Consumer{85ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {86select {87case <-ctx.Done():88return ctx.Err()89case ch <- t:90return nil91}92},93}9495return &otelcol.ConsumerArguments{96Traces: []otelcol.Consumer{&traceConsumer},97}98}99100func createTestTraces() ptrace.Traces {101// Matches format from the protobuf definition:102// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto103var bb = `{104"resource_spans": [{105"scope_spans": [{106"spans": [{107"name": "TestSpan"108}]109}]110}]111}`112113decoder := &ptrace.JSONUnmarshaler{}114data, err := decoder.UnmarshalTraces([]byte(bb))115if err != nil {116panic(err)117}118return data119}120121122