Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/traces/remotewriteexporter/exporter.go
4096 views
1
package remotewriteexporter
2
3
import (
4
"context"
5
"fmt"
6
"strconv"
7
"strings"
8
"sync"
9
"time"
10
11
util "github.com/cortexproject/cortex/pkg/util/log"
12
"github.com/go-kit/log"
13
"github.com/go-kit/log/level"
14
"github.com/grafana/agent/pkg/metrics/instance"
15
"github.com/grafana/agent/pkg/traces/contextkeys"
16
"github.com/prometheus/prometheus/model/labels"
17
"go.opentelemetry.io/collector/component"
18
"go.opentelemetry.io/collector/consumer"
19
"go.opentelemetry.io/collector/pdata/pcommon"
20
"go.opentelemetry.io/collector/pdata/pmetric"
21
)
22
23
const (
	// nameLabelKey is the reserved Prometheus label carrying the metric name.
	nameLabelKey = "__name__"

	// Suffixes appended to the base metric name for histogram component
	// series (e.g. "<namespace>_<name>_sum"); noSuffix is used for plain
	// gauge/sum values.
	sumSuffix    = "sum"
	countSuffix  = "count"
	bucketSuffix = "bucket"
	// leStr is the label name holding a histogram bucket's upper bound.
	leStr = "le"
	// infBucket is the "le" label value of the implicit +Inf bucket.
	infBucket = "+Inf"
	noSuffix  = ""
)
32
33
// datapoint holds the most recently observed sample for a single series;
// entries live in remoteWriteExporter.seriesMap until flushed or stale.
type datapoint struct {
	ts int64         // observation timestamp, milliseconds since the Unix epoch
	v  float64       // sample value
	l  labels.Labels // full label set identifying the series
}
38
39
// remoteWriteExporter buffers the latest datapoint per series from incoming
// OTel metrics and periodically appends them to a Prometheus instance
// obtained from the agent's instance.Manager.
type remoteWriteExporter struct {
	mtx sync.Mutex // guards seriesMap, lastFlush, and the flush loop body

	close  chan struct{} // closed by Shutdown to stop appenderLoop
	closed chan struct{} // closed by appenderLoop once it has exited

	manager      instance.Manager // source of the metrics instance; set in Start
	promInstance string           // name of the instance to append samples to

	constLabels labels.Labels // labels attached to every exported series
	namespace   string        // prefix for every exported metric name

	seriesMap map[uint64]*datapoint // label-hash -> latest datapoint
	staleTime int64                 // ms; idle series older than this are dropped
	lastFlush int64                 // ms timestamp recorded at the previous flush
	loopInterval time.Duration      // period of the background flush loop

	logger log.Logger
}
58
59
func newRemoteWriteExporter(cfg *Config) (component.MetricsExporter, error) {
60
logger := log.With(util.Logger, "component", "traces remote write exporter")
61
62
ls := make(labels.Labels, 0, len(cfg.ConstLabels))
63
64
for name, value := range cfg.ConstLabels {
65
ls = append(ls, labels.Label{Name: name, Value: value})
66
}
67
68
staleTime := (15 * time.Minute).Milliseconds()
69
if cfg.StaleTime > 0 {
70
staleTime = cfg.StaleTime.Milliseconds()
71
}
72
73
loopInterval := time.Second
74
if cfg.LoopInterval > 0 {
75
loopInterval = cfg.LoopInterval
76
}
77
78
return &remoteWriteExporter{
79
mtx: sync.Mutex{},
80
close: make(chan struct{}),
81
closed: make(chan struct{}),
82
constLabels: ls,
83
namespace: cfg.Namespace,
84
promInstance: cfg.PromInstance,
85
seriesMap: make(map[uint64]*datapoint),
86
staleTime: staleTime,
87
loopInterval: loopInterval,
88
logger: logger,
89
}, nil
90
}
91
92
// Start pulls the instance.Manager that the traces subsystem stored on ctx
// and launches the background flush loop. Implements component.Component.
func (e *remoteWriteExporter) Start(ctx context.Context, _ component.Host) error {
	m, ok := ctx.Value(contextkeys.Metrics).(instance.Manager)
	if !ok || m == nil {
		return fmt.Errorf("key does not contain a InstanceManager instance")
	}
	e.manager = m

	go e.appenderLoop()
	return nil
}
103
104
func (e *remoteWriteExporter) Shutdown(ctx context.Context) error {
105
close(e.close)
106
107
select {
108
case <-e.closed:
109
return nil
110
case <-ctx.Done():
111
return ctx.Err()
112
}
113
}
114
115
// Capabilities reports that this exporter does not mutate incoming data.
func (e *remoteWriteExporter) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: false}
}
118
119
func (e *remoteWriteExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
120
select {
121
case <-e.closed:
122
return nil
123
default:
124
}
125
126
resourceMetrics := md.ResourceMetrics()
127
for i := 0; i < resourceMetrics.Len(); i++ {
128
resourceMetric := resourceMetrics.At(i)
129
scopeMetricsSlice := resourceMetric.ScopeMetrics()
130
for j := 0; j < scopeMetricsSlice.Len(); j++ {
131
metricSlice := scopeMetricsSlice.At(j).Metrics()
132
for k := 0; k < metricSlice.Len(); k++ {
133
switch metric := metricSlice.At(k); metric.Type() {
134
case pmetric.MetricTypeGauge:
135
dataPoints := metric.Sum().DataPoints()
136
if err := e.handleNumberDataPoints(metric.Name(), dataPoints); err != nil {
137
return err
138
}
139
case pmetric.MetricTypeSum:
140
if metric.Sum().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
141
continue // Only cumulative metrics are supported
142
}
143
dataPoints := metric.Sum().DataPoints()
144
if err := e.handleNumberDataPoints(metric.Name(), dataPoints); err != nil {
145
return err
146
}
147
case pmetric.MetricTypeHistogram:
148
if metric.Histogram().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
149
continue // Only cumulative metrics are supported
150
}
151
dataPoints := metric.Histogram().DataPoints()
152
e.handleHistogramDataPoints(metric.Name(), dataPoints)
153
case pmetric.MetricTypeSummary:
154
return fmt.Errorf("unsupported metric data type %s", metric.Type())
155
default:
156
return fmt.Errorf("unsupported metric data type %s", metric.Type())
157
}
158
}
159
}
160
}
161
162
return nil
163
}
164
165
// handleNumberDataPoints converts each number datapoint into a Prometheus
// sample labelled with the namespaced metric name and buffers it for the
// next flush. Returns the first conversion error encountered.
func (e *remoteWriteExporter) handleNumberDataPoints(name string, dataPoints pmetric.NumberDataPointSlice) error {
	for ix := 0; ix < dataPoints.Len(); ix++ {
		dataPoint := dataPoints.At(ix)
		lbls := e.createLabelSet(name, noSuffix, dataPoint.Attributes(), labels.Labels{})
		if err := e.appendNumberDataPoint(dataPoint, lbls); err != nil {
			// Wrap with %w (same message text as before) so callers can
			// inspect the cause with errors.Is / errors.As.
			return fmt.Errorf("failed to process datapoints %w", err)
		}
	}
	return nil
}
175
176
func (e *remoteWriteExporter) appendNumberDataPoint(dataPoint pmetric.NumberDataPoint, labels labels.Labels) error {
177
var val float64
178
switch dataPoint.ValueType() {
179
case pmetric.NumberDataPointValueTypeDouble:
180
val = dataPoint.DoubleValue()
181
case pmetric.NumberDataPointValueTypeInt:
182
val = float64(dataPoint.IntValue())
183
default:
184
return fmt.Errorf("unknown data point type: %s", dataPoint.ValueType())
185
}
186
ts := e.timestamp()
187
188
e.appendDatapointForSeries(labels, ts, val)
189
190
return nil
191
}
192
193
// handleHistogramDataPoints explodes each histogram datapoint into its
// Prometheus series: _sum, _count, one cumulative _bucket per explicit
// bound, and the implicit le="+Inf" bucket.
func (e *remoteWriteExporter) handleHistogramDataPoints(name string, dataPoints pmetric.HistogramDataPointSlice) {
	for ix := 0; ix < dataPoints.Len(); ix++ {
		dataPoint := dataPoints.At(ix)
		ts := e.timestamp()

		// Append sum value
		sumLabels := e.createLabelSet(name, sumSuffix, dataPoint.Attributes(), labels.Labels{})
		e.appendDatapointForSeries(sumLabels, ts, dataPoint.Sum())

		// Append count value
		countLabels := e.createLabelSet(name, countSuffix, dataPoint.Attributes(), labels.Labels{})
		e.appendDatapointForSeries(countLabels, ts, float64(dataPoint.Count()))

		bounds := dataPoint.ExplicitBounds()
		counts := dataPoint.BucketCounts()

		// Buckets are cumulative in Prometheus: each bucket's value is the
		// running total of counts up to and including its bound. The loop
		// index is `bi` — the original reused `ix`, shadowing the outer loop.
		var cumulativeCount uint64
		for bi := 0; bi < bounds.Len() && bi < counts.Len(); bi++ {
			cumulativeCount += counts.At(bi)
			boundStr := strconv.FormatFloat(bounds.At(bi), 'f', -1, 64)
			bucketLabels := e.createLabelSet(name, bucketSuffix, dataPoint.Attributes(), labels.Labels{{Name: leStr, Value: boundStr}})
			e.appendDatapointForSeries(bucketLabels, ts, float64(cumulativeCount))
		}

		// Add the le="+Inf" bucket (total count). BUG FIX: the original
		// indexed counts.At(counts.Len()-1) unconditionally, which panics
		// with index -1 when a datapoint carries no bucket counts.
		if counts.Len() > 0 {
			cumulativeCount += counts.At(counts.Len() - 1)
		}
		infBucketLabels := e.createLabelSet(name, bucketSuffix, dataPoint.Attributes(), labels.Labels{{Name: leStr, Value: infBucket}})
		e.appendDatapointForSeries(infBucketLabels, ts, float64(cumulativeCount))
	}
}
225
226
// appendDatapointForSeries records (ts, v) as the latest sample for the
// series identified by l. Only strictly newer timestamps replace an
// existing entry; equal or older samples are dropped.
func (e *remoteWriteExporter) appendDatapointForSeries(l labels.Labels, ts int64, v float64) {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	key := l.Hash()
	existing, found := e.seriesMap[key]
	if !found {
		e.seriesMap[key] = &datapoint{l: l, ts: ts, v: v}
		return
	}
	if ts <= existing.ts {
		// Not newer than what we already hold — ignore.
		return
	}
	existing.ts, existing.v = ts, v
}
242
243
func (e *remoteWriteExporter) appenderLoop() {
244
t := time.NewTicker(e.loopInterval)
245
246
for {
247
select {
248
case <-t.C:
249
e.mtx.Lock()
250
inst, err := e.manager.GetInstance(e.promInstance)
251
if err != nil {
252
level.Error(e.logger).Log("msg", "failed to get prom instance", "err", err)
253
continue
254
}
255
appender := inst.Appender(context.Background())
256
257
now := time.Now().UnixMilli()
258
for _, dp := range e.seriesMap {
259
// If the datapoint hasn't been updated since the last loop, don't append it
260
if dp.ts < e.lastFlush {
261
// If the datapoint is older than now - staleTime, it is stale and gets removed.
262
if now-dp.ts > e.staleTime {
263
delete(e.seriesMap, dp.l.Hash())
264
}
265
continue
266
}
267
268
if _, err := appender.Append(0, dp.l, dp.ts, dp.v); err != nil {
269
level.Error(e.logger).Log("msg", "failed to append datapoint", "err", err)
270
}
271
}
272
273
if err := appender.Commit(); err != nil {
274
level.Error(e.logger).Log("msg", "failed to commit appender", "err", err)
275
}
276
277
e.lastFlush = now
278
e.mtx.Unlock()
279
280
case <-e.close:
281
close(e.closed)
282
return
283
}
284
}
285
}
286
287
func (e *remoteWriteExporter) createLabelSet(name, suffix string, labelMap pcommon.Map, customLabels labels.Labels) labels.Labels {
288
ls := make(labels.Labels, 0, labelMap.Len()+1+len(e.constLabels)+len(customLabels))
289
// Labels from spanmetrics processor
290
labelMap.Range(func(k string, v pcommon.Value) bool {
291
ls = append(ls, labels.Label{
292
Name: strings.Replace(k, ".", "_", -1),
293
Value: v.Str(),
294
})
295
return true
296
})
297
// Metric name label
298
ls = append(ls, labels.Label{
299
Name: nameLabelKey,
300
Value: metricName(e.namespace, name, suffix),
301
})
302
// Const labels
303
ls = append(ls, e.constLabels...)
304
// Custom labels
305
ls = append(ls, customLabels...)
306
return ls
307
}
308
309
// timestamp returns the current wall-clock time in milliseconds since the
// Unix epoch — the resolution Prometheus appenders expect.
func (e *remoteWriteExporter) timestamp() int64 {
	now := time.Now()
	return now.UnixMilli()
}
312
313
// metricName joins namespace, metric, and an optional suffix with
// underscores, e.g. ("traces", "latency", "sum") -> "traces_latency_sum".
// An empty suffix yields just "<namespace>_<metric>".
func metricName(namespace, metric, suffix string) string {
	if suffix == "" {
		return fmt.Sprintf("%s_%s", namespace, metric)
	}
	return fmt.Sprintf("%s_%s_%s", namespace, metric, suffix)
}
319
320