Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/prometheus/internal/metrics_adjuster.go
5443 views
1
// Copyright The OpenTelemetry Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
16
17
import (
18
"errors"
19
"strings"
20
"sync"
21
"time"
22
23
"go.opentelemetry.io/collector/pdata/pcommon"
24
"go.opentelemetry.io/collector/pdata/pmetric"
25
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
26
"go.uber.org/zap"
27
)
28
29
// Notes on garbage collection (gc):
30
//
31
// Job-level gc:
32
// The Prometheus receiver will likely execute in a long running service whose lifetime may exceed
33
// the lifetimes of many of the jobs that it is collecting from. In order to keep the JobsMap from
34
// leaking memory for entries of no-longer existing jobs, the JobsMap needs to remove entries that
35
// haven't been accessed for a long period of time.
36
//
37
// Timeseries-level gc:
38
// Some jobs that the Prometheus receiver is collecting from may export timeseries based on metrics
39
// from other jobs (e.g. cAdvisor). In order to keep the timeseriesMap from leaking memory for entries
40
// of no-longer existing jobs, the timeseriesMap for each job needs to remove entries that haven't
41
// been accessed for a long period of time.
42
//
43
// The gc strategy uses a standard mark-and-sweep approach - each time a timeseriesMap is accessed,
44
// it is marked. Similarly, each time a timeseriesInfo is accessed, it is also marked.
45
//
46
// At the end of each JobsMap.get(), if the time since the JobsMap was last gc'd exceeds 'gcInterval',
// the JobsMap is locked; any timeseriesMaps that are unmarked are removed from the JobsMap, and each
// remaining (marked) timeseriesMap is gc'd individually.
49
//
50
// The gc for the timeseriesMap is straightforward - the map is locked and, for each timeseriesInfo
51
// in the map, if it has not been marked, it is removed otherwise it is unmarked.
52
//
53
// Alternative Strategies
54
// 1. If the job-level gc doesn't run often enough, or runs too often, a separate go routine can
55
// be spawned at JobsMap creation time that gc's at periodic intervals. This approach potentially
56
// adds more contention and latency to each scrape so the current approach is used. Note that
57
// the go routine will need to be cancelled upon Shutdown().
58
// 2. If the gc of each timeseriesMap during the gc of the JobsMap causes too much contention,
59
// the gc of timeseriesMaps can be moved to the end of MetricsAdjuster().AdjustMetricSlice(). This
60
// approach requires adding 'lastGC' Time and (potentially) a gcInterval duration to
61
// timeseriesMap so the current approach is used instead.
62
63
// timeseriesInfo contains the information necessary to adjust from the initial point and to detect resets.
type timeseriesInfo struct {
	// mark is set on every access by timeseriesMap.get() and cleared by gc();
	// entries left unmarked between sweeps are deleted.
	mark bool

	// Per-type adjustment state; only the field matching the metric's data
	// type is populated for a given timeseries.
	number    numberInfo
	histogram histogramInfo
	summary   summaryInfo
}
71
72
// numberInfo holds the reset-detection state for a Sum timeseries.
type numberInfo struct {
	// startTime is the start timestamp of the initial (or post-reset) point.
	startTime pcommon.Timestamp
	// previousValue is the last observed value, used to detect counter resets.
	previousValue float64
}
76
77
// histogramInfo holds the reset-detection state for a Histogram timeseries.
type histogramInfo struct {
	// startTime is the start timestamp of the initial (or post-reset) point.
	startTime pcommon.Timestamp
	// previousCount/previousSum are the last observed values, used to detect resets.
	previousCount uint64
	previousSum   float64
}
82
83
// summaryInfo holds the reset-detection state for a Summary timeseries.
type summaryInfo struct {
	// startTime is the start timestamp of the initial (or post-reset) point.
	startTime pcommon.Timestamp
	// previousCount/previousSum are the last observed values, used to detect resets.
	previousCount uint64
	previousSum   float64
}
88
89
// timeseriesKey identifies a single timeseries within a timeseriesMap.
type timeseriesKey struct {
	// name is the metric name.
	name string
	// attributes is the signature built from the point's attributes.
	attributes string
	// aggTemporality is only set for histograms, to distinguish cumulative
	// from gauge histograms that share a name and attributes.
	aggTemporality pmetric.AggregationTemporality
}
94
95
// timeseriesMap maps from a timeseries instance (metric * label values) to the timeseries info for
// the instance.
type timeseriesMap struct {
	sync.RWMutex
	// The mutex is used to protect access to the member fields. It is acquired for the entirety of
	// AdjustMetricSlice() and also acquired by gc().

	// mark is set on every access via get() and cleared by gc(); a map left
	// unmarked between sweeps is removed from its JobsMap.
	mark bool
	// tsiMap holds one entry per tracked timeseries.
	tsiMap map[timeseriesKey]*timeseriesInfo
}
105
106
// Get the timeseriesInfo for the timeseries associated with the metric and label values.
107
func (tsm *timeseriesMap) get(metric pmetric.Metric, kv pcommon.Map) (*timeseriesInfo, bool) {
108
// This should only be invoked be functions called (directly or indirectly) by AdjustMetricSlice().
109
// The lock protecting tsm.tsiMap is acquired there.
110
name := metric.Name()
111
key := timeseriesKey{
112
name: name,
113
attributes: getAttributesSignature(kv),
114
}
115
if metric.Type() == pmetric.MetricTypeHistogram {
116
// There are 2 types of Histograms whose aggregation temporality needs distinguishing:
117
// * CumulativeHistogram
118
// * GaugeHistogram
119
key.aggTemporality = metric.Histogram().AggregationTemporality()
120
}
121
122
tsm.mark = true
123
tsi, ok := tsm.tsiMap[key]
124
if !ok {
125
tsi = &timeseriesInfo{}
126
tsm.tsiMap[key] = tsi
127
}
128
tsi.mark = true
129
return tsi, ok
130
}
131
132
// Create a unique timeseries signature consisting of the metric name and label values.
133
func getAttributesSignature(kv pcommon.Map) string {
134
labelValues := make([]string, 0, kv.Len())
135
kv.Sort().Range(func(_ string, attrValue pcommon.Value) bool {
136
value := attrValue.Str()
137
if value != "" {
138
labelValues = append(labelValues, value)
139
}
140
return true
141
})
142
return strings.Join(labelValues, ",")
143
}
144
145
// Remove timeseries that have aged out.
146
func (tsm *timeseriesMap) gc() {
147
tsm.Lock()
148
defer tsm.Unlock()
149
// this shouldn't happen under the current gc() strategy
150
if !tsm.mark {
151
return
152
}
153
for ts, tsi := range tsm.tsiMap {
154
if !tsi.mark {
155
delete(tsm.tsiMap, ts)
156
} else {
157
tsi.mark = false
158
}
159
}
160
tsm.mark = false
161
}
162
163
// newTimeseriesMap returns an empty, freshly-marked timeseriesMap.
func newTimeseriesMap() *timeseriesMap {
	return &timeseriesMap{
		mark:   true,
		tsiMap: make(map[timeseriesKey]*timeseriesInfo),
	}
}
166
167
// JobsMap maps from a job instance to a map of timeseries instances for the job.
type JobsMap struct {
	sync.RWMutex
	// The mutex is used to protect access to the member fields. It is acquired for most of
	// get() and also acquired by gc().

	// gcInterval is the minimum time between job-level gc sweeps.
	gcInterval time.Duration
	// lastGC records when the last sweep completed.
	lastGC time.Time
	// jobsMap is keyed by "job:instance".
	jobsMap map[string]*timeseriesMap
}
177
178
// NewJobsMap creates a new (empty) JobsMap.
179
func NewJobsMap(gcInterval time.Duration) *JobsMap {
180
return &JobsMap{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMap)}
181
}
182
183
// Remove jobs and timeseries that have aged out.
func (jm *JobsMap) gc() {
	jm.Lock()
	defer jm.Unlock()
	// once the structure is locked, confirm that gc() is still necessary
	if time.Since(jm.lastGC) > jm.gcInterval {
		// Sweep every job: drop timeseriesMaps that were not accessed since the
		// last sweep; otherwise gc the map's individual timeseries.
		for sig, tsm := range jm.jobsMap {
			tsm.RLock()
			tsmNotMarked := !tsm.mark
			// take a read lock here, no need to get a full lock as we have a lock on the JobsMap
			tsm.RUnlock()
			if tsmNotMarked {
				delete(jm.jobsMap, sig)
			} else {
				// a full lock will be obtained in here, if required.
				tsm.gc()
			}
		}
		jm.lastGC = time.Now()
	}
}
204
205
// maybeGC kicks off an asynchronous gc sweep when the gc interval has elapsed.
func (jm *JobsMap) maybeGC() {
	// speculatively check if gc() is necessary, recheck once the structure is locked
	jm.RLock()
	defer jm.RUnlock()
	if time.Since(jm.lastGC) <= jm.gcInterval {
		return
	}
	// gc() re-checks the interval under the write lock before sweeping.
	go jm.gc()
}
213
214
// get returns the timeseriesMap for the given job/instance pair, creating it
// on first use.
func (jm *JobsMap) get(job, instance string) *timeseriesMap {
	sig := job + ":" + instance
	// a read lock is taken here as we will not need to modify jobsMap if the target timeseriesMap is available.
	jm.RLock()
	tsm, ok := jm.jobsMap[sig]
	jm.RUnlock()
	// Deferred before the write-lock's Unlock below, so (defers running LIFO)
	// maybeGC executes after jm's mutex has been released — no deadlock.
	defer jm.maybeGC()
	if ok {
		return tsm
	}
	jm.Lock()
	defer jm.Unlock()
	// Now that we've got an exclusive lock, check once more to ensure an entry wasn't created in the interim
	// and then create a new timeseriesMap if required.
	tsm2, ok2 := jm.jobsMap[sig]
	if ok2 {
		return tsm2
	}
	tsm2 = newTimeseriesMap()
	jm.jobsMap[sig] = tsm2
	return tsm2
}
236
237
// MetricsAdjuster adjusts the start time of metrics when converting between
// Prometheus and OTel.
type MetricsAdjuster interface {
	// AdjustMetrics rewrites the start timestamps of the points in metrics in
	// place, returning an error when the required resource attributes are missing.
	AdjustMetrics(metrics pmetric.Metrics) error
}
242
243
// initialPointAdjuster takes a map from a metric instance to the initial point in the metrics instance
// and provides AdjustMetricSlice, which takes a sequence of metrics and adjust their start times based on
// the initial points.
type initialPointAdjuster struct {
	// jobsMap tracks per-job/instance timeseries state used for the adjustment.
	jobsMap *JobsMap
	// logger reports unexpected metric types encountered during adjustment.
	logger *zap.Logger
}
250
251
// NewInitialPointAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on the initial received points.
252
func NewInitialPointAdjuster(logger *zap.Logger, gcInterval time.Duration) MetricsAdjuster {
253
return &initialPointAdjuster{
254
jobsMap: NewJobsMap(gcInterval),
255
logger: logger,
256
}
257
}
258
259
// AdjustMetrics takes a sequence of metrics and adjust their start times based on the initial and
260
// previous points in the timeseriesMap.
261
func (ma *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
262
// By contract metrics will have at least 1 data point, so for sure will have at least one ResourceMetrics.
263
264
job, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceName)
265
if !found {
266
return errors.New("adjusting metrics without job")
267
}
268
269
instance, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
270
if !found {
271
return errors.New("adjusting metrics without instance")
272
}
273
tsm := ma.jobsMap.get(job.Str(), instance.Str())
274
275
// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
276
// nothing else can modify the data used for adjustment.
277
tsm.Lock()
278
defer tsm.Unlock()
279
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
280
rm := metrics.ResourceMetrics().At(i)
281
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
282
ilm := rm.ScopeMetrics().At(j)
283
for k := 0; k < ilm.Metrics().Len(); k++ {
284
metric := ilm.Metrics().At(k)
285
switch dataType := metric.Type(); dataType {
286
case pmetric.MetricTypeGauge:
287
// gauges don't need to be adjusted so no additional processing is necessary
288
289
case pmetric.MetricTypeHistogram:
290
adjustMetricHistogram(tsm, metric)
291
292
case pmetric.MetricTypeSummary:
293
adjustMetricSummary(tsm, metric)
294
295
case pmetric.MetricTypeSum:
296
adjustMetricSum(tsm, metric)
297
298
default:
299
// this shouldn't happen
300
ma.logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
301
}
302
}
303
}
304
}
305
return nil
306
}
307
308
// adjustMetricHistogram rewrites the start timestamps of cumulative histogram
// points based on the initial point stored in tsm, re-initializing on resets.
func adjustMetricHistogram(tsm *timeseriesMap, current pmetric.Metric) {
	histogram := current.Histogram()
	if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
		// Only dealing with CumulativeDistributions.
		return
	}

	dps := histogram.DataPoints()
	for i := 0; i < dps.Len(); i++ {
		dp := dps.At(i)
		// record captures this point as the new reference for the timeseries.
		record := func(tsi *timeseriesInfo) {
			tsi.histogram.startTime = dp.StartTimestamp()
			tsi.histogram.previousCount = dp.Count()
			tsi.histogram.previousSum = dp.Sum()
		}
		tsi, found := tsm.get(current, dp.Attributes())
		switch {
		case !found:
			// First sighting of this timeseries: initialize everything.
			record(tsi)
		case dp.Flags().NoRecordedValue():
			// TODO: Investigate why this does not reset.
			dp.SetStartTimestamp(tsi.histogram.startTime)
		case dp.Count() < tsi.histogram.previousCount || dp.Sum() < tsi.histogram.previousSum:
			// A drop in count or sum means the counter reset: re-initialize.
			record(tsi)
		default:
			// Normal step: track latest values and adjust the start time.
			tsi.histogram.previousCount = dp.Count()
			tsi.histogram.previousSum = dp.Sum()
			dp.SetStartTimestamp(tsi.histogram.startTime)
		}
	}
}
347
348
// adjustMetricSum rewrites the start timestamps of sum points based on the
// initial point stored in tsm, re-initializing on counter resets.
func adjustMetricSum(tsm *timeseriesMap, current pmetric.Metric) {
	dps := current.Sum().DataPoints()
	for i := 0; i < dps.Len(); i++ {
		dp := dps.At(i)
		// record captures this point as the new reference for the timeseries.
		record := func(tsi *timeseriesInfo) {
			tsi.number.startTime = dp.StartTimestamp()
			tsi.number.previousValue = dp.DoubleValue()
		}
		tsi, found := tsm.get(current, dp.Attributes())
		switch {
		case !found:
			// First sighting of this timeseries: initialize everything.
			record(tsi)
		case dp.Flags().NoRecordedValue():
			// TODO: Investigate why this does not reset.
			dp.SetStartTimestamp(tsi.number.startTime)
		case dp.DoubleValue() < tsi.number.previousValue:
			// Value dropped: the counter reset, so re-initialize.
			record(tsi)
		default:
			// Normal step: track the latest value and adjust the start time.
			tsi.number.previousValue = dp.DoubleValue()
			dp.SetStartTimestamp(tsi.number.startTime)
		}
	}
}
378
379
func adjustMetricSummary(tsm *timeseriesMap, current pmetric.Metric) {
380
currentPoints := current.Summary().DataPoints()
381
382
for i := 0; i < currentPoints.Len(); i++ {
383
currentSummary := currentPoints.At(i)
384
tsi, found := tsm.get(current, currentSummary.Attributes())
385
if !found {
386
// initialize everything.
387
tsi.summary.startTime = currentSummary.StartTimestamp()
388
tsi.summary.previousCount = currentSummary.Count()
389
tsi.summary.previousSum = currentSummary.Sum()
390
continue
391
}
392
393
if currentSummary.Flags().NoRecordedValue() {
394
// TODO: Investigate why this does not reset.
395
currentSummary.SetStartTimestamp(tsi.summary.startTime)
396
continue
397
}
398
399
if (currentSummary.Count() != 0 &&
400
tsi.summary.previousCount != 0 &&
401
currentSummary.Count() < tsi.summary.previousCount) ||
402
(currentSummary.Sum() != 0 &&
403
tsi.summary.previousSum != 0 &&
404
currentSummary.Sum() < tsi.summary.previousSum) {
405
// reset re-initialize everything.
406
tsi.summary.startTime = currentSummary.StartTimestamp()
407
tsi.summary.previousCount = currentSummary.Count()
408
tsi.summary.previousSum = currentSummary.Sum()
409
continue
410
}
411
412
// Update only previous values.
413
tsi.summary.previousCount = currentSummary.Count()
414
tsi.summary.previousSum = currentSummary.Sum()
415
currentSummary.SetStartTimestamp(tsi.summary.startTime)
416
}
417
}
418
419