GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/exporter/prometheus/internal/convert/convert.go

// Package convert implements conversion utilities to convert between
// OpenTelemetry Collector data and Prometheus data.
//
// It follows the [OpenTelemetry Metrics Data Model] for implementing the
// conversion.
//
// [OpenTelemetry Metrics Data Model]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md
package convert

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
	"github.com/prometheus/prometheus/model/textparse"
	"github.com/prometheus/prometheus/model/value"
	"github.com/prometheus/prometheus/storage"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	semconv "go.opentelemetry.io/collector/semconv/v1.6.1"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

var (
	scopeNameLabel    = "otel_scope_name"
	scopeVersionLabel = "otel_scope_version"
)

// TODO(rfratto): Exemplars are not currently supported.

// Converter implements consumer.Metrics and converts received metrics
// into Prometheus-compatible metrics.
type Converter struct {
	log log.Logger

	optsMut sync.RWMutex
	opts    Options

	seriesCache   sync.Map // Cache of active series.
	metadataCache sync.Map // Cache of active metadata entries.

	next storage.Appendable // Location to write converted metrics.
}

// Options configure a Converter.
type Options struct {
	// IncludeTargetInfo includes the target_info metric.
	IncludeTargetInfo bool
	// IncludeScopeInfo includes the otel_scope_info metric and adds
	// otel_scope_name and otel_scope_version labels to data points.
	IncludeScopeInfo bool
}

var _ consumer.Metrics = (*Converter)(nil)

// New returns a new Converter. Converted metrics are passed to the provided
// storage.Appendable implementation.
func New(l log.Logger, next storage.Appendable, opts Options) *Converter {
	if l == nil {
		l = log.NewNopLogger()
	}
	return &Converter{log: l, next: next, opts: opts}
}
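
// Example wiring (a minimal sketch, not part of the original file; the
// logger, fanoutStorage, ctx, and metrics values are assumed to exist in
// the caller):
//
//	conv := convert.New(logger, fanoutStorage, convert.Options{
//		IncludeTargetInfo: true,
//	})
//	if err := conv.ConsumeMetrics(ctx, metrics); err != nil {
//		level.Error(logger).Log("msg", "failed to convert metrics", "err", err)
//	}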

// UpdateOptions updates the options for the Converter.
func (conv *Converter) UpdateOptions(opts Options) {
	conv.optsMut.Lock()
	defer conv.optsMut.Unlock()
	conv.opts = opts
}

// getOpts gets a copy of the current options for the Converter.
func (conv *Converter) getOpts() Options {
	conv.optsMut.RLock()
	defer conv.optsMut.RUnlock()
	return conv.opts
}

// Capabilities implements consumer.Metrics.
func (conv *Converter) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{
		MutatesData: false,
	}
}

// ConsumeMetrics converts the provided OpenTelemetry Collector-formatted
// metrics into Prometheus-compatible metrics. Each call to ConsumeMetrics
// requests a storage.Appender and will commit generated metrics to it at the
// end of the call.
//
// Metrics are tracked in memory over time. Call [*Converter.GC] to clean up
// stale metrics.
func (conv *Converter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	// NOTE(rfratto): OpenTelemetry Collector doesn't have any equivalent concept
	// of storage.SeriesRef from Prometheus. This adds some extra CPU overhead in
	// converting pmetric.Metrics to Prometheus data, since we'll always have to
	// build a key to uniquely represent each data point.
	//
	// To reduce CPU and allocations as much as possible, each datapoint is
	// tracked as an "active series." See memorySeries for information on what is
	// cached.

	app := conv.next.Appender(ctx)

	for rcount := 0; rcount < md.ResourceMetrics().Len(); rcount++ {
		rm := md.ResourceMetrics().At(rcount)
		conv.consumeResourceMetrics(app, rm)
	}

	return app.Commit()
}
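
// The uniqueness key mentioned in the NOTE above is the rendered label set:
// the getOrCreate* helpers below cache entries via
// seriesCache.LoadOrStore keyed on labels.String(). A minimal sketch of the
// equivalent lookup (label values are illustrative only):
//
//	key := labels.FromStrings(
//		model.MetricNameLabel, "http_requests_total",
//		model.JobLabel, "shop/cart",
//	).String()
//	if cached, ok := conv.seriesCache.Load(key); ok {
//		_ = cached.(*memorySeries)
//	}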

func (conv *Converter) consumeResourceMetrics(app storage.Appender, rm pmetric.ResourceMetrics) {
	resourceMD := conv.createOrUpdateMetadata("target_info", metadata.Metadata{
		Type: textparse.MetricTypeGauge,
		Help: "Target metadata",
	})
	memResource := conv.getOrCreateResource(rm.Resource())

	if conv.getOpts().IncludeTargetInfo {
		if err := resourceMD.WriteTo(app, time.Now()); err != nil {
			level.Warn(conv.log).Log("msg", "failed to write target_info metadata", "err", err)
		}
		if err := memResource.WriteTo(app, time.Now()); err != nil {
			level.Error(conv.log).Log("msg", "failed to write target_info metric", "err", err)
		}
	}

	for smcount := 0; smcount < rm.ScopeMetrics().Len(); smcount++ {
		sm := rm.ScopeMetrics().At(smcount)
		conv.consumeScopeMetrics(app, memResource, sm)
	}
}

func (conv *Converter) createOrUpdateMetadata(name string, md metadata.Metadata) *memoryMetadata {
	entry := &memoryMetadata{
		Name: name,
	}
	if actual, loaded := conv.metadataCache.LoadOrStore(name, entry); loaded {
		entry = actual.(*memoryMetadata)
	}

	entry.Update(md)
	return entry
}

// getOrCreateResource gets or creates a [*memorySeries] from the provided
// res. The LastSeen field of the *memorySeries is updated before returning.
func (conv *Converter) getOrCreateResource(res pcommon.Resource) *memorySeries {
	targetInfoLabels := labels.FromStrings(model.MetricNameLabel, "target_info")

	var (
		attrs = res.Attributes().Sort()

		jobLabel      string
		instanceLabel string
	)

	if serviceName, ok := attrs.Get(semconv.AttributeServiceName); ok {
		if serviceNamespace, ok := attrs.Get(semconv.AttributeServiceNamespace); ok {
			jobLabel = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), serviceName.AsString())
		} else {
			jobLabel = serviceName.AsString()
		}
	}

	if instanceID, ok := attrs.Get(semconv.AttributeServiceInstanceID); ok {
		instanceLabel = instanceID.AsString()
	}

	lb := labels.NewBuilder(targetInfoLabels)
	lb.Set(model.JobLabel, jobLabel)
	lb.Set(model.InstanceLabel, instanceLabel)

	attrs.Range(func(k string, v pcommon.Value) bool {
		// Skip attributes that we used for determining the job and instance
		// labels.
		if k == semconv.AttributeServiceName ||
			k == semconv.AttributeServiceNamespace ||
			k == semconv.AttributeServiceInstanceID {

			return true
		}

		lb.Set(prometheus.NormalizeLabel(k), v.AsString())
		return true
	})

	labels := lb.Labels(nil)

	entry := newMemorySeries(map[string]string{
		model.JobLabel:      jobLabel,
		model.InstanceLabel: instanceLabel,
	}, labels)
	if actual, loaded := conv.seriesCache.LoadOrStore(labels.String(), entry); loaded {
		entry = actual.(*memorySeries)
	}

	entry.SetValue(1)
	entry.Ping()
	return entry
}
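
// For example (illustrative values only): a resource with
// service.namespace="shop", service.name="cart", and
// service.instance.id="cart-0" produces the series
//
//	target_info{job="shop/cart", instance="cart-0", ...}
//
// where the remaining resource attributes become additional, normalized
// labels.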

func (conv *Converter) consumeScopeMetrics(app storage.Appender, memResource *memorySeries, sm pmetric.ScopeMetrics) {
	scopeMD := conv.createOrUpdateMetadata("otel_scope_info", metadata.Metadata{
		Type: textparse.MetricTypeGauge,
	})
	memScope := conv.getOrCreateScope(memResource, sm.Scope())

	if conv.getOpts().IncludeScopeInfo {
		if err := scopeMD.WriteTo(app, time.Now()); err != nil {
			level.Warn(conv.log).Log("msg", "failed to write otel_scope_info metadata", "err", err)
		}
		if err := memScope.WriteTo(app, time.Now()); err != nil {
			level.Error(conv.log).Log("msg", "failed to write otel_scope_info metric", "err", err)
		}
	}

	for mcount := 0; mcount < sm.Metrics().Len(); mcount++ {
		m := sm.Metrics().At(mcount)
		conv.consumeMetric(app, memResource, memScope, m)
	}
}

// getOrCreateScope gets or creates a [*memorySeries] from the provided scope.
// The LastSeen field of the *memorySeries is updated before returning.
func (conv *Converter) getOrCreateScope(res *memorySeries, scope pcommon.InstrumentationScope) *memorySeries {
	scopeInfoLabels := labels.FromStrings(
		model.MetricNameLabel, "otel_scope_info",
		model.JobLabel, res.metadata[model.JobLabel],
		model.InstanceLabel, res.metadata[model.InstanceLabel],
		"name", scope.Name(),
		"version", scope.Version(),
	)

	lb := labels.NewBuilder(scopeInfoLabels)
	scope.Attributes().Sort().Range(func(k string, v pcommon.Value) bool {
		lb.Set(prometheus.NormalizeLabel(k), v.AsString())
		return true
	})

	labels := lb.Labels(nil)

	entry := newMemorySeries(map[string]string{
		scopeNameLabel:    scope.Name(),
		scopeVersionLabel: scope.Version(),
	}, labels)
	if actual, loaded := conv.seriesCache.LoadOrStore(labels.String(), entry); loaded {
		entry = actual.(*memorySeries)
	}

	entry.SetValue(1)
	entry.Ping()
	return entry
}
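
// For example (illustrative values only): a scope named
// "go.opentelemetry.io/contrib" at version "0.40.0", with no scope
// attributes, on the resource above produces
//
//	otel_scope_info{job="shop/cart", instance="cart-0", name="go.opentelemetry.io/contrib", version="0.40.0"}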

func (conv *Converter) consumeMetric(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
	switch m.Type() {
	case pmetric.MetricTypeGauge:
		conv.consumeGauge(app, memResource, memScope, m)
	case pmetric.MetricTypeSum:
		conv.consumeSum(app, memResource, memScope, m)
	case pmetric.MetricTypeHistogram:
		conv.consumeHistogram(app, memResource, memScope, m)
	case pmetric.MetricTypeSummary:
		conv.consumeSummary(app, memResource, memScope, m)
	}
}

func (conv *Converter) consumeGauge(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
	metricName := prometheus.BuildPromCompliantName(m, "")

	metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{
		Type: textparse.MetricTypeGauge,
		Unit: m.Unit(),
		Help: m.Description(),
	})
	if err := metricMD.WriteTo(app, time.Now()); err != nil {
		level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)
	}

	for dpcount := 0; dpcount < m.Gauge().DataPoints().Len(); dpcount++ {
		dp := m.Gauge().DataPoints().At(dpcount)

		memSeries := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes())
		if err := writeSeries(app, memSeries, dp, getNumberDataPointValue(dp)); err != nil {
			level.Error(conv.log).Log("msg", "failed to write metric sample", "err", err)
		}
	}
}

type otelcolDataPoint interface {
	Timestamp() pcommon.Timestamp
	Flags() pmetric.DataPointFlags
}

func writeSeries(app storage.Appender, series *memorySeries, dp otelcolDataPoint, val float64) error {
	ts := dp.Timestamp().AsTime()
	if ts.Before(series.Timestamp()) {
		// Out-of-order; skip.
		return nil
	}
	series.SetTimestamp(ts)

	if dp.Flags().NoRecordedValue() {
		val = float64(value.StaleNaN)
	}
	series.SetValue(val)

	return series.WriteTo(app, ts)
}

// getOrCreateSeries gets or creates a [*memorySeries] from the provided
// resource, scope, metric, and attributes. The LastSeen field of the
// *memorySeries is updated before returning.
func (conv *Converter) getOrCreateSeries(res *memorySeries, scope *memorySeries, name string, attrs pcommon.Map, extraLabels ...labels.Label) *memorySeries {
	seriesBaseLabels := labels.FromStrings(
		model.MetricNameLabel, name,
		model.JobLabel, res.metadata[model.JobLabel],
		model.InstanceLabel, res.metadata[model.InstanceLabel],
	)

	lb := labels.NewBuilder(seriesBaseLabels)
	for _, extraLabel := range extraLabels {
		lb.Set(extraLabel.Name, extraLabel.Value)
	}

	if conv.getOpts().IncludeScopeInfo {
		lb.Set("otel_scope_name", scope.metadata[scopeNameLabel])
		lb.Set("otel_scope_version", scope.metadata[scopeVersionLabel])
	}

	attrs.Sort().Range(func(k string, v pcommon.Value) bool {
		lb.Set(prometheus.NormalizeLabel(k), v.AsString())
		return true
	})

	labels := lb.Labels(nil)

	entry := newMemorySeries(nil, labels)
	if actual, loaded := conv.seriesCache.LoadOrStore(labels.String(), entry); loaded {
		entry = actual.(*memorySeries)
	}

	entry.Ping()
	return entry
}

func getNumberDataPointValue(dp pmetric.NumberDataPoint) float64 {
	switch dp.ValueType() {
	case pmetric.NumberDataPointValueTypeDouble:
		return dp.DoubleValue()
	case pmetric.NumberDataPointValueTypeInt:
		return float64(dp.IntValue())
	}

	return 0
}

func (conv *Converter) consumeSum(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
	metricName := prometheus.BuildPromCompliantName(m, "")

	// Excerpt from the spec:
	//
	// * If the aggregation temporality is cumulative and the sum is monotonic,
	//   it MUST be converted to a Prometheus Counter.
	// * If the aggregation temporality is cumulative and the sum is
	//   non-monotonic, it MUST be converted to a Prometheus Gauge.
	// * If the aggregation temporality is delta and the sum is monotonic, it
	//   SHOULD be converted to a cumulative temporality and become a Prometheus
	//   Sum.
	// * Otherwise, it MUST be dropped.
	var convType textparse.MetricType
	switch {
	case m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative && m.Sum().IsMonotonic():
		convType = textparse.MetricTypeCounter
	case m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative && !m.Sum().IsMonotonic():
		convType = textparse.MetricTypeGauge
	case m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityDelta && m.Sum().IsMonotonic():
		// Drop non-cumulative sums for now, which is permitted by the spec.
		//
		// TODO(rfratto): implement delta-to-cumulative for sums.
		return
	default:
		// Drop the metric.
		return
	}

	metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{
		Type: convType,
		Unit: m.Unit(),
		Help: m.Description(),
	})
	if err := metricMD.WriteTo(app, time.Now()); err != nil {
		level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)
	}

	for dpcount := 0; dpcount < m.Sum().DataPoints().Len(); dpcount++ {
		dp := m.Sum().DataPoints().At(dpcount)

		memSeries := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes())

		val := getNumberDataPointValue(dp)
		if err := writeSeries(app, memSeries, dp, val); err != nil {
			level.Error(conv.log).Log("msg", "failed to write metric sample", "err", err)
		}
	}
}

func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
	metricName := prometheus.BuildPromCompliantName(m, "")

	if m.Histogram().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
		// Drop non-cumulative histograms for now, which is permitted by the spec.
		//
		// TODO(rfratto): implement delta-to-cumulative for histograms.
		return
	}

	metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{
		Type: textparse.MetricTypeHistogram,
		Unit: m.Unit(),
		Help: m.Description(),
	})
	if err := metricMD.WriteTo(app, time.Now()); err != nil {
		level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)
	}

	for dpcount := 0; dpcount < m.Histogram().DataPoints().Len(); dpcount++ {
		dp := m.Histogram().DataPoints().At(dpcount)

		// Sum metric
		if dp.HasSum() {
			sumMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_sum", dp.Attributes())
			sumMetricVal := dp.Sum()

			if err := writeSeries(app, sumMetric, dp, sumMetricVal); err != nil {
				level.Error(conv.log).Log("msg", "failed to write histogram sum sample", "err", err)
			}
		}

		// Count metric
		{
			countMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_count", dp.Attributes())
			countMetricVal := float64(dp.Count())

			if err := writeSeries(app, countMetric, dp, countMetricVal); err != nil {
				level.Error(conv.log).Log("msg", "failed to write histogram count sample", "err", err)
			}
		}

		// Process the boundaries. The number of buckets = number of explicit
		// bounds + 1. OTLP bucket counts are per-bucket populations, while
		// Prometheus le buckets are cumulative, so keep a running total as we
		// walk the bounds.
		var cumulativeCount uint64
		for i := 0; i < dp.ExplicitBounds().Len() && i < dp.BucketCounts().Len(); i++ {
			bound := dp.ExplicitBounds().At(i)
			cumulativeCount += dp.BucketCounts().At(i)

			bucketLabel := labels.Label{
				Name:  model.BucketLabel,
				Value: strconv.FormatFloat(bound, 'f', -1, 64),
			}

			bucket := conv.getOrCreateSeries(memResource, memScope, metricName+"_bucket", dp.Attributes(), bucketLabel)
			bucketVal := float64(cumulativeCount)

			if err := writeSeries(app, bucket, dp, bucketVal); err != nil {
				level.Error(conv.log).Log("msg", "failed to write histogram bucket sample", "bucket", bucketLabel.Value, "err", err)
			}
		}

		// Add le=+Inf bucket. All values are <= +Inf, so the value is the same as
		// the count of the datapoint.
		{
			bucketLabel := labels.Label{
				Name:  model.BucketLabel,
				Value: "+Inf",
			}

			infBucket := conv.getOrCreateSeries(memResource, memScope, metricName+"_bucket", dp.Attributes(), bucketLabel)
			infBucketVal := float64(dp.Count())

			if err := writeSeries(app, infBucket, dp, infBucketVal); err != nil {
				level.Error(conv.log).Log("msg", "failed to write histogram bucket sample", "bucket", bucketLabel.Value, "err", err)
			}
		}
	}
}
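
// For example (illustrative values only): a cumulative histogram named
// "request_duration_seconds" with explicit bounds [0.1, 1] emits the series
//
//	request_duration_seconds_sum
//	request_duration_seconds_count
//	request_duration_seconds_bucket{le="0.1"}
//	request_duration_seconds_bucket{le="1"}
//	request_duration_seconds_bucket{le="+Inf"}
//
// with each bucket value cumulative, matching Prometheus histogram
// semantics.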

func (conv *Converter) consumeSummary(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
	metricName := prometheus.BuildPromCompliantName(m, "")

	metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{
		Type: textparse.MetricTypeSummary,
		Unit: m.Unit(),
		Help: m.Description(),
	})
	if err := metricMD.WriteTo(app, time.Now()); err != nil {
		level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)
	}

	for dpcount := 0; dpcount < m.Summary().DataPoints().Len(); dpcount++ {
		dp := m.Summary().DataPoints().At(dpcount)

		// Sum metric
		{
			sumMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_sum", dp.Attributes())
			sumMetricVal := dp.Sum()

			if err := writeSeries(app, sumMetric, dp, sumMetricVal); err != nil {
				level.Error(conv.log).Log("msg", "failed to write summary sum sample", "err", err)
			}
		}

		// Count metric
		{
			countMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_count", dp.Attributes())
			countMetricVal := float64(dp.Count())

			if err := writeSeries(app, countMetric, dp, countMetricVal); err != nil {
				level.Error(conv.log).Log("msg", "failed to write summary count sample", "err", err)
			}
		}

		// Quantiles
		for i := 0; i < dp.QuantileValues().Len(); i++ {
			qp := dp.QuantileValues().At(i)

			quantileLabel := labels.Label{
				Name:  model.QuantileLabel,
				Value: strconv.FormatFloat(qp.Quantile(), 'f', -1, 64),
			}

			quantile := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes(), quantileLabel)
			quantileVal := qp.Value()

			if err := writeSeries(app, quantile, dp, quantileVal); err != nil {
				level.Error(conv.log).Log("msg", "failed to write summary quantile sample", "quantile", quantileLabel.Value, "err", err)
			}
		}
	}
}
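
// For example (illustrative values only): a summary named
// "rpc_latency_seconds" exporting the 0.5 and 0.99 quantiles emits
//
//	rpc_latency_seconds_sum
//	rpc_latency_seconds_count
//	rpc_latency_seconds{quantile="0.5"}
//	rpc_latency_seconds{quantile="0.99"}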

// GC cleans up stale metrics which have not been updated in the time specified
// by staleTime.
func (conv *Converter) GC(staleTime time.Duration) {
	now := time.Now()

	// In the code below, we use TryLock as a small performance optimization.
	//
	// The garbage collector doesn't bother to wait for locks for anything in the
	// cache; the lock being unavailable implies that the cached resource is
	// still active.

	conv.seriesCache.Range(func(key, value any) bool {
		series := value.(*memorySeries)
		if !series.TryLock() {
			return true
		}
		defer series.Unlock()

		if now.Sub(series.lastSeen) > staleTime {
			conv.seriesCache.Delete(key)
		}
		return true
	})

	conv.metadataCache.Range(func(key, value any) bool {
		entry := value.(*memoryMetadata)
		if !entry.TryLock() {
			return true
		}
		defer entry.Unlock()

		if now.Sub(entry.lastSeen) > staleTime {
			// Delete from the metadata cache; this loop walks metadata entries,
			// not series.
			conv.metadataCache.Delete(key)
		}
		return true
	})
}
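
// A typical caller runs GC on a timer (a minimal sketch; the intervals are
// assumptions, not defaults from this package):
//
//	go func() {
//		for range time.Tick(5 * time.Minute) {
//			conv.GC(15 * time.Minute)
//		}
//	}()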

// FlushMetadata empties out the metadata cache, forcing metadata to get
// rewritten.
func (conv *Converter) FlushMetadata() {
	// TODO(rfratto): this is fairly inefficient since it'll require rebuilding
	// all of the metadata for every active series. However, it's the easiest
	// thing to do for now.
	conv.metadataCache.Range(func(key, _ any) bool {
		conv.metadataCache.Delete(key)
		return true
	})
}