Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/processor/tail_sampling/tail_sampling.go
4096 views
1
// Package tail_sampling provides an otelcol.processor.tail_sampling component.
2
package tail_sampling
3
4
import (
5
"fmt"
6
"time"
7
8
"github.com/grafana/agent/component"
9
"github.com/grafana/agent/component/otelcol"
10
"github.com/grafana/agent/component/otelcol/processor"
11
"github.com/grafana/agent/pkg/river"
12
tsp "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
13
otelcomponent "go.opentelemetry.io/collector/component"
14
otelconfig "go.opentelemetry.io/collector/config"
15
)
16
17
func init() {
18
component.Register(component.Registration{
19
Name: "otelcol.processor.tail_sampling",
20
Args: Arguments{},
21
Exports: otelcol.ConsumerExports{},
22
23
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
24
fact := tsp.NewFactory()
25
return processor.New(opts, fact, args.(Arguments))
26
},
27
})
28
}
29
30
// Arguments configures the otelcol.processor.tail_sampling component.
31
type Arguments struct {
32
PolicyCfgs []PolicyCfg `river:"policy,block"`
33
DecisionWait time.Duration `river:"decision_wait,attr,optional"`
34
NumTraces uint64 `river:"num_traces,attr,optional"`
35
ExpectedNewTracesPerSec uint64 `river:"expected_new_traces_per_sec,attr,optional"`
36
// Output configures where to send processed data. Required.
37
Output *otelcol.ConsumerArguments `river:"output,block"`
38
}
39
40
var (
41
_ processor.Arguments = Arguments{}
42
_ river.Unmarshaler = (*Arguments)(nil)
43
)
44
45
// DefaultArguments holds default settings for Arguments.
46
var DefaultArguments = Arguments{
47
DecisionWait: 30 * time.Second,
48
NumTraces: 50000,
49
ExpectedNewTracesPerSec: 0,
50
}
51
52
// UnmarshalRiver implements river.Unmarshaler. It applies defaults to args and
53
// validates settings provided by the user.
54
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
55
*args = DefaultArguments
56
57
type arguments Arguments
58
if err := f((*arguments)(args)); err != nil {
59
return err
60
}
61
62
if args.DecisionWait.Milliseconds() <= 0 {
63
return fmt.Errorf("decision_wait must be greater than zero")
64
}
65
66
if args.NumTraces <= 0 {
67
return fmt.Errorf("num_traces must be greater than zero")
68
}
69
70
return nil
71
}
72
73
// Convert implements processor.Arguments.
74
func (args Arguments) Convert() (otelconfig.Processor, error) {
75
// TODO: Get rid of mapstructure once tailsamplingprocessor.Config has all public types
76
var otelConfig tsp.Config
77
78
var otelPolicyCfgs []tsp.PolicyCfg
79
for _, policyCfg := range args.PolicyCfgs {
80
otelPolicyCfgs = append(otelPolicyCfgs, policyCfg.Convert())
81
}
82
83
mustDecodeMapStructure(map[string]interface{}{
84
"decision_wait": args.DecisionWait,
85
"num_traces": args.NumTraces,
86
"expected_new_traces_per_sec": args.ExpectedNewTracesPerSec,
87
"policies": otelPolicyCfgs,
88
}, &otelConfig)
89
90
otelConfig.ProcessorSettings = otelconfig.NewProcessorSettings(otelconfig.NewComponentID("tail_sampling"))
91
92
return &otelConfig, nil
93
}
94
95
// Extensions implements processor.Arguments.
96
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
97
return nil
98
}
99
100
// Exporters implements processor.Arguments.
101
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
102
return nil
103
}
104
105
// NextConsumers implements processor.Arguments.
106
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
107
return args.Output
108
}
109
110