Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/processor/batch/batch.go
4096 views
1
// Package batch provides an otelcol.processor.batch component.
2
package batch
3
4
import (
5
"fmt"
6
"time"
7
8
"github.com/grafana/agent/component"
9
"github.com/grafana/agent/component/otelcol"
10
"github.com/grafana/agent/component/otelcol/processor"
11
"github.com/grafana/agent/pkg/river"
12
otelcomponent "go.opentelemetry.io/collector/component"
13
otelconfig "go.opentelemetry.io/collector/config"
14
"go.opentelemetry.io/collector/processor/batchprocessor"
15
)
16
17
func init() {
18
component.Register(component.Registration{
19
Name: "otelcol.processor.batch",
20
Args: Arguments{},
21
Exports: otelcol.ConsumerExports{},
22
23
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
24
fact := batchprocessor.NewFactory()
25
return processor.New(opts, fact, args.(Arguments))
26
},
27
})
28
}
29
30
// Arguments configures the otelcol.processor.batch component.
31
type Arguments struct {
32
Timeout time.Duration `river:"timeout,attr,optional"`
33
SendBatchSize uint32 `river:"send_batch_size,attr,optional"`
34
SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`
35
36
// Output configures where to send processed data. Required.
37
Output *otelcol.ConsumerArguments `river:"output,block"`
38
}
39
40
var (
41
_ processor.Arguments = Arguments{}
42
_ river.Unmarshaler = (*Arguments)(nil)
43
)
44
45
// DefaultArguments holds default settings for Arguments.
46
var DefaultArguments = Arguments{
47
Timeout: 200 * time.Millisecond,
48
SendBatchSize: 8192,
49
}
50
51
// UnmarshalRiver implements river.Unmarshaler. It applies defaults to args and
52
// validates settings provided by the user.
53
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
54
*args = DefaultArguments
55
56
type arguments Arguments
57
if err := f((*arguments)(args)); err != nil {
58
return err
59
}
60
61
if args.SendBatchMaxSize > 0 && args.SendBatchMaxSize < args.SendBatchSize {
62
return fmt.Errorf("send_batch_max_size must be greater or equal to send_batch_size when not 0")
63
}
64
return nil
65
}
66
67
// Convert implements processor.Arguments.
68
func (args Arguments) Convert() (otelconfig.Processor, error) {
69
return &batchprocessor.Config{
70
ProcessorSettings: otelconfig.NewProcessorSettings(otelconfig.NewComponentID("batch")),
71
Timeout: args.Timeout,
72
SendBatchSize: args.SendBatchSize,
73
SendBatchMaxSize: args.SendBatchMaxSize,
74
}, nil
75
}
76
77
// Extensions implements processor.Arguments.
78
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
79
return nil
80
}
81
82
// Exporters implements processor.Arguments.
83
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
84
return nil
85
}
86
87
// NextConsumers implements processor.Arguments.
88
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
89
return args.Output
90
}
91
92