// Source: blob/main/component/otelcol/receiver/prometheus/internal/metrics_adjuster.go
// Copyright The OpenTelemetry Authors1//2// Licensed under the Apache License, Version 2.0 (the "License");3// you may not use this file except in compliance with the License.4// You may obtain a copy of the License at5//6// http://www.apache.org/licenses/LICENSE-2.07//8// Unless required by applicable law or agreed to in writing, software9// distributed under the License is distributed on an "AS IS" BASIS,10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11// See the License for the specific language governing permissions and12// limitations under the License.1314package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"1516import (17"errors"18"strings"19"sync"20"time"2122"go.opentelemetry.io/collector/pdata/pcommon"23"go.opentelemetry.io/collector/pdata/pmetric"24semconv "go.opentelemetry.io/collector/semconv/v1.6.1"25"go.uber.org/zap"26)2728// Notes on garbage collection (gc):29//30// Job-level gc:31// The Prometheus receiver will likely execute in a long running service whose lifetime may exceed32// the lifetimes of many of the jobs that it is collecting from. In order to keep the JobsMap from33// leaking memory for entries of no-longer existing jobs, the JobsMap needs to remove entries that34// haven't been accessed for a long period of time.35//36// Timeseries-level gc:37// Some jobs that the Prometheus receiver is collecting from may export timeseries based on metrics38// from other jobs (e.g. cAdvisor). In order to keep the timeseriesMap from leaking memory for entries39// of no-longer existing jobs, the timeseriesMap for each job needs to remove entries that haven't40// been accessed for a long period of time.41//42// The gc strategy uses a standard mark-and-sweep approach - each time a timeseriesMap is accessed,43// it is marked. 
Similarly, each time a timeseriesInfo is accessed, it is also marked.44//45// At the end of each JobsMap.get(), if the last time the JobsMap was gc'd exceeds the 'gcInterval',46// the JobsMap is locked and any timeseriesMaps that are unmarked are removed from the JobsMap47// otherwise the timeseriesMap is gc'd48//49// The gc for the timeseriesMap is straightforward - the map is locked and, for each timeseriesInfo50// in the map, if it has not been marked, it is removed otherwise it is unmarked.51//52// Alternative Strategies53// 1. If the job-level gc doesn't run often enough, or runs too often, a separate go routine can54// be spawned at JobMap creation time that gc's at periodic intervals. This approach potentially55// adds more contention and latency to each scrape so the current approach is used. Note that56// the go routine will need to be cancelled upon Shutdown().57// 2. If the gc of each timeseriesMap during the gc of the JobsMap causes too much contention,58// the gc of timeseriesMaps can be moved to the end of MetricsAdjuster().AdjustMetricSlice(). 
This59// approach requires adding 'lastGC' Time and (potentially) a gcInterval duration to60// timeseriesMap so the current approach is used instead.6162// timeseriesInfo contains the information necessary to adjust from the initial point and to detect resets.63type timeseriesInfo struct {64mark bool6566number numberInfo67histogram histogramInfo68summary summaryInfo69}7071type numberInfo struct {72startTime pcommon.Timestamp73previousValue float6474}7576type histogramInfo struct {77startTime pcommon.Timestamp78previousCount uint6479previousSum float6480}8182type summaryInfo struct {83startTime pcommon.Timestamp84previousCount uint6485previousSum float6486}8788type timeseriesKey struct {89name string90attributes string91aggTemporality pmetric.AggregationTemporality92}9394// timeseriesMap maps from a timeseries instance (metric * label values) to the timeseries info for95// the instance.96type timeseriesMap struct {97sync.RWMutex98// The mutex is used to protect access to the member fields. 
It is acquired for the entirety of99// AdjustMetricSlice() and also acquired by gc().100101mark bool102tsiMap map[timeseriesKey]*timeseriesInfo103}104105// Get the timeseriesInfo for the timeseries associated with the metric and label values.106func (tsm *timeseriesMap) get(metric pmetric.Metric, kv pcommon.Map) (*timeseriesInfo, bool) {107// This should only be invoked be functions called (directly or indirectly) by AdjustMetricSlice().108// The lock protecting tsm.tsiMap is acquired there.109name := metric.Name()110key := timeseriesKey{111name: name,112attributes: getAttributesSignature(kv),113}114if metric.Type() == pmetric.MetricTypeHistogram {115// There are 2 types of Histograms whose aggregation temporality needs distinguishing:116// * CumulativeHistogram117// * GaugeHistogram118key.aggTemporality = metric.Histogram().AggregationTemporality()119}120121tsm.mark = true122tsi, ok := tsm.tsiMap[key]123if !ok {124tsi = ×eriesInfo{}125tsm.tsiMap[key] = tsi126}127tsi.mark = true128return tsi, ok129}130131// Create a unique timeseries signature consisting of the metric name and label values.132func getAttributesSignature(kv pcommon.Map) string {133labelValues := make([]string, 0, kv.Len())134kv.Sort().Range(func(_ string, attrValue pcommon.Value) bool {135value := attrValue.Str()136if value != "" {137labelValues = append(labelValues, value)138}139return true140})141return strings.Join(labelValues, ",")142}143144// Remove timeseries that have aged out.145func (tsm *timeseriesMap) gc() {146tsm.Lock()147defer tsm.Unlock()148// this shouldn't happen under the current gc() strategy149if !tsm.mark {150return151}152for ts, tsi := range tsm.tsiMap {153if !tsi.mark {154delete(tsm.tsiMap, ts)155} else {156tsi.mark = false157}158}159tsm.mark = false160}161162func newTimeseriesMap() *timeseriesMap {163return ×eriesMap{mark: true, tsiMap: map[timeseriesKey]*timeseriesInfo{}}164}165166// JobsMap maps from a job instance to a map of timeseries instances for the job.167type JobsMap 
struct {168sync.RWMutex169// The mutex is used to protect access to the member fields. It is acquired for most of170// get() and also acquired by gc().171172gcInterval time.Duration173lastGC time.Time174jobsMap map[string]*timeseriesMap175}176177// NewJobsMap creates a new (empty) JobsMap.178func NewJobsMap(gcInterval time.Duration) *JobsMap {179return &JobsMap{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMap)}180}181182// Remove jobs and timeseries that have aged out.183func (jm *JobsMap) gc() {184jm.Lock()185defer jm.Unlock()186// once the structure is locked, confirm that gc() is still necessary187if time.Since(jm.lastGC) > jm.gcInterval {188for sig, tsm := range jm.jobsMap {189tsm.RLock()190tsmNotMarked := !tsm.mark191// take a read lock here, no need to get a full lock as we have a lock on the JobsMap192tsm.RUnlock()193if tsmNotMarked {194delete(jm.jobsMap, sig)195} else {196// a full lock will be obtained in here, if required.197tsm.gc()198}199}200jm.lastGC = time.Now()201}202}203204func (jm *JobsMap) maybeGC() {205// speculatively check if gc() is necessary, recheck once the structure is locked206jm.RLock()207defer jm.RUnlock()208if time.Since(jm.lastGC) > jm.gcInterval {209go jm.gc()210}211}212213func (jm *JobsMap) get(job, instance string) *timeseriesMap {214sig := job + ":" + instance215// a read locke is taken here as we will not need to modify jobsMap if the target timeseriesMap is available.216jm.RLock()217tsm, ok := jm.jobsMap[sig]218jm.RUnlock()219defer jm.maybeGC()220if ok {221return tsm222}223jm.Lock()224defer jm.Unlock()225// Now that we've got an exclusive lock, check once more to ensure an entry wasn't created in the interim226// and then create a new timeseriesMap if required.227tsm2, ok2 := jm.jobsMap[sig]228if ok2 {229return tsm2230}231tsm2 = newTimeseriesMap()232jm.jobsMap[sig] = tsm2233return tsm2234}235236// MetricsAdjuster adjusts the start time of metrics when converting between237// Prometheus and 
OTel.238type MetricsAdjuster interface {239AdjustMetrics(metrics pmetric.Metrics) error240}241242// initialPointAdjuster takes a map from a metric instance to the initial point in the metrics instance243// and provides AdjustMetricSlice, which takes a sequence of metrics and adjust their start times based on244// the initial points.245type initialPointAdjuster struct {246jobsMap *JobsMap247logger *zap.Logger248}249250// NewInitialPointAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on the initial received points.251func NewInitialPointAdjuster(logger *zap.Logger, gcInterval time.Duration) MetricsAdjuster {252return &initialPointAdjuster{253jobsMap: NewJobsMap(gcInterval),254logger: logger,255}256}257258// AdjustMetrics takes a sequence of metrics and adjust their start times based on the initial and259// previous points in the timeseriesMap.260func (ma *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {261// By contract metrics will have at least 1 data point, so for sure will have at least one ResourceMetrics.262263job, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceName)264if !found {265return errors.New("adjusting metrics without job")266}267268instance, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceInstanceID)269if !found {270return errors.New("adjusting metrics without instance")271}272tsm := ma.jobsMap.get(job.Str(), instance.Str())273274// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that275// nothing else can modify the data used for adjustment.276tsm.Lock()277defer tsm.Unlock()278for i := 0; i < metrics.ResourceMetrics().Len(); i++ {279rm := metrics.ResourceMetrics().At(i)280for j := 0; j < rm.ScopeMetrics().Len(); j++ {281ilm := rm.ScopeMetrics().At(j)282for k := 0; k < ilm.Metrics().Len(); k++ {283metric := ilm.Metrics().At(k)284switch dataType := metric.Type(); dataType 
{285case pmetric.MetricTypeGauge:286// gauges don't need to be adjusted so no additional processing is necessary287288case pmetric.MetricTypeHistogram:289adjustMetricHistogram(tsm, metric)290291case pmetric.MetricTypeSummary:292adjustMetricSummary(tsm, metric)293294case pmetric.MetricTypeSum:295adjustMetricSum(tsm, metric)296297default:298// this shouldn't happen299ma.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))300}301}302}303}304return nil305}306307func adjustMetricHistogram(tsm *timeseriesMap, current pmetric.Metric) {308histogram := current.Histogram()309if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {310// Only dealing with CumulativeDistributions.311return312}313314currentPoints := histogram.DataPoints()315for i := 0; i < currentPoints.Len(); i++ {316currentDist := currentPoints.At(i)317tsi, found := tsm.get(current, currentDist.Attributes())318if !found {319// initialize everything.320tsi.histogram.startTime = currentDist.StartTimestamp()321tsi.histogram.previousCount = currentDist.Count()322tsi.histogram.previousSum = currentDist.Sum()323continue324}325326if currentDist.Flags().NoRecordedValue() {327// TODO: Investigate why this does not reset.328currentDist.SetStartTimestamp(tsi.histogram.startTime)329continue330}331332if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum {333// reset re-initialize everything.334tsi.histogram.startTime = currentDist.StartTimestamp()335tsi.histogram.previousCount = currentDist.Count()336tsi.histogram.previousSum = currentDist.Sum()337continue338}339340// Update only previous values.341tsi.histogram.previousCount = currentDist.Count()342tsi.histogram.previousSum = currentDist.Sum()343currentDist.SetStartTimestamp(tsi.histogram.startTime)344}345}346347func adjustMetricSum(tsm *timeseriesMap, current pmetric.Metric) {348currentPoints := current.Sum().DataPoints()349for i := 0; i < currentPoints.Len(); 
i++ {350currentSum := currentPoints.At(i)351tsi, found := tsm.get(current, currentSum.Attributes())352if !found {353// initialize everything.354tsi.number.startTime = currentSum.StartTimestamp()355tsi.number.previousValue = currentSum.DoubleValue()356continue357}358359if currentSum.Flags().NoRecordedValue() {360// TODO: Investigate why this does not reset.361currentSum.SetStartTimestamp(tsi.number.startTime)362continue363}364365if currentSum.DoubleValue() < tsi.number.previousValue {366// reset re-initialize everything.367tsi.number.startTime = currentSum.StartTimestamp()368tsi.number.previousValue = currentSum.DoubleValue()369continue370}371372// Update only previous values.373tsi.number.previousValue = currentSum.DoubleValue()374currentSum.SetStartTimestamp(tsi.number.startTime)375}376}377378func adjustMetricSummary(tsm *timeseriesMap, current pmetric.Metric) {379currentPoints := current.Summary().DataPoints()380381for i := 0; i < currentPoints.Len(); i++ {382currentSummary := currentPoints.At(i)383tsi, found := tsm.get(current, currentSummary.Attributes())384if !found {385// initialize everything.386tsi.summary.startTime = currentSummary.StartTimestamp()387tsi.summary.previousCount = currentSummary.Count()388tsi.summary.previousSum = currentSummary.Sum()389continue390}391392if currentSummary.Flags().NoRecordedValue() {393// TODO: Investigate why this does not reset.394currentSummary.SetStartTimestamp(tsi.summary.startTime)395continue396}397398if (currentSummary.Count() != 0 &&399tsi.summary.previousCount != 0 &&400currentSummary.Count() < tsi.summary.previousCount) ||401(currentSummary.Sum() != 0 &&402tsi.summary.previousSum != 0 &&403currentSummary.Sum() < tsi.summary.previousSum) {404// reset re-initialize everything.405tsi.summary.startTime = currentSummary.StartTimestamp()406tsi.summary.previousCount = currentSummary.Count()407tsi.summary.previousSum = currentSummary.Sum()408continue409}410411// Update only previous values.412tsi.summary.previousCount = 
currentSummary.Count()413tsi.summary.previousSum = currentSummary.Sum()414currentSummary.SetStartTimestamp(tsi.summary.startTime)415}416}417418419