Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/phlare/appender.go
4093 views
1
package phlare
2
3
import (
4
"context"
5
"sync"
6
"time"
7
8
"github.com/hashicorp/go-multierror"
9
"github.com/prometheus/client_golang/prometheus"
10
"github.com/prometheus/prometheus/model/labels"
11
)
12
13
// NoopAppendable is an Appendable whose appender ignores its arguments and
// always returns a nil error.
var NoopAppendable = AppendableFunc(
	func(_ context.Context, _ labels.Labels, _ []*RawSample) error {
		return nil
	},
)
14
15
type Appendable interface {
16
Appender() Appender
17
}
18
19
type Appender interface {
20
Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error
21
}
22
23
// RawSample carries a single profile in its serialized form.
type RawSample struct {
	// RawProfile is the set of bytes of the pprof profile.
	RawProfile []byte
}
27
28
var _ Appendable = (*Fanout)(nil)
29
30
// Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
31
type Fanout struct {
32
mut sync.RWMutex
33
// children is where to fan out.
34
children []Appendable
35
// ComponentID is what component this belongs to.
36
componentID string
37
writeLatency prometheus.Histogram
38
}
39
40
// NewFanout creates a fanout appendable.
41
func NewFanout(children []Appendable, componentID string, register prometheus.Registerer) *Fanout {
42
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
43
Name: "phlare_fanout_latency",
44
Help: "Write latency for sending to phlare profiles",
45
})
46
_ = register.Register(wl)
47
return &Fanout{
48
children: children,
49
componentID: componentID,
50
writeLatency: wl,
51
}
52
}
53
54
// UpdateChildren allows changing of the children of the fanout.
55
func (f *Fanout) UpdateChildren(children []Appendable) {
56
f.mut.Lock()
57
defer f.mut.Unlock()
58
f.children = children
59
}
60
61
// Children returns the children of the fanout.
62
func (f *Fanout) Children() []Appendable {
63
f.mut.Lock()
64
defer f.mut.Unlock()
65
return f.children
66
}
67
68
// Appender satisfies the Appendable interface.
69
func (f *Fanout) Appender() Appender {
70
f.mut.RLock()
71
defer f.mut.RUnlock()
72
73
app := &appender{
74
children: make([]Appender, 0),
75
componentID: f.componentID,
76
writeLatency: f.writeLatency,
77
}
78
for _, x := range f.children {
79
if x == nil {
80
continue
81
}
82
app.children = append(app.children, x.Appender())
83
}
84
return app
85
}
86
87
var _ Appender = (*appender)(nil)
88
89
type appender struct {
90
children []Appender
91
componentID string
92
writeLatency prometheus.Histogram
93
}
94
95
// Append satisfies the Appender interface.
96
func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
97
now := time.Now()
98
defer func() {
99
a.writeLatency.Observe(time.Since(now).Seconds())
100
}()
101
var multiErr error
102
for _, x := range a.children {
103
err := x.Append(ctx, labels, samples)
104
if err != nil {
105
multiErr = multierror.Append(multiErr, err)
106
}
107
}
108
return multiErr
109
}
110
111
type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error
112
113
func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
114
return f(ctx, labels, samples)
115
}
116
117
func (f AppendableFunc) Appender() Appender {
118
return f
119
}
120
121