Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/prometheus/internal/metricfamily.go
5460 views
1
// Copyright The OpenTelemetry Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
16
17
import (
18
"fmt"
19
"sort"
20
"strings"
21
22
"github.com/prometheus/prometheus/model/labels"
23
"github.com/prometheus/prometheus/model/value"
24
"github.com/prometheus/prometheus/scrape"
25
"go.opentelemetry.io/collector/pdata/pcommon"
26
"go.opentelemetry.io/collector/pdata/pmetric"
27
"go.uber.org/zap"
28
)
29
30
type metricFamily struct {
31
mtype pmetric.MetricType
32
// isMonotonic only applies to sums
33
isMonotonic bool
34
groups map[uint64]*metricGroup
35
name string
36
metadata *scrape.MetricMetadata
37
groupOrders []*metricGroup
38
}
39
40
// metricGroup, represents a single metric of a metric family. for example a histogram metric is usually represent by
41
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
42
// simple types like counter and gauge, each data point is a group of itself
43
type metricGroup struct {
44
family *metricFamily
45
ts int64
46
ls labels.Labels
47
count float64
48
hasCount bool
49
sum float64
50
hasSum bool
51
value float64
52
complexValue []*dataPoint
53
}
54
55
func newMetricFamily(metricName string, mc scrape.MetricMetadataStore, logger *zap.Logger) *metricFamily {
56
metadata, familyName := metadataForMetric(metricName, mc)
57
mtype, isMonotonic := convToMetricType(metadata.Type)
58
if mtype == pmetric.MetricTypeEmpty {
59
logger.Debug(fmt.Sprintf("Unknown-typed metric : %s %+v", metricName, metadata))
60
}
61
62
return &metricFamily{
63
mtype: mtype,
64
isMonotonic: isMonotonic,
65
groups: make(map[uint64]*metricGroup),
66
name: familyName,
67
metadata: metadata,
68
}
69
}
70
71
// includesMetric returns true if the metric is part of the family
72
func (mf *metricFamily) includesMetric(metricName string) bool {
73
if mf.mtype != pmetric.MetricTypeGauge {
74
// If it is a merged family type, then it should match the
75
// family name when suffixes are trimmed.
76
return normalizeMetricName(metricName) == mf.name
77
}
78
// If it isn't a merged type, the metricName and family name should match
79
return metricName == mf.name
80
}
81
82
// getGroupKey returns the hash of ls computed without the labels that
// getSortedNotUsefulLabels lists for the family's metric type. The hash is
// used as the key into mf.groups.
func (mf *metricFamily) getGroupKey(ls labels.Labels) uint64 {
	ignored := getSortedNotUsefulLabels(mf.mtype)
	scratch := make([]byte, 0, 2048)
	// Only the hash is needed; the second return value is discarded.
	key, _ := ls.HashWithoutLabels(scratch, ignored...)
	return key
}
87
88
// sortPoints orders the group's complex-value points in place by ascending
// boundary.
func (mg *metricGroup) sortPoints() {
	points := mg.complexValue
	sort.Slice(points, func(a, b int) bool {
		return points[a].boundary < points[b].boundary
	})
}
93
94
// toDistributionPoint appends one histogram data point built from this group
// to dest. Nothing is appended when the group has no count or no buckets.
//
// The stored bucket values are cumulative; they are converted to per-bucket
// counts by subtracting the previous bucket's value. The last boundary is
// left out of the explicit bounds (the original note says the bounds must not
// include +inf).
func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) {
	numBuckets := len(mg.complexValue)
	if numBuckets == 0 || !mg.hasCount {
		return
	}

	mg.sortPoints()

	stale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)

	// TODO: (@odeke-em) should we also check OpenTelemetry Pdata for bucket bounds?
	explicitBounds := make([]float64, numBuckets-1)
	counts := make([]uint64, numBuckets)
	for idx, bucket := range mg.complexValue {
		// Every boundary except the last one becomes an explicit bound.
		if idx < numBuckets-1 {
			explicitBounds[idx] = bucket.boundary
		}

		// Buckets are still emitted for stale points so they can be marked
		// stale, but a staleness NaN converted to uint64 would be an
		// extremely large number, so the count is forced to 0 instead.
		var delta float64
		switch {
		case stale:
			delta = 0
		case idx == 0:
			delta = bucket.value
		default:
			delta = bucket.value
			delta -= mg.complexValue[idx-1].value
		}
		counts[idx] = uint64(delta)
	}

	point := dest.AppendEmpty()
	if !stale {
		if mg.hasSum {
			point.SetSum(mg.sum)
		}
		point.SetCount(uint64(mg.count))
	} else {
		point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
	}

	point.ExplicitBounds().FromRaw(explicitBounds)
	point.BucketCounts().FromRaw(counts)

	// The group timestamp is in milliseconds and must be converted before use.
	ts := timestampFromMs(mg.ts)
	point.SetTimestamp(ts)
	// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp.
	point.SetStartTimestamp(ts)
	populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes())
}
145
146
// toSummaryPoint appends one summary data point built from this group to
// dest. Nothing is appended when the group has no count.
//
// A count is expected, but it can be missing when the data is corrupted or
// when it was ignored by startValue evaluation.
//
// Per https://prometheus.io/docs/concepts/metric_types/#summary the quantiles
// are calculated over a sliding time window, whereas count is the total
// number of observations and sum is the sum of all observed values; sum and
// count are therefore set on the point itself rather than per quantile.
func (mg *metricGroup) toSummaryPoint(dest pmetric.SummaryDataPointSlice) {
	if !mg.hasCount {
		return
	}

	mg.sortPoints()

	stale := value.IsStaleNaN(mg.sum) || value.IsStaleNaN(mg.count)
	point := dest.AppendEmpty()
	if !stale {
		point.SetCount(uint64(mg.count))
		if mg.hasSum {
			point.SetSum(mg.sum)
		}
	} else {
		point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
	}

	quantiles := point.QuantileValues()
	for _, q := range mg.complexValue {
		entry := quantiles.AppendEmpty()
		entry.SetQuantile(q.boundary)
		// Quantiles are still emitted for stale points so they can be marked
		// stale; the value is simply left unset, which defaults to 0.
		if stale {
			continue
		}
		entry.SetValue(q.value)
	}

	// The group timestamp is in milliseconds and must be converted before use.
	ts := timestampFromMs(mg.ts)
	// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp.
	point.SetStartTimestamp(ts)
	point.SetTimestamp(ts)
	populateAttributes(pmetric.MetricTypeSummary, mg.ls, point.Attributes())
}
189
190
// toNumberDataPoint appends one number data point built from this group to
// dest. A stale-marker value is flagged as "no recorded value" instead of
// being stored as the point's value.
func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {
	ts := timestampFromMs(mg.ts)
	point := dest.AppendEmpty()
	point.SetTimestamp(ts)

	// Only sums carry a start time; gauge/undefined types have none.
	// metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp.
	if mg.family.mtype == pmetric.MetricTypeSum {
		point.SetStartTimestamp(ts)
	}

	if !value.IsStaleNaN(mg.value) {
		point.SetDoubleValue(mg.value)
	} else {
		point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
	}

	populateAttributes(pmetric.MetricTypeGauge, mg.ls, point.Attributes())
}
205
206
// populateAttributes copies the labels in ls into dest as string attributes.
//
// Labels named in getSortedNotUsefulLabels(mType) are skipped, as are labels
// with an empty value. The skip list is walked with a single forward-moving
// cursor while iterating ls, so this relies on both sequences being sorted by
// name.
func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon.Map) {
	dest.EnsureCapacity(ls.Len())
	skip := getSortedNotUsefulLabels(mType)
	cursor := 0
	for _, l := range ls {
		// Advance past skip entries that sort before the current label.
		for cursor < len(skip) && skip[cursor] < l.Name {
			cursor++
		}
		excluded := cursor < len(skip) && skip[cursor] == l.Name
		// Empty label values should be omitted as well.
		if excluded || l.Value == "" {
			continue
		}
		dest.PutStr(l.Name, l.Value)
	}
}
224
225
func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Labels, ts int64) *metricGroup {
226
mg, ok := mf.groups[groupKey]
227
if !ok {
228
mg = &metricGroup{
229
family: mf,
230
ts: ts,
231
ls: ls,
232
}
233
mf.groups[groupKey] = mg
234
// maintaining data insertion order is helpful to generate stable/reproducible metric output
235
mf.groupOrders = append(mf.groupOrders, mg)
236
}
237
return mg
238
}
239
240
func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error {
241
groupKey := mf.getGroupKey(ls)
242
mg := mf.loadMetricGroupOrCreate(groupKey, ls, t)
243
if mg.ts != t {
244
return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
245
}
246
switch mf.mtype {
247
case pmetric.MetricTypeHistogram, pmetric.MetricTypeSummary:
248
switch {
249
case strings.HasSuffix(metricName, metricsSuffixSum):
250
mg.sum = v
251
mg.hasSum = true
252
case strings.HasSuffix(metricName, metricsSuffixCount):
253
// always use the timestamp from count, because is the only required field for histograms and summaries.
254
mg.ts = t
255
mg.count = v
256
mg.hasCount = true
257
default:
258
boundary, err := getBoundary(mf.mtype, ls)
259
if err != nil {
260
return err
261
}
262
mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary})
263
}
264
default:
265
mg.value = v
266
}
267
268
return nil
269
}
270
271
func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice) {
272
metric := pmetric.NewMetric()
273
metric.SetName(mf.name)
274
metric.SetDescription(mf.metadata.Help)
275
metric.SetUnit(mf.metadata.Unit)
276
277
pointCount := 0
278
279
switch mf.mtype {
280
case pmetric.MetricTypeHistogram:
281
histogram := metric.SetEmptyHistogram()
282
histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
283
hdpL := histogram.DataPoints()
284
for _, mg := range mf.groupOrders {
285
mg.toDistributionPoint(hdpL)
286
}
287
pointCount = hdpL.Len()
288
289
case pmetric.MetricTypeSummary:
290
summary := metric.SetEmptySummary()
291
sdpL := summary.DataPoints()
292
for _, mg := range mf.groupOrders {
293
mg.toSummaryPoint(sdpL)
294
}
295
pointCount = sdpL.Len()
296
297
case pmetric.MetricTypeSum:
298
sum := metric.SetEmptySum()
299
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
300
sum.SetIsMonotonic(mf.isMonotonic)
301
sdpL := sum.DataPoints()
302
for _, mg := range mf.groupOrders {
303
mg.toNumberDataPoint(sdpL)
304
}
305
pointCount = sdpL.Len()
306
307
default: // Everything else should be set to a Gauge.
308
gauge := metric.SetEmptyGauge()
309
gdpL := gauge.DataPoints()
310
for _, mg := range mf.groupOrders {
311
mg.toNumberDataPoint(gdpL)
312
}
313
pointCount = gdpL.Len()
314
}
315
316
if pointCount == 0 {
317
return
318
}
319
320
metric.MoveTo(metrics.AppendEmpty())
321
}
322
323