Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/prometheus/interceptor.go
4093 views
1
package prometheus
2
3
import (
4
"context"
5
6
"github.com/prometheus/prometheus/model/exemplar"
7
"github.com/prometheus/prometheus/model/histogram"
8
"github.com/prometheus/prometheus/model/labels"
9
"github.com/prometheus/prometheus/model/metadata"
10
"github.com/prometheus/prometheus/storage"
11
)
12
13
// Interceptor is a storage.Appendable which invokes callback functions upon
14
// getting data. Interceptor should not be modified once created. All callback
15
// fields are optional.
16
type Interceptor struct {
17
onAppend func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error)
18
onAppendExemplar func(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error)
19
onUpdateMetadata func(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error)
20
onAppendHistogram func(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error)
21
22
// next is the next appendable to pass in the chain.
23
next storage.Appendable
24
}
25
26
// Compile-time assertion that *Interceptor implements storage.Appendable.
var _ storage.Appendable = (*Interceptor)(nil)
27
28
// NewInterceptor creates a new Interceptor storage.Appendable. Options can be
29
// provided to NewInterceptor to install custom hooks for different methods.
30
func NewInterceptor(next storage.Appendable, opts ...InterceptorOption) *Interceptor {
31
i := &Interceptor{next: next}
32
for _, opt := range opts {
33
opt(i)
34
}
35
return i
36
}
37
38
// InterceptorOption is an option argument passed to NewInterceptor.
39
type InterceptorOption func(*Interceptor)
40
41
// WithAppendHook returns an InterceptorOption which hooks into calls to
42
// Append.
43
func WithAppendHook(f func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error)) InterceptorOption {
44
return func(i *Interceptor) {
45
i.onAppend = f
46
}
47
}
48
49
// WithExemplarHook returns an InterceptorOption which hooks into calls to
50
// AppendExemplar.
51
func WithExemplarHook(f func(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error)) InterceptorOption {
52
return func(i *Interceptor) {
53
i.onAppendExemplar = f
54
}
55
}
56
57
// WithMetadataHook returns an InterceptorOption which hooks into calls to
58
// UpdateMetadata.
59
func WithMetadataHook(f func(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error)) InterceptorOption {
60
return func(i *Interceptor) {
61
i.onUpdateMetadata = f
62
}
63
}
64
65
// WithAppendHistogram returns an InterceptorOption which hooks into calls to
66
// AppendHistogram.
67
func WithAppendHistogram(f func(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error)) InterceptorOption {
68
return func(i *Interceptor) {
69
i.onAppendHistogram = f
70
}
71
}
72
73
// Appender satisfies the Appendable interface.
74
func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
75
app := &interceptappender{
76
interceptor: f,
77
}
78
if f.next != nil {
79
app.child = f.next.Appender(ctx)
80
}
81
return app
82
}
83
84
type interceptappender struct {
85
interceptor *Interceptor
86
child storage.Appender
87
}
88
89
// Compile-time assertion that *interceptappender implements storage.Appender.
var _ storage.Appender = (*interceptappender)(nil)
90
91
// Append satisfies the Appender interface.
92
func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
93
if ref == 0 {
94
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
95
}
96
97
if a.interceptor.onAppend != nil {
98
return a.interceptor.onAppend(ref, l, t, v, a.child)
99
}
100
return a.child.Append(ref, l, t, v)
101
}
102
103
// Commit satisfies the Appender interface.
104
func (a *interceptappender) Commit() error {
105
if a.child == nil {
106
return nil
107
}
108
return a.child.Commit()
109
}
110
111
// Rollback satisfies the Appender interface.
112
func (a *interceptappender) Rollback() error {
113
if a.child == nil {
114
return nil
115
}
116
return a.child.Rollback()
117
}
118
119
// AppendExemplar satisfies the Appender interface.
120
func (a *interceptappender) AppendExemplar(
121
ref storage.SeriesRef,
122
l labels.Labels,
123
e exemplar.Exemplar,
124
) (storage.SeriesRef, error) {
125
126
if ref == 0 {
127
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
128
}
129
130
if a.interceptor.onAppendExemplar != nil {
131
return a.interceptor.onAppendExemplar(ref, l, e, a.child)
132
}
133
return a.child.AppendExemplar(ref, l, e)
134
}
135
136
// UpdateMetadata satisfies the Appender interface.
137
func (a *interceptappender) UpdateMetadata(
138
ref storage.SeriesRef,
139
l labels.Labels,
140
m metadata.Metadata,
141
) (storage.SeriesRef, error) {
142
143
if ref == 0 {
144
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
145
}
146
147
if a.interceptor.onUpdateMetadata != nil {
148
return a.interceptor.onUpdateMetadata(ref, l, m, a.child)
149
}
150
return a.child.UpdateMetadata(ref, l, m)
151
}
152
153
func (a *interceptappender) AppendHistogram(
154
ref storage.SeriesRef,
155
l labels.Labels,
156
t int64,
157
h *histogram.Histogram,
158
fh *histogram.FloatHistogram,
159
) (storage.SeriesRef, error) {
160
161
if ref == 0 {
162
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
163
}
164
165
if a.interceptor.onAppendHistogram != nil {
166
return a.interceptor.onAppendHistogram(ref, l, t, h, fh, a.child)
167
}
168
return a.child.AppendHistogram(ref, l, t, h, fh)
169
}
170
171