// Path: blob/main/component/otelcol/receiver/prometheus/internal/metricfamily.go
// 5460 views
// Copyright The OpenTelemetry Authors1//2// Licensed under the Apache License, Version 2.0 (the "License");3// you may not use this file except in compliance with the License.4// You may obtain a copy of the License at5//6// http://www.apache.org/licenses/LICENSE-2.07//8// Unless required by applicable law or agreed to in writing, software9// distributed under the License is distributed on an "AS IS" BASIS,10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11// See the License for the specific language governing permissions and12// limitations under the License.1314package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"1516import (17"fmt"18"sort"19"strings"2021"github.com/prometheus/prometheus/model/labels"22"github.com/prometheus/prometheus/model/value"23"github.com/prometheus/prometheus/scrape"24"go.opentelemetry.io/collector/pdata/pcommon"25"go.opentelemetry.io/collector/pdata/pmetric"26"go.uber.org/zap"27)2829type metricFamily struct {30mtype pmetric.MetricType31// isMonotonic only applies to sums32isMonotonic bool33groups map[uint64]*metricGroup34name string35metadata *scrape.MetricMetadata36groupOrders []*metricGroup37}3839// metricGroup, represents a single metric of a metric family. for example a histogram metric is usually represent by40// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. 
for41// simple types like counter and gauge, each data point is a group of itself42type metricGroup struct {43family *metricFamily44ts int6445ls labels.Labels46count float6447hasCount bool48sum float6449hasSum bool50value float6451complexValue []*dataPoint52}5354func newMetricFamily(metricName string, mc scrape.MetricMetadataStore, logger *zap.Logger) *metricFamily {55metadata, familyName := metadataForMetric(metricName, mc)56mtype, isMonotonic := convToMetricType(metadata.Type)57if mtype == pmetric.MetricTypeEmpty {58logger.Debug(fmt.Sprintf("Unknown-typed metric : %s %+v", metricName, metadata))59}6061return &metricFamily{62mtype: mtype,63isMonotonic: isMonotonic,64groups: make(map[uint64]*metricGroup),65name: familyName,66metadata: metadata,67}68}6970// includesMetric returns true if the metric is part of the family71func (mf *metricFamily) includesMetric(metricName string) bool {72if mf.mtype != pmetric.MetricTypeGauge {73// If it is a merged family type, then it should match the74// family name when suffixes are trimmed.75return normalizeMetricName(metricName) == mf.name76}77// If it isn't a merged type, the metricName and family name should match78return metricName == mf.name79}8081func (mf *metricFamily) getGroupKey(ls labels.Labels) uint64 {82bytes := make([]byte, 0, 2048)83hash, _ := ls.HashWithoutLabels(bytes, getSortedNotUsefulLabels(mf.mtype)...)84return hash85}8687func (mg *metricGroup) sortPoints() {88sort.Slice(mg.complexValue, func(i, j int) bool {89return mg.complexValue[i].boundary < mg.complexValue[j].boundary90})91}9293func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) {94if !mg.hasCount || len(mg.complexValue) == 0 {95return96}9798mg.sortPoints()99100// for OCAgent Proto, the bounds won't include +inf101// TODO: (@odeke-em) should we also check OpenTelemetry Pdata for bucket bounds?102bounds := make([]float64, len(mg.complexValue)-1)103bucketCounts := make([]uint64, len(mg.complexValue))104105pointIsStale := 
value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)106107for i := 0; i < len(mg.complexValue); i++ {108if i != len(mg.complexValue)-1 {109// not need to add +inf as bound to oc proto110bounds[i] = mg.complexValue[i].boundary111}112adjustedCount := mg.complexValue[i].value113// Buckets still need to be sent to know to set them as stale,114// but a staleness NaN converted to uint64 would be an extremely large number.115// Setting to 0 instead.116if pointIsStale {117adjustedCount = 0118} else if i != 0 {119adjustedCount -= mg.complexValue[i-1].value120}121bucketCounts[i] = uint64(adjustedCount)122}123124point := dest.AppendEmpty()125126if pointIsStale {127point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))128} else {129point.SetCount(uint64(mg.count))130if mg.hasSum {131point.SetSum(mg.sum)132}133}134135point.ExplicitBounds().FromRaw(bounds)136point.BucketCounts().FromRaw(bucketCounts)137138// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.139tsNanos := timestampFromMs(mg.ts)140point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp141point.SetTimestamp(tsNanos)142populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())143}144145func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {146// expecting count to be provided, however, in the following two cases, they can be missed.147// 1. data is corrupted148// 2. 
ignored by startValue evaluation149if !mg.hasCount {150return151}152153mg.sortPoints()154155point := dest.AppendEmpty()156pointIsStale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)157if pointIsStale {158point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))159} else {160if mg.hasSum {161point.SetSum(mg.sum)162}163point.SetCount(uint64(mg.count))164}165166quantileValues := point.QuantileValues()167for _, p := range mg.complexValue {168quantile := quantileValues.AppendEmpty()169// Quantiles still need to be sent to know to set them as stale,170// but a staleness NaN converted to uint64 would be an extremely large number.171// By not setting the quantile value, it will default to 0.172if !pointIsStale {173quantile.SetValue(p.value)174}175quantile.SetQuantile(p.boundary)176}177178// Based on the summary description from https://prometheus.io/docs/concepts/metric_types/#summary179// the quantiles are calculated over a sliding time window, however, the count is the total count of180// observations and the corresponding sum is a sum of all observed values, thus the sum and count used181// at the global level of the metricspb.SummaryValue182// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.183tsNanos := timestampFromMs(mg.ts)184point.SetTimestamp(tsNanos)185point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp186populateAttributes(pmetric.MetricTypeSummary, mg.ls, point.Attributes())187}188189func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {190tsNanos := timestampFromMs(mg.ts)191point := dest.AppendEmpty()192// gauge/undefined types have no start time.193if mg.family.mtype == pmetric.MetricTypeSum {194point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp195}196point.SetTimestamp(tsNanos)197if value.IsStaleNaN(mg.value) 
{198point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))199} else {200point.SetDoubleValue(mg.value)201}202populateAttributes(pmetric.MetricTypeGauge, mg.ls, point.Attributes())203}204205func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon.Map) {206dest.EnsureCapacity(ls.Len())207names := getSortedNotUsefulLabels(mType)208j := 0209for i := range ls {210for j < len(names) && names[j] < ls[i].Name {211j++212}213if j < len(names) && ls[i].Name == names[j] {214continue215}216if ls[i].Value == "" {217// empty label values should be omitted218continue219}220dest.PutStr(ls[i].Name, ls[i].Value)221}222}223224func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Labels, ts int64) *metricGroup {225mg, ok := mf.groups[groupKey]226if !ok {227mg = &metricGroup{228family: mf,229ts: ts,230ls: ls,231}232mf.groups[groupKey] = mg233// maintaining data insertion order is helpful to generate stable/reproducible metric output234mf.groupOrders = append(mf.groupOrders, mg)235}236return mg237}238239func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error {240groupKey := mf.getGroupKey(ls)241mg := mf.loadMetricGroupOrCreate(groupKey, ls, t)242if mg.ts != t {243return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)244}245switch mf.mtype {246case pmetric.MetricTypeHistogram, pmetric.MetricTypeSummary:247switch {248case strings.HasSuffix(metricName, metricsSuffixSum):249mg.sum = v250mg.hasSum = true251case strings.HasSuffix(metricName, metricsSuffixCount):252// always use the timestamp from count, because is the only required field for histograms and summaries.253mg.ts = t254mg.count = v255mg.hasCount = true256default:257boundary, err := getBoundary(mf.mtype, ls)258if err != nil {259return err260}261mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary})262}263default:264mg.value = v265}266267return nil268}269270func (mf 
*metricFamily) appendMetric(metrics pmetric.MetricSlice) {271metric := pmetric.NewMetric()272metric.SetName(mf.name)273metric.SetDescription(mf.metadata.Help)274metric.SetUnit(mf.metadata.Unit)275276pointCount := 0277278switch mf.mtype {279case pmetric.MetricTypeHistogram:280histogram := metric.SetEmptyHistogram()281histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)282hdpL := histogram.DataPoints()283for _, mg := range mf.groupOrders {284mg.toDistributionPoint(hdpL)285}286pointCount = hdpL.Len()287288case pmetric.MetricTypeSummary:289summary := metric.SetEmptySummary()290sdpL := summary.DataPoints()291for _, mg := range mf.groupOrders {292mg.toSummaryPoint(sdpL)293}294pointCount = sdpL.Len()295296case pmetric.MetricTypeSum:297sum := metric.SetEmptySum()298sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)299sum.SetIsMonotonic(mf.isMonotonic)300sdpL := sum.DataPoints()301for _, mg := range mf.groupOrders {302mg.toNumberDataPoint(sdpL)303}304pointCount = sdpL.Len()305306default: // Everything else should be set to a Gauge.307gauge := metric.SetEmptyGauge()308gdpL := gauge.DataPoints()309for _, mg := range mf.groupOrders {310mg.toNumberDataPoint(gdpL)311}312pointCount = gdpL.Len()313}314315if pointCount == 0 {316return317}318319metric.MoveTo(metrics.AppendEmpty())320}321322323