Path: blob/main/component/otelcol/processor/tail_sampling/tail_sampling.go
4096 views
// Package tail_sampling provides an otelcol.processor.tail_sampling component.1package tail_sampling23import (4"fmt"5"time"67"github.com/grafana/agent/component"8"github.com/grafana/agent/component/otelcol"9"github.com/grafana/agent/component/otelcol/processor"10"github.com/grafana/agent/pkg/river"11tsp "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"12otelcomponent "go.opentelemetry.io/collector/component"13otelconfig "go.opentelemetry.io/collector/config"14)1516func init() {17component.Register(component.Registration{18Name: "otelcol.processor.tail_sampling",19Args: Arguments{},20Exports: otelcol.ConsumerExports{},2122Build: func(opts component.Options, args component.Arguments) (component.Component, error) {23fact := tsp.NewFactory()24return processor.New(opts, fact, args.(Arguments))25},26})27}2829// Arguments configures the otelcol.processor.tail_sampling component.30type Arguments struct {31PolicyCfgs []PolicyCfg `river:"policy,block"`32DecisionWait time.Duration `river:"decision_wait,attr,optional"`33NumTraces uint64 `river:"num_traces,attr,optional"`34ExpectedNewTracesPerSec uint64 `river:"expected_new_traces_per_sec,attr,optional"`35// Output configures where to send processed data. Required.36Output *otelcol.ConsumerArguments `river:"output,block"`37}3839var (40_ processor.Arguments = Arguments{}41_ river.Unmarshaler = (*Arguments)(nil)42)4344// DefaultArguments holds default settings for Arguments.45var DefaultArguments = Arguments{46DecisionWait: 30 * time.Second,47NumTraces: 50000,48ExpectedNewTracesPerSec: 0,49}5051// UnmarshalRiver implements river.Unmarshaler. It applies defaults to args and52// validates settings provided by the user.53func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {54*args = DefaultArguments5556type arguments Arguments57if err := f((*arguments)(args)); err != nil {58return err59}6061if args.DecisionWait.Milliseconds() <= 0 {62return fmt.Errorf("decision_wait must be greater than zero")63}6465if args.NumTraces <= 0 {66return fmt.Errorf("num_traces must be greater than zero")67}6869return nil70}7172// Convert implements processor.Arguments.73func (args Arguments) Convert() (otelconfig.Processor, error) {74// TODO: Get rid of mapstructure once tailsamplingprocessor.Config has all public types75var otelConfig tsp.Config7677var otelPolicyCfgs []tsp.PolicyCfg78for _, policyCfg := range args.PolicyCfgs {79otelPolicyCfgs = append(otelPolicyCfgs, policyCfg.Convert())80}8182mustDecodeMapStructure(map[string]interface{}{83"decision_wait": args.DecisionWait,84"num_traces": args.NumTraces,85"expected_new_traces_per_sec": args.ExpectedNewTracesPerSec,86"policies": otelPolicyCfgs,87}, &otelConfig)8889otelConfig.ProcessorSettings = otelconfig.NewProcessorSettings(otelconfig.NewComponentID("tail_sampling"))9091return &otelConfig, nil92}9394// Extensions implements processor.Arguments.95func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {96return nil97}9899// Exporters implements processor.Arguments.100func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {101return nil102}103104// NextConsumers implements processor.Arguments.105func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {106return args.Output107}108109110