package phlare
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
)
var NoopAppendable = AppendableFunc(func(_ context.Context, _ labels.Labels, _ []*RawSample) error { return nil })
type Appendable interface {
Appender() Appender
}
type Appender interface {
Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error
}
type RawSample struct {
RawProfile []byte
}
var _ Appendable = (*Fanout)(nil)
type Fanout struct {
mut sync.RWMutex
children []Appendable
componentID string
writeLatency prometheus.Histogram
}
func NewFanout(children []Appendable, componentID string, register prometheus.Registerer) *Fanout {
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "phlare_fanout_latency",
Help: "Write latency for sending to phlare profiles",
})
_ = register.Register(wl)
return &Fanout{
children: children,
componentID: componentID,
writeLatency: wl,
}
}
func (f *Fanout) UpdateChildren(children []Appendable) {
f.mut.Lock()
defer f.mut.Unlock()
f.children = children
}
func (f *Fanout) Children() []Appendable {
f.mut.Lock()
defer f.mut.Unlock()
return f.children
}
func (f *Fanout) Appender() Appender {
f.mut.RLock()
defer f.mut.RUnlock()
app := &appender{
children: make([]Appender, 0),
componentID: f.componentID,
writeLatency: f.writeLatency,
}
for _, x := range f.children {
if x == nil {
continue
}
app.children = append(app.children, x.Appender())
}
return app
}
var _ Appender = (*appender)(nil)
type appender struct {
children []Appender
componentID string
writeLatency prometheus.Histogram
}
func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
now := time.Now()
defer func() {
a.writeLatency.Observe(time.Since(now).Seconds())
}()
var multiErr error
for _, x := range a.children {
err := x.Append(ctx, labels, samples)
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
}
return multiErr
}
type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error
func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
return f(ctx, labels, samples)
}
func (f AppendableFunc) Appender() Appender {
return f
}