Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/config_queue.go
4096 views
1
package otelcol
2
3
import (
4
"fmt"
5
6
"github.com/grafana/agent/pkg/river"
7
otelexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
8
)
9
10
// QueueArguments holds shared settings for components which can queue
11
// requests.
12
type QueueArguments struct {
13
Enabled bool `river:"enabled,attr,optional"`
14
NumConsumers int `river:"num_consumers,attr,optional"`
15
QueueSize int `river:"queue_size,attr,optional"`
16
17
// TODO(rfratto): queues can send to persistent storage through an extension.
18
}
19
20
var _ river.Unmarshaler = (*QueueArguments)(nil)
21
22
// DefaultQueueArguments holds default settings for QueueArguments.
23
var DefaultQueueArguments = QueueArguments{
24
Enabled: true,
25
NumConsumers: 10,
26
27
// Copied from [upstream]:
28
//
29
// 5000 queue elements at 100 requests/sec gives about 50 seconds of survival
30
// of destination outage. This is a pretty decent value for production. Users
31
// should calculate this from the perspective of how many seconds to buffer
32
// in case of a backend outage and multiply that by the number of requests
33
// per second.
34
//
35
// [upstream]: https://github.com/open-telemetry/opentelemetry-collector/blob/ff73e49f74d8fd8c57a849aa3ff23ae1940cc16a/exporter/exporterhelper/queued_retry.go#L62-L65
36
QueueSize: 5000,
37
}
38
39
// UnmarshalRiver implements river.Unmarshaler.
40
func (args *QueueArguments) UnmarshalRiver(f func(interface{}) error) error {
41
*args = DefaultQueueArguments
42
type arguments QueueArguments
43
return f((*arguments)(args))
44
}
45
46
// Convert converts args into the upstream type.
47
func (args *QueueArguments) Convert() *otelexporterhelper.QueueSettings {
48
if args == nil {
49
return nil
50
}
51
52
return &otelexporterhelper.QueueSettings{
53
Enabled: args.Enabled,
54
NumConsumers: args.NumConsumers,
55
QueueSize: args.QueueSize,
56
}
57
}
58
59
// Validate returns an error if args is invalid.
60
func (args *QueueArguments) Validate() error {
61
if args == nil || !args.Enabled {
62
return nil
63
}
64
65
if args.QueueSize <= 0 {
66
return fmt.Errorf("queue_size must be greater than zero")
67
}
68
69
return nil
70
}
71
72