Path: blob/main/component/otelcol/exporter/prometheus/internal/convert/convert.go
4100 views
// Package convert implements conversion utilities to convert between1// OpenTelemetry Collector data and Prometheus data.2//3// It follows the [OpenTelemetry Metrics Data Model] for implementing the4// conversion.5//6// [OpenTelemetry Metrics Data Model]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md7package convert89import (10"context"11"fmt"12"strconv"13"sync"14"time"1516"github.com/go-kit/log"17"github.com/go-kit/log/level"18"github.com/prometheus/common/model"19"github.com/prometheus/prometheus/model/labels"20"github.com/prometheus/prometheus/model/metadata"21"github.com/prometheus/prometheus/model/textparse"22"github.com/prometheus/prometheus/model/value"23"github.com/prometheus/prometheus/storage"24"go.opentelemetry.io/collector/consumer"25"go.opentelemetry.io/collector/pdata/pcommon"26"go.opentelemetry.io/collector/pdata/pmetric"27semconv "go.opentelemetry.io/collector/semconv/v1.6.1"2829"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"30)3132var (33scopeNameLabel = "otel_scope_name"34scopeVersionLabel = "otel_scope_version"35)3637// TODO(rfratto): Exemplars are not currently supported.3839// Converter implements consumer.Metrics and converts received metrics40// into Prometheus-compatible metrics.41type Converter struct {42log log.Logger4344optsMut sync.RWMutex45opts Options4647seriesCache sync.Map // Cache of active series.48metadataCache sync.Map // Cache of active metadata entries.4950next storage.Appendable // Location to write converted metrics.51}5253// Options configure a Converter.54type Options struct {55// IncludeTargetInfo includes the target_info metric.56IncludeTargetInfo bool57// IncludeScopeInfo includes the otel_scope_info metric and adds58// otel_scope_name and otel_scope_version labels to data points.59IncludeScopeInfo bool60}6162var _ consumer.Metrics = (*Converter)(nil)6364// New returns a new Converter. Converted metrics are passed to the provided65// storage.Appendable implementation.66func New(l log.Logger, next storage.Appendable, opts Options) *Converter {67if l == nil {68l = log.NewNopLogger()69}70return &Converter{log: l, next: next, opts: opts}71}7273// UpdateOptions updates the options for the Converter.74func (conv *Converter) UpdateOptions(opts Options) {75conv.optsMut.Lock()76defer conv.optsMut.Unlock()77conv.opts = opts78}7980// getOpts gets a copy of the current options for the Converter.81func (conv *Converter) getOpts() Options {82conv.optsMut.RLock()83defer conv.optsMut.RUnlock()84return conv.opts85}8687// Capabilities implements consumer.Metrics.88func (conv *Converter) Capabilities() consumer.Capabilities {89return consumer.Capabilities{90MutatesData: false,91}92}9394// ConsumeMetrics converts the provided OpenTelemetry Collector-formatted95// metrics into Prometheus-compatible metrics. Each call to ConsumeMetrics96// requests a storage.Appender and will commit generated metrics to it at the97// end of the call.98//99// Metrics are tracked in memory over time. Call [*Converter.GC] to clean up100// stale metrics.101func (conv *Converter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {102// NOTE(rfratto): OpenTelemetry Collector doesn't have any equivalent concept103// of storage.SeriesRef from Prometheus. This adds some extra CPU overhead in104// converting pmetric.Metrics to Prometheus data, since we'll always have to105// build a key to uniquely represent each data point.106//107// To reduce CPU and allocations as much as possible, each datapoint is108// tracked as an "active series." See memorySeries for information on what is109// cached.110111app := conv.next.Appender(ctx)112113for rcount := 0; rcount < md.ResourceMetrics().Len(); rcount++ {114rm := md.ResourceMetrics().At(rcount)115conv.consumeResourceMetrics(app, rm)116}117118return app.Commit()119}120121func (conv *Converter) consumeResourceMetrics(app storage.Appender, rm pmetric.ResourceMetrics) {122resourceMD := conv.createOrUpdateMetadata("target_info", metadata.Metadata{123Type: textparse.MetricTypeGauge,124Help: "Target metadata",125})126memResource := conv.getOrCreateResource(rm.Resource())127128if conv.getOpts().IncludeTargetInfo {129if err := resourceMD.WriteTo(app, time.Now()); err != nil {130level.Warn(conv.log).Log("msg", "failed to write target_info metadata", "err", err)131}132if err := memResource.WriteTo(app, time.Now()); err != nil {133level.Error(conv.log).Log("msg", "failed to write target_info metric", "err", err)134}135}136137for smcount := 0; smcount < rm.ScopeMetrics().Len(); smcount++ {138sm := rm.ScopeMetrics().At(smcount)139conv.consumeScopeMetrics(app, memResource, sm)140}141}142143func (conv *Converter) createOrUpdateMetadata(name string, md metadata.Metadata) *memoryMetadata {144entry := &memoryMetadata{145Name: name,146}147if actual, loaded := conv.metadataCache.LoadOrStore(name, entry); loaded {148entry = actual.(*memoryMetadata)149}150151entry.Update(md)152return entry153}154155// getOrCreateResource gets or creates a [*memorySeries] from the provided156// res. The LastSeen field of the *memorySeries is updated before returning.157func (conv *Converter) getOrCreateResource(res pcommon.Resource) *memorySeries {158targetInfoLabels := labels.FromStrings(model.MetricNameLabel, "target_info")159160var (161attrs = res.Attributes().Sort()162163jobLabel string164instanceLabel string165)166167if serviceName, ok := attrs.Get(semconv.AttributeServiceName); ok {168if serviceNamespace, ok := attrs.Get(semconv.AttributeServiceNamespace); ok {169jobLabel = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), serviceName.AsString())170} else {171jobLabel = serviceName.AsString()172}173}174175if instanceID, ok := attrs.Get(semconv.AttributeServiceInstanceID); ok {176instanceLabel = instanceID.AsString()177}178179lb := labels.NewBuilder(targetInfoLabels)180lb.Set(model.JobLabel, jobLabel)181lb.Set(model.InstanceLabel, instanceLabel)182183attrs.Range(func(k string, v pcommon.Value) bool {184// Skip attributes that we used for determining the job and instance185// labels.186if k == semconv.AttributeServiceName ||187k == semconv.AttributeServiceNamespace ||188k == semconv.AttributeServiceInstanceID {189190return true191}192193lb.Set(prometheus.NormalizeLabel(k), v.AsString())194return true195})196197labels := lb.Labels(nil)198199entry := newMemorySeries(map[string]string{200model.JobLabel: jobLabel,201model.InstanceLabel: instanceLabel,202}, labels)203if actual, loaded := conv.seriesCache.LoadOrStore(labels.String(), entry); loaded {204entry = actual.(*memorySeries)205}206207entry.SetValue(1)208entry.Ping()209return entry210}211212func (conv *Converter) consumeScopeMetrics(app storage.Appender, memResource *memorySeries, sm pmetric.ScopeMetrics) {213scopeMD := conv.createOrUpdateMetadata("otel_scope_info", metadata.Metadata{214Type: textparse.MetricTypeGauge,215})216memScope := conv.getOrCreateScope(memResource, sm.Scope())217218if conv.getOpts().IncludeScopeInfo {219if err := scopeMD.WriteTo(app, time.Now()); err != nil {220level.Warn(conv.log).Log("msg", "failed to write otel_scope_info metadata", "err", err)221}222if err := memScope.WriteTo(app, time.Now()); err != nil {223level.Error(conv.log).Log("msg", "failed to write otel_scope_info metric", "err", err)224}225}226227for mcount := 0; mcount < sm.Metrics().Len(); mcount++ {228m := sm.Metrics().At(mcount)229conv.consumeMetric(app, memResource, memScope, m)230}231}232233// getOrCreateScope gets or creates a [*memorySeries] from the provided scope.234// The LastSeen field of the *memorySeries is updated before returning.235func (conv *Converter) getOrCreateScope(res *memorySeries, scope pcommon.InstrumentationScope) *memorySeries {236scopeInfoLabels := labels.FromStrings(237model.MetricNameLabel, "otel_scope_info",238model.JobLabel, res.metadata[model.JobLabel],239model.InstanceLabel, res.metadata[model.InstanceLabel],240"name", scope.Name(),241"version", scope.Version(),242)243244lb := labels.NewBuilder(scopeInfoLabels)245scope.Attributes().Sort().Range(func(k string, v pcommon.Value) bool {246lb.Set(prometheus.NormalizeLabel(k), v.AsString())247return true248})249250labels := lb.Labels(nil)251252entry := newMemorySeries(map[string]string{253scopeNameLabel: scope.Name(),254scopeVersionLabel: scope.Version(),255}, labels)256if actual, loaded := conv.seriesCache.LoadOrStore(labels.String(), entry); loaded {257entry = actual.(*memorySeries)258}259260entry.SetValue(1)261entry.Ping()262return entry263}264265func (conv *Converter) consumeMetric(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {266switch m.Type() {267case pmetric.MetricTypeGauge:268conv.consumeGauge(app, memResource, memScope, m)269case pmetric.MetricTypeSum:270conv.consumeSum(app, memResource, memScope, m)271case pmetric.MetricTypeHistogram:272conv.consumeHistogram(app, memResource, memScope, m)273case pmetric.MetricTypeSummary:274conv.consumeSummary(app, memResource, memScope, m)275}276}277278func (conv *Converter) consumeGauge(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {279metricName := prometheus.BuildPromCompliantName(m, "")280281metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{282Type: textparse.MetricTypeGauge,283Unit: m.Unit(),284Help: m.Description(),285})286if err := metricMD.WriteTo(app, time.Now()); err != nil {287level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)288}289290for dpcount := 0; dpcount < m.Gauge().DataPoints().Len(); dpcount++ {291dp := m.Gauge().DataPoints().At(dpcount)292293memSeries := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes())294if err := writeSeries(app, memSeries, dp, getNumberDataPointValue(dp)); err != nil {295level.Error(conv.log).Log("msg", "failed to write metric sample", "err", err)296}297}298}299300type otelcolDataPoint interface {301Timestamp() pcommon.Timestamp302Flags() pmetric.DataPointFlags303}304305func writeSeries(app storage.Appender, series *memorySeries, dp otelcolDataPoint, val float64) error {306ts := dp.Timestamp().AsTime()307if ts.Before(series.Timestamp()) {308// Out-of-order; skip.309return nil310}311series.SetTimestamp(ts)312313if dp.Flags().NoRecordedValue() {314val = float64(value.StaleNaN)315}316series.SetValue(val)317318return series.WriteTo(app, ts)319}320321// getOrCreateSeries gets or creates a [*memorySeries] from the provided322// resource, scope, metric, and attributes. The LastSeen field of the323// *memorySeries is updated before returning.324func (conv *Converter) getOrCreateSeries(res *memorySeries, scope *memorySeries, name string, attrs pcommon.Map, extraLabels ...labels.Label) *memorySeries {325seriesBaseLabels := labels.FromStrings(326model.MetricNameLabel, name,327model.JobLabel, res.metadata[model.JobLabel],328model.InstanceLabel, res.metadata[model.InstanceLabel],329)330331lb := labels.NewBuilder(seriesBaseLabels)332for _, extraLabel := range extraLabels {333lb.Set(extraLabel.Name, extraLabel.Value)334}335336if conv.getOpts().IncludeScopeInfo {337lb.Set("otel_scope_name", scope.metadata[scopeNameLabel])338lb.Set("otel_scope_version", scope.metadata[scopeVersionLabel])339}340341attrs.Sort().Range(func(k string, v pcommon.Value) bool {342lb.Set(prometheus.NormalizeLabel(k), v.AsString())343return true344})345346labels := lb.Labels(nil)347348entry := newMemorySeries(nil, labels)349if actual, loaded := conv.seriesCache.LoadOrStore(labels.String(), entry); loaded {350entry = actual.(*memorySeries)351}352353entry.Ping()354return entry355}356357func getNumberDataPointValue(dp pmetric.NumberDataPoint) float64 {358switch dp.ValueType() {359case pmetric.NumberDataPointValueTypeDouble:360return dp.DoubleValue()361case pmetric.NumberDataPointValueTypeInt:362return float64(dp.IntValue())363}364365return 0366}367368func (conv *Converter) consumeSum(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {369metricName := prometheus.BuildPromCompliantName(m, "")370371// Excerpt from the spec:372//373// * If the aggregation temporarlity is cumulative and sum is monotonic, it374// MUST be converted to a Prometheus Counter.375// * If the aggregation temporarlity is cumulative and sum is non-monotonic,376// it MUST be converted to a Prometheus Gauge.377// * If the aggregation temporarlity is delta and the sum is monotonic, it378// SHOULD be converted to a cumulative temporarlity and become a Prometheus379// Sum.380// * Otherwise, it MUST be dropped.381var convType textparse.MetricType382switch {383case m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative && m.Sum().IsMonotonic():384convType = textparse.MetricTypeCounter385case m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative && !m.Sum().IsMonotonic():386convType = textparse.MetricTypeGauge387case m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityDelta && m.Sum().IsMonotonic():388// Drop non-cumulative summaries for now, which is permitted by the spec.389//390// TODO(rfratto): implement delta-to-cumulative for sums.391return392default:393// Drop the metric.394return395}396397metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{398Type: convType,399Unit: m.Unit(),400Help: m.Description(),401})402if err := metricMD.WriteTo(app, time.Now()); err != nil {403level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)404}405406for dpcount := 0; dpcount < m.Sum().DataPoints().Len(); dpcount++ {407dp := m.Sum().DataPoints().At(dpcount)408409memSeries := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes())410411val := getNumberDataPointValue(dp)412if err := writeSeries(app, memSeries, dp, val); err != nil {413level.Error(conv.log).Log("msg", "failed to write metric sample", "err", err)414}415}416}417418func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {419metricName := prometheus.BuildPromCompliantName(m, "")420421if m.Histogram().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {422// Drop non-cumulative histograms for now, which is permitted by the spec.423//424// TODO(rfratto): implement delta-to-cumulative for histograms.425return426}427428metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{429Type: textparse.MetricTypeHistogram,430Unit: m.Unit(),431Help: m.Description(),432})433if err := metricMD.WriteTo(app, time.Now()); err != nil {434level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)435}436437for dpcount := 0; dpcount < m.Histogram().DataPoints().Len(); dpcount++ {438dp := m.Histogram().DataPoints().At(dpcount)439440// Sum metric441if dp.HasSum() {442sumMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_sum", dp.Attributes())443sumMetricVal := dp.Sum()444445if err := writeSeries(app, sumMetric, dp, sumMetricVal); err != nil {446level.Error(conv.log).Log("msg", "failed to write histogram sum sample", "err", err)447}448}449450// Count metric451{452countMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_count", dp.Attributes())453countMetricVal := float64(dp.Count())454455if err := writeSeries(app, countMetric, dp, countMetricVal); err != nil {456level.Error(conv.log).Log("msg", "failed to write histogram count sample", "err", err)457}458}459460// Process the boundaries. The number of buckets = number of explicit461// bounds + 1.462for i := 0; i < dp.ExplicitBounds().Len() && i < dp.BucketCounts().Len(); i++ {463bound := dp.ExplicitBounds().At(i)464count := dp.BucketCounts().At(i)465466bucketLabel := labels.Label{467Name: model.BucketLabel,468Value: strconv.FormatFloat(bound, 'f', -1, 64),469}470471bucket := conv.getOrCreateSeries(memResource, memScope, metricName+"_bucket", dp.Attributes(), bucketLabel)472bucketVal := float64(count)473474if err := writeSeries(app, bucket, dp, bucketVal); err != nil {475level.Error(conv.log).Log("msg", "failed to write histogram bucket sample", "bucket", bucketLabel.Value, "err", err)476}477}478479// Add le=+Inf bucket. All values are <= +Inf, so the value is the same as480// the count of the datapoint.481{482bucketLabel := labels.Label{483Name: model.BucketLabel,484Value: "+Inf",485}486487infBucket := conv.getOrCreateSeries(memResource, memScope, metricName+"_bucket", dp.Attributes(), bucketLabel)488infBucketVal := float64(dp.Count())489490if err := writeSeries(app, infBucket, dp, infBucketVal); err != nil {491level.Error(conv.log).Log("msg", "failed to write histogram bucket sample", "bucket", bucketLabel.Value, "err", err)492}493}494}495}496497func (conv *Converter) consumeSummary(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {498metricName := prometheus.BuildPromCompliantName(m, "")499500metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{501Type: textparse.MetricTypeSummary,502Unit: m.Unit(),503Help: m.Description(),504})505if err := metricMD.WriteTo(app, time.Now()); err != nil {506level.Warn(conv.log).Log("msg", "failed to write metric family metadata", "err", err)507}508509for dpcount := 0; dpcount < m.Summary().DataPoints().Len(); dpcount++ {510dp := m.Summary().DataPoints().At(dpcount)511512// Sum metric513{514sumMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_sum", dp.Attributes())515sumMetricVal := dp.Sum()516517if err := writeSeries(app, sumMetric, dp, sumMetricVal); err != nil {518level.Error(conv.log).Log("msg", "failed to write summary sum sample", "err", err)519}520}521522// Count metric523{524countMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_count", dp.Attributes())525countMetricVal := float64(dp.Count())526527if err := writeSeries(app, countMetric, dp, countMetricVal); err != nil {528level.Error(conv.log).Log("msg", "failed to write histogram count sample", "err", err)529}530}531532// Quantiles533for i := 0; i < dp.QuantileValues().Len(); i++ {534qp := dp.QuantileValues().At(i)535536quantileLabel := labels.Label{537Name: model.QuantileLabel,538Value: strconv.FormatFloat(qp.Quantile(), 'f', -1, 64),539}540541quantile := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes(), quantileLabel)542quantileVal := qp.Value()543544if err := writeSeries(app, quantile, dp, quantileVal); err != nil {545level.Error(conv.log).Log("msg", "failed to write histogram quantile sample", "quantile", quantileLabel.Value, "err", err)546}547}548}549}550551// GC cleans up stale metrics which have not been updated in the time specified552// by staleTime.553func (conv *Converter) GC(staleTime time.Duration) {554now := time.Now()555556// In the code below, we use TryLock as a small performance optimization.557//558// The garbage collector doesn't bother to wait for locks for anything in the559// cache; the lock being unavailable implies that the cached resource is560// still active.561562conv.seriesCache.Range(func(key, value any) bool {563series := value.(*memorySeries)564if !series.TryLock() {565return true566}567defer series.Unlock()568569if now.Sub(series.lastSeen) > staleTime {570conv.seriesCache.Delete(key)571}572return true573})574575conv.metadataCache.Range(func(key, value any) bool {576series := value.(*memoryMetadata)577if !series.TryLock() {578return true579}580defer series.Unlock()581582if now.Sub(series.lastSeen) > staleTime {583conv.seriesCache.Delete(key)584}585return true586})587}588589// FlushMetadata empties out the metadata cache, forcing metadata to get590// rewritten.591func (conv *Converter) FlushMetadata() {592// TODO(rfratto): this is fairly inefficient since it'll require rebuilding593// all of the metadata for every active series. However, it's the easiest594// thing to do for now.595conv.metadataCache.Range(func(key, _ any) bool {596conv.metadataCache.Delete(key)597return true598})599}600601602