Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/jaeger/jaeger.go
4096 views
1
// Package jaeger provides an otelcol.receiver.jaeger component.
2
package jaeger
3
4
import (
5
"fmt"
6
"time"
7
8
"github.com/alecthomas/units"
9
"github.com/grafana/agent/component"
10
"github.com/grafana/agent/component/otelcol"
11
"github.com/grafana/agent/component/otelcol/receiver"
12
"github.com/grafana/agent/pkg/river"
13
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver"
14
otelcomponent "go.opentelemetry.io/collector/component"
15
otelconfig "go.opentelemetry.io/collector/config"
16
)
17
18
func init() {
19
component.Register(component.Registration{
20
Name: "otelcol.receiver.jaeger",
21
Args: Arguments{},
22
23
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
24
fact := jaegerreceiver.NewFactory()
25
return receiver.New(opts, fact, args.(Arguments))
26
},
27
})
28
}
29
30
// Arguments configures the otelcol.receiver.jaeger component.
31
type Arguments struct {
32
Protocols ProtocolsArguments `river:"protocols,block"`
33
RemoteSampling *RemoteSamplingArguments `river:"remote_sampling,block,optional"`
34
35
// Output configures where to send received data. Required.
36
Output *otelcol.ConsumerArguments `river:"output,block"`
37
}
38
39
var (
40
_ river.Unmarshaler = (*Arguments)(nil)
41
_ receiver.Arguments = Arguments{}
42
)
43
44
// DefaultArguments provides default settings for Arguments. All protocols are
45
// configured with defaults and then set to nil in UnmarshalRiver if they were
46
// not defined in the source config.
47
var DefaultArguments = Arguments{
48
Protocols: ProtocolsArguments{
49
GRPC: &otelcol.GRPCServerArguments{
50
Endpoint: "0.0.0.0:14250",
51
Transport: "tcp",
52
},
53
ThriftHTTP: &otelcol.HTTPServerArguments{
54
Endpoint: "0.0.0.0:14268",
55
},
56
ThriftBinary: &ProtocolUDP{
57
Endpoint: "0.0.0.0:6832",
58
QueueSize: 1_000,
59
MaxPacketSize: 65 * units.KiB,
60
Workers: 10,
61
},
62
ThriftCompact: &ProtocolUDP{
63
Endpoint: "0.0.0.0:6831",
64
QueueSize: 1_000,
65
MaxPacketSize: 65 * units.KiB,
66
Workers: 10,
67
},
68
},
69
}
70
71
// UnmarshalRiver implements river.Unmarshaler.
72
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
73
*args = DefaultArguments
74
75
type arguments Arguments
76
77
// Unmarshal into a temporary struct so we can detect which protocols were
78
// actually enabled by the user.
79
var temp arguments
80
if err := f(&temp); err != nil {
81
return err
82
}
83
84
// Remove protocols from args if they weren't provided by the user.
85
if temp.Protocols.GRPC == nil {
86
args.Protocols.GRPC = nil
87
}
88
if temp.Protocols.ThriftHTTP == nil {
89
args.Protocols.ThriftHTTP = nil
90
}
91
if temp.Protocols.ThriftBinary == nil {
92
args.Protocols.ThriftBinary = nil
93
}
94
if temp.Protocols.ThriftCompact == nil {
95
args.Protocols.ThriftCompact = nil
96
}
97
98
// Finally, unmarshal into the real struct.
99
if err := f((*arguments)(args)); err != nil {
100
return err
101
}
102
return args.Validate()
103
}
104
105
// Validate returns an error if args is invalid.
106
func (args *Arguments) Validate() error {
107
if args.Protocols.GRPC == nil &&
108
args.Protocols.ThriftHTTP == nil &&
109
args.Protocols.ThriftBinary == nil &&
110
args.Protocols.ThriftCompact == nil {
111
112
return fmt.Errorf("at least one protocol must be enabled")
113
}
114
115
return nil
116
}
117
118
// Convert implements receiver.Arguments.
119
func (args Arguments) Convert() (otelconfig.Receiver, error) {
120
return &jaegerreceiver.Config{
121
ReceiverSettings: otelconfig.NewReceiverSettings(otelconfig.NewComponentID("jaeger")),
122
Protocols: jaegerreceiver.Protocols{
123
GRPC: args.Protocols.GRPC.Convert(),
124
ThriftHTTP: args.Protocols.ThriftHTTP.Convert(),
125
ThriftBinary: args.Protocols.ThriftBinary.Convert(),
126
ThriftCompact: args.Protocols.ThriftCompact.Convert(),
127
},
128
RemoteSampling: args.RemoteSampling.Convert(),
129
}, nil
130
}
131
132
// Extensions implements receiver.Arguments.
133
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
134
if args.RemoteSampling == nil {
135
return nil
136
}
137
return args.RemoteSampling.Client.Extensions()
138
}
139
140
// Exporters implements receiver.Arguments.
141
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
142
return nil
143
}
144
145
// NextConsumers implements receiver.Arguments.
146
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
147
return args.Output
148
}
149
150
// ProtocolsArguments configures protocols for otelcol.receiver.jaeger to
151
// listen on.
152
type ProtocolsArguments struct {
153
GRPC *otelcol.GRPCServerArguments `river:"grpc,block,optional"`
154
ThriftHTTP *otelcol.HTTPServerArguments `river:"thrift_http,block,optional"`
155
ThriftBinary *ProtocolUDP `river:"thrift_binary,block,optional"`
156
ThriftCompact *ProtocolUDP `river:"thrift_compact,block,optional"`
157
}
158
159
// ProtocolUDP configures a UDP server.
160
type ProtocolUDP struct {
161
Endpoint string `river:"endpoint,attr,optional"`
162
QueueSize int `river:"queue_size,attr,optional"`
163
MaxPacketSize units.Base2Bytes `river:"max_packet_size,attr,optional"`
164
Workers int `river:"workers,attr,optional"`
165
SocketBufferSize units.Base2Bytes `river:"socket_buffer_size,attr,optional"`
166
}
167
168
// Convert converts proto into the upstream type.
169
func (proto *ProtocolUDP) Convert() *jaegerreceiver.ProtocolUDP {
170
if proto == nil {
171
return nil
172
}
173
174
return &jaegerreceiver.ProtocolUDP{
175
Endpoint: proto.Endpoint,
176
ServerConfigUDP: jaegerreceiver.ServerConfigUDP{
177
QueueSize: proto.QueueSize,
178
MaxPacketSize: int(proto.MaxPacketSize),
179
Workers: proto.Workers,
180
SocketBufferSize: int(proto.SocketBufferSize),
181
},
182
}
183
}
184
185
// RemoteSamplingArguments configures remote sampling settings.
186
type RemoteSamplingArguments struct {
187
// TODO(rfratto): can we work with upstream to provide a hook to provide a
188
// custom strategy file and bypass the reload interval?
189
//
190
// That would let users connect a local.file to otelcol.receiver.jaeger for
191
// the remote sampling.
192
193
HostEndpoint string `river:"host_endpoint,attr"`
194
StrategyFile string `river:"strategy_file,attr"`
195
StrategyFileReloadInterval time.Duration `river:"strategy_file_reload_interval,attr"`
196
Client otelcol.GRPCClientArguments `river:"client,block"`
197
}
198
199
// Convert converts args into the upstream type.
200
func (args *RemoteSamplingArguments) Convert() *jaegerreceiver.RemoteSamplingConfig {
201
if args == nil {
202
return nil
203
}
204
205
return &jaegerreceiver.RemoteSamplingConfig{
206
HostEndpoint: args.HostEndpoint,
207
StrategyFile: args.StrategyFile,
208
StrategyFileReloadInterval: args.StrategyFileReloadInterval,
209
GRPCClientSettings: *args.Client.Convert(),
210
}
211
}
212
213