Path: blob/main/component/otelcol/processor/batch/batch.go
4096 views
// Package batch provides an otelcol.processor.batch component.1package batch23import (4"fmt"5"time"67"github.com/grafana/agent/component"8"github.com/grafana/agent/component/otelcol"9"github.com/grafana/agent/component/otelcol/processor"10"github.com/grafana/agent/pkg/river"11otelcomponent "go.opentelemetry.io/collector/component"12otelconfig "go.opentelemetry.io/collector/config"13"go.opentelemetry.io/collector/processor/batchprocessor"14)1516func init() {17component.Register(component.Registration{18Name: "otelcol.processor.batch",19Args: Arguments{},20Exports: otelcol.ConsumerExports{},2122Build: func(opts component.Options, args component.Arguments) (component.Component, error) {23fact := batchprocessor.NewFactory()24return processor.New(opts, fact, args.(Arguments))25},26})27}2829// Arguments configures the otelcol.processor.batch component.30type Arguments struct {31Timeout time.Duration `river:"timeout,attr,optional"`32SendBatchSize uint32 `river:"send_batch_size,attr,optional"`33SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`3435// Output configures where to send processed data. Required.36Output *otelcol.ConsumerArguments `river:"output,block"`37}3839var (40_ processor.Arguments = Arguments{}41_ river.Unmarshaler = (*Arguments)(nil)42)4344// DefaultArguments holds default settings for Arguments.45var DefaultArguments = Arguments{46Timeout: 200 * time.Millisecond,47SendBatchSize: 8192,48}4950// UnmarshalRiver implements river.Unmarshaler. It applies defaults to args and51// validates settings provided by the user.52func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {53*args = DefaultArguments5455type arguments Arguments56if err := f((*arguments)(args)); err != nil {57return err58}5960if args.SendBatchMaxSize > 0 && args.SendBatchMaxSize < args.SendBatchSize {61return fmt.Errorf("send_batch_max_size must be greater or equal to send_batch_size when not 0")62}63return nil64}6566// Convert implements processor.Arguments.67func (args Arguments) Convert() (otelconfig.Processor, error) {68return &batchprocessor.Config{69ProcessorSettings: otelconfig.NewProcessorSettings(otelconfig.NewComponentID("batch")),70Timeout: args.Timeout,71SendBatchSize: args.SendBatchSize,72SendBatchMaxSize: args.SendBatchMaxSize,73}, nil74}7576// Extensions implements processor.Arguments.77func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {78return nil79}8081// Exporters implements processor.Arguments.82func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {83return nil84}8586// NextConsumers implements processor.Arguments.87func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {88return args.Output89}909192