Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/fanout.go
4093 views
1
package prometheus
2
3
import (
4
"context"
5
"sync"
6
"time"
7
8
"github.com/prometheus/client_golang/prometheus"
9
10
"github.com/hashicorp/go-multierror"
11
12
"github.com/prometheus/prometheus/model/exemplar"
13
"github.com/prometheus/prometheus/model/histogram"
14
"github.com/prometheus/prometheus/model/labels"
15
"github.com/prometheus/prometheus/model/metadata"
16
"github.com/prometheus/prometheus/scrape"
17
18
"github.com/prometheus/prometheus/storage"
19
)
20
21
var _ storage.Appendable = (*Fanout)(nil)
22
23
// Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
24
type Fanout struct {
25
mut sync.RWMutex
26
// children is where to fan out.
27
children []storage.Appendable
28
// ComponentID is what component this belongs to.
29
componentID string
30
writeLatency prometheus.Histogram
31
samplesCounter prometheus.Counter
32
}
33
34
// NewFanout creates a fanout appendable.
35
func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer) *Fanout {
36
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
37
Name: "agent_prometheus_fanout_latency",
38
Help: "Write latency for sending to direct and indirect components",
39
})
40
_ = register.Register(wl)
41
42
s := prometheus.NewCounter(prometheus.CounterOpts{
43
Name: "agent_prometheus_forwarded_samples_total",
44
Help: "Total number of samples sent to downstream components.",
45
})
46
_ = register.Register(s)
47
48
return &Fanout{
49
children: children,
50
componentID: componentID,
51
writeLatency: wl,
52
samplesCounter: s,
53
}
54
}
55
56
// UpdateChildren allows changing of the children of the fanout.
57
func (f *Fanout) UpdateChildren(children []storage.Appendable) {
58
f.mut.Lock()
59
defer f.mut.Unlock()
60
f.children = children
61
}
62
63
// Appender satisfies the Appendable interface.
64
func (f *Fanout) Appender(ctx context.Context) storage.Appender {
65
f.mut.RLock()
66
defer f.mut.RUnlock()
67
68
// TODO(@tpaschalis): The `otelcol.receiver.prometheus` component reuses
69
// code from the prometheusreceiver which expects the Appender context to
70
// be contain both a scrape target and a metadata store, and fails the
71
// conversion if they are missing. We should find a way around this as both
72
// Targets and Metadata will be handled in a different way in Flow.
73
ctx = scrape.ContextWithTarget(ctx, &scrape.Target{})
74
ctx = scrape.ContextWithMetricMetadataStore(ctx, NoopMetadataStore{})
75
76
app := &appender{
77
children: make([]storage.Appender, 0),
78
componentID: f.componentID,
79
writeLatency: f.writeLatency,
80
samplesCounter: f.samplesCounter,
81
}
82
83
for _, x := range f.children {
84
if x == nil {
85
continue
86
}
87
app.children = append(app.children, x.Appender(ctx))
88
}
89
return app
90
}
91
92
type appender struct {
93
children []storage.Appender
94
componentID string
95
writeLatency prometheus.Histogram
96
samplesCounter prometheus.Counter
97
start time.Time
98
}
99
100
var _ storage.Appender = (*appender)(nil)
101
102
// Append satisfies the Appender interface.
103
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
104
if a.start.IsZero() {
105
a.start = time.Now()
106
}
107
if ref == 0 {
108
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
109
}
110
var multiErr error
111
updated := false
112
for _, x := range a.children {
113
_, err := x.Append(ref, l, t, v)
114
if err != nil {
115
multiErr = multierror.Append(multiErr, err)
116
} else {
117
updated = true
118
}
119
}
120
if updated {
121
a.samplesCounter.Inc()
122
}
123
return ref, multiErr
124
}
125
126
// Commit satisfies the Appender interface.
127
func (a *appender) Commit() error {
128
defer a.recordLatency()
129
var multiErr error
130
for _, x := range a.children {
131
err := x.Commit()
132
if err != nil {
133
multiErr = multierror.Append(multiErr, err)
134
}
135
}
136
return multiErr
137
}
138
139
// Rollback satisfies the Appender interface.
140
func (a *appender) Rollback() error {
141
defer a.recordLatency()
142
var multiErr error
143
for _, x := range a.children {
144
err := x.Rollback()
145
if err != nil {
146
multiErr = multierror.Append(multiErr, err)
147
}
148
}
149
return multiErr
150
}
151
152
func (a *appender) recordLatency() {
153
if a.start.IsZero() {
154
return
155
}
156
duration := time.Since(a.start)
157
a.writeLatency.Observe(duration.Seconds())
158
}
159
160
// AppendExemplar satisfies the Appender interface.
161
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
162
if a.start.IsZero() {
163
a.start = time.Now()
164
}
165
if ref == 0 {
166
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
167
}
168
var multiErr error
169
for _, x := range a.children {
170
_, err := x.AppendExemplar(ref, l, e)
171
if err != nil {
172
multiErr = multierror.Append(multiErr, err)
173
}
174
}
175
return ref, multiErr
176
}
177
178
// UpdateMetadata satisfies the Appender interface.
179
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
180
if a.start.IsZero() {
181
a.start = time.Now()
182
}
183
if ref == 0 {
184
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
185
}
186
var multiErr error
187
for _, x := range a.children {
188
_, err := x.UpdateMetadata(ref, l, m)
189
if err != nil {
190
multiErr = multierror.Append(multiErr, err)
191
}
192
}
193
return ref, multiErr
194
}
195
196
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
197
if a.start.IsZero() {
198
a.start = time.Now()
199
}
200
if ref == 0 {
201
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
202
}
203
var multiErr error
204
for _, x := range a.children {
205
_, err := x.AppendHistogram(ref, l, t, h, fh)
206
if err != nil {
207
multiErr = multierror.Append(multiErr, err)
208
}
209
}
210
return ref, multiErr
211
}
212
213
// NoopMetadataStore implements the MetricMetadataStore interface.
214
type NoopMetadataStore map[string]scrape.MetricMetadata
215
216
// GetMetadata implements the MetricMetadataStore interface.
217
func (ms NoopMetadataStore) GetMetadata(familyName string) (scrape.MetricMetadata, bool) {
218
return scrape.MetricMetadata{}, false
219
}
220
221
// ListMetadata implements the MetricMetadataStore interface.
222
func (ms NoopMetadataStore) ListMetadata() []scrape.MetricMetadata { return nil }
223
224
// SizeMetadata implements the MetricMetadataStore interface.
225
func (ms NoopMetadataStore) SizeMetadata() int { return 0 }
226
227
// LengthMetadata implements the MetricMetadataStore interface.
228
func (ms NoopMetadataStore) LengthMetadata() int { return 0 }
229
230