Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/phlare/write/write.go
4096 views
1
package write
2
3
import (
4
"context"
5
"errors"
6
"fmt"
7
"strings"
8
"time"
9
10
"github.com/bufbuild/connect-go"
11
"github.com/go-kit/log/level"
12
"github.com/oklog/run"
13
commonconfig "github.com/prometheus/common/config"
14
"github.com/prometheus/common/model"
15
"github.com/prometheus/prometheus/model/labels"
16
"go.uber.org/multierr"
17
18
"github.com/grafana/agent/component"
19
"github.com/grafana/agent/component/common/config"
20
"github.com/grafana/agent/component/phlare"
21
"github.com/grafana/agent/pkg/build"
22
"github.com/grafana/dskit/backoff"
23
pushv1 "github.com/grafana/phlare/api/gen/proto/go/push/v1"
24
pushv1connect "github.com/grafana/phlare/api/gen/proto/go/push/v1/pushv1connect"
25
typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
26
)
27
28
var (
29
userAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
30
DefaultArguments = func() Arguments {
31
return Arguments{}
32
}
33
_ component.Component = (*Component)(nil)
34
)
35
36
func init() {
37
component.Register(component.Registration{
38
Name: "phlare.write",
39
Args: Arguments{},
40
Exports: Exports{},
41
Build: func(o component.Options, c component.Arguments) (component.Component, error) {
42
return NewComponent(o, c.(Arguments))
43
},
44
})
45
}
46
47
// Arguments represents the input state of the phlare.write
48
// component.
49
type Arguments struct {
50
ExternalLabels map[string]string `river:"external_labels,attr,optional"`
51
Endpoints []*EndpointOptions `river:"endpoint,block,optional"`
52
}
53
54
// UnmarshalRiver implements river.Unmarshaler.
55
func (rc *Arguments) UnmarshalRiver(f func(interface{}) error) error {
56
*rc = DefaultArguments()
57
58
type config Arguments
59
return f((*config)(rc))
60
}
61
62
// EndpointOptions describes an individual location for where profiles
63
// should be delivered to using the Phlare push API.
64
type EndpointOptions struct {
65
Name string `river:"name,attr,optional"`
66
URL string `river:"url,attr"`
67
RemoteTimeout time.Duration `river:"remote_timeout,attr,optional"`
68
Headers map[string]string `river:"headers,attr,optional"`
69
HTTPClientConfig *config.HTTPClientConfig `river:",squash"`
70
MinBackoff time.Duration `river:"min_backoff_period,attr,optional"` // start backoff at this level
71
MaxBackoff time.Duration `river:"max_backoff_period,attr,optional"` // increase exponentially to this level
72
MaxBackoffRetries int `river:"max_backoff_retries,attr,optional"` // give up after this many; zero means infinite retries
73
}
74
75
func GetDefaultEndpointOptions() EndpointOptions {
76
defaultEndpointOptions := EndpointOptions{
77
RemoteTimeout: 10 * time.Second,
78
MinBackoff: 500 * time.Millisecond,
79
MaxBackoff: 5 * time.Minute,
80
MaxBackoffRetries: 10,
81
HTTPClientConfig: config.CloneDefaultHTTPClientConfig(),
82
}
83
84
return defaultEndpointOptions
85
}
86
87
// UnmarshalRiver implements river.Unmarshaler.
88
func (r *EndpointOptions) UnmarshalRiver(f func(v interface{}) error) error {
89
*r = GetDefaultEndpointOptions()
90
91
type arguments EndpointOptions
92
err := f((*arguments)(r))
93
if err != nil {
94
return err
95
}
96
97
// We must explicitly Validate because HTTPClientConfig is squashed and it won't run otherwise
98
if r.HTTPClientConfig != nil {
99
return r.HTTPClientConfig.Validate()
100
}
101
102
return nil
103
}
104
105
// Component is the phlare.write component.
106
type Component struct {
107
opts component.Options
108
cfg Arguments
109
metrics *metrics
110
}
111
112
// Exports are the set of fields exposed by the phlare.write component.
113
type Exports struct {
114
Receiver phlare.Appendable `river:"receiver,attr"`
115
}
116
117
// NewComponent creates a new phlare.write component.
118
func NewComponent(o component.Options, c Arguments) (*Component, error) {
119
metrics := newMetrics(o.Registerer)
120
receiver, err := NewFanOut(o, c, metrics)
121
if err != nil {
122
return nil, err
123
}
124
// Immediately export the receiver
125
o.OnStateChange(Exports{Receiver: receiver})
126
127
return &Component{
128
cfg: c,
129
opts: o,
130
metrics: metrics,
131
}, nil
132
}
133
134
// Compile-time check that *Component satisfies component.Component.
var _ component.Component = (*Component)(nil)
135
136
// Run implements Component.
137
func (c *Component) Run(ctx context.Context) error {
138
<-ctx.Done()
139
return ctx.Err()
140
}
141
142
// Update implements Component.
143
func (c *Component) Update(newConfig component.Arguments) error {
144
c.cfg = newConfig.(Arguments)
145
level.Debug(c.opts.Logger).Log("msg", "updating phlare.write config", "old", c.cfg, "new", newConfig)
146
receiver, err := NewFanOut(c.opts, newConfig.(Arguments), c.metrics)
147
if err != nil {
148
return err
149
}
150
c.opts.OnStateChange(Exports{Receiver: receiver})
151
return nil
152
}
153
154
type fanOutClient struct {
155
// The list of push clients to fan out to.
156
clients []pushv1connect.PusherServiceClient
157
158
config Arguments
159
opts component.Options
160
metrics *metrics
161
}
162
163
// NewFanOut creates a new fan out client that will fan out to all endpoints.
164
func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) {
165
clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints))
166
for _, endpoint := range config.Endpoints {
167
httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name)
168
if err != nil {
169
return nil, err
170
}
171
clients = append(clients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent)))
172
}
173
return &fanOutClient{
174
clients: clients,
175
config: config,
176
opts: opts,
177
metrics: metrics,
178
}, nil
179
}
180
181
// Push implements the PusherServiceClient interface.
182
func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
183
// Don't flow the context down to the `run.Group`.
184
// We want to fan out to all even in case of failures to one.
185
var (
186
g run.Group
187
errs error
188
reqSize, profileCount = requestSize(req)
189
)
190
191
for i, client := range f.clients {
192
var (
193
client = client
194
i = i
195
backoff = backoff.New(ctx, backoff.Config{
196
MinBackoff: f.config.Endpoints[i].MinBackoff,
197
MaxBackoff: f.config.Endpoints[i].MaxBackoff,
198
MaxRetries: f.config.Endpoints[i].MaxBackoffRetries,
199
})
200
err error
201
)
202
g.Add(func() error {
203
req := connect.NewRequest(req.Msg)
204
for k, v := range f.config.Endpoints[i].Headers {
205
req.Header().Set(k, v)
206
}
207
for {
208
err = func() error {
209
ctx, cancel := context.WithTimeout(ctx, f.config.Endpoints[i].RemoteTimeout)
210
defer cancel()
211
212
_, err := client.Push(ctx, req)
213
return err
214
}()
215
if err == nil {
216
f.metrics.sentBytes.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(reqSize))
217
f.metrics.sentProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount))
218
break
219
}
220
level.Warn(f.opts.Logger).Log("msg", "failed to push to endpoint", "endpoint", f.config.Endpoints[i].URL, "err", err)
221
if !shouldRetry(err) {
222
break
223
}
224
backoff.Wait()
225
if !backoff.Ongoing() {
226
break
227
}
228
f.metrics.retries.WithLabelValues(f.config.Endpoints[i].URL).Inc()
229
}
230
if err != nil {
231
f.metrics.droppedBytes.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(reqSize))
232
f.metrics.droppedProfiles.WithLabelValues(f.config.Endpoints[i].URL).Add(float64(profileCount))
233
level.Warn(f.opts.Logger).Log("msg", "final error sending to profiles to endpoint", "endpoint", f.config.Endpoints[i].URL, "err", err)
234
errs = multierr.Append(errs, err)
235
}
236
return err
237
}, func(err error) {})
238
}
239
if err := g.Run(); err != nil {
240
return nil, err
241
}
242
if errs != nil {
243
return nil, errs
244
}
245
return connect.NewResponse(&pushv1.PushResponse{}), nil
246
}
247
248
func shouldRetry(err error) bool {
249
if errors.Is(err, context.DeadlineExceeded) {
250
return true
251
}
252
switch connect.CodeOf(err) {
253
case connect.CodeDeadlineExceeded, connect.CodeUnknown,
254
connect.CodeResourceExhausted, connect.CodeInternal,
255
connect.CodeUnavailable, connect.CodeDataLoss, connect.CodeAborted:
256
return true
257
}
258
return false
259
}
260
261
// requestSize returns, in this order, the total number of raw profile bytes
// and the number of samples (profiles) carried by req across all its series.
func requestSize(req *connect.Request[pushv1.PushRequest]) (int64, int64) {
	var (
		totalBytes   int64
		profileCount int64
	)
	for _, series := range req.Msg.Series {
		for _, s := range series.Samples {
			profileCount++
			totalBytes += int64(len(s.RawProfile))
		}
	}
	return totalBytes, profileCount
}
271
272
// Append implements the phlare.Appendable interface.
273
func (f *fanOutClient) Appender() phlare.Appender {
274
return f
275
}
276
277
// Append implements the Appender interface.
278
func (f *fanOutClient) Append(ctx context.Context, lbs labels.Labels, samples []*phlare.RawSample) error {
279
// todo(ctovena): we should probably pool the label pair arrays and label builder to avoid allocs.
280
var (
281
protoLabels = make([]*typesv1.LabelPair, 0, len(lbs)+len(f.config.ExternalLabels))
282
protoSamples = make([]*pushv1.RawSample, 0, len(samples))
283
lbsBuilder = labels.NewBuilder(nil)
284
)
285
286
for _, label := range lbs {
287
// only __name__ is required as a private label.
288
if strings.HasPrefix(label.Name, model.ReservedLabelPrefix) && label.Name != labels.MetricName {
289
continue
290
}
291
lbsBuilder.Set(label.Name, label.Value)
292
}
293
for name, value := range f.config.ExternalLabels {
294
lbsBuilder.Set(name, value)
295
}
296
for _, l := range lbsBuilder.Labels(lbs) {
297
protoLabels = append(protoLabels, &typesv1.LabelPair{
298
Name: l.Name,
299
Value: l.Value,
300
})
301
}
302
for _, sample := range samples {
303
protoSamples = append(protoSamples, &pushv1.RawSample{
304
RawProfile: sample.RawProfile,
305
})
306
}
307
// push to all clients
308
_, err := f.Push(ctx, connect.NewRequest(&pushv1.PushRequest{
309
Series: []*pushv1.RawProfileSeries{
310
{Labels: protoLabels, Samples: protoSamples},
311
},
312
}))
313
return err
314
}
315
316
// WithUserAgent returns a `connect.ClientOption` that sets the User-Agent header on.
317
func WithUserAgent(agent string) connect.ClientOption {
318
return connect.WithInterceptors(&agentInterceptor{agent})
319
}
320
321
// agentInterceptor is a connect interceptor that stamps a fixed User-Agent
// header on client requests.
type agentInterceptor struct {
	// agent is the User-Agent header value to set.
	agent string
}
324
325
// WrapUnary wraps next so that the User-Agent header of every unary request
// is set to the interceptor's agent before next is invoked.
func (i *agentInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
	return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
		header := req.Header()
		header.Set("User-Agent", i.agent)
		return next(ctx, req)
	})
}
331
332
// WrapStreamingClient wraps next so that the User-Agent request header of
// every streaming client connection it creates is set to the interceptor's
// agent.
func (i *agentInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
	return connect.StreamingClientFunc(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
		stream := next(ctx, spec)
		stream.RequestHeader().Set("User-Agent", i.agent)
		return stream
	})
}
339
340
// WrapStreamingHandler returns next unchanged: the interceptor only touches
// client-side requests, so handlers are passed through as they are.
func (i *agentInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
	handler := next
	return handler
}
343
344