Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/otelcol/receiver/prometheus/internal/transaction.go
5417 views
1
// Copyright The OpenTelemetry Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"
16
17
import (
18
"context"
19
"errors"
20
"fmt"
21
"sort"
22
23
"github.com/prometheus/common/model"
24
"github.com/prometheus/prometheus/model/exemplar"
25
"github.com/prometheus/prometheus/model/histogram"
26
"github.com/prometheus/prometheus/model/labels"
27
"github.com/prometheus/prometheus/model/metadata"
28
"github.com/prometheus/prometheus/model/value"
29
"github.com/prometheus/prometheus/scrape"
30
"github.com/prometheus/prometheus/storage"
31
"go.opentelemetry.io/collector/component"
32
"go.opentelemetry.io/collector/consumer"
33
"go.opentelemetry.io/collector/obsreport"
34
"go.opentelemetry.io/collector/pdata/pcommon"
35
"go.opentelemetry.io/collector/pdata/pmetric"
36
"go.uber.org/zap"
37
)
38
39
// targetMetricName is the name of the metric whose labels are turned into
// resource attributes (see Append and AddTargetInfo) instead of being stored
// as a regular metric.
const targetMetricName = "target_info"
42
43
type transaction struct {
44
isNew bool
45
ctx context.Context
46
families map[string]*metricFamily
47
mc scrape.MetricMetadataStore
48
sink consumer.Metrics
49
externalLabels labels.Labels
50
nodeResource pcommon.Resource
51
logger *zap.Logger
52
metricAdjuster MetricsAdjuster
53
obsrecv *obsreport.Receiver
54
}
55
56
func newTransaction(
57
ctx context.Context,
58
metricAdjuster MetricsAdjuster,
59
sink consumer.Metrics,
60
externalLabels labels.Labels,
61
settings component.ReceiverCreateSettings,
62
obsrecv *obsreport.Receiver) *transaction {
63
64
return &transaction{
65
ctx: ctx,
66
families: make(map[string]*metricFamily),
67
isNew: true,
68
sink: sink,
69
metricAdjuster: metricAdjuster,
70
externalLabels: externalLabels,
71
logger: settings.Logger,
72
obsrecv: obsrecv,
73
}
74
}
75
76
// Append always returns 0 to disable label caching.
77
func (t *transaction) Append(ref storage.SeriesRef, ls labels.Labels, atMs int64, val float64) (storage.SeriesRef, error) {
78
select {
79
case <-t.ctx.Done():
80
return 0, errTransactionAborted
81
default:
82
}
83
84
if len(t.externalLabels) != 0 {
85
ls = append(ls, t.externalLabels...)
86
sort.Sort(ls)
87
}
88
89
if t.isNew {
90
if err := t.initTransaction(ls); err != nil {
91
return 0, err
92
}
93
}
94
95
// Any datapoint with duplicate labels MUST be rejected per:
96
// * https://github.com/open-telemetry/wg-prometheus/issues/44
97
// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
98
// as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13.
99
if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {
100
return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
101
}
102
103
metricName := ls.Get(model.MetricNameLabel)
104
if metricName == "" {
105
return 0, errMetricNameNotFound
106
}
107
108
// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
109
// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
110
// But it can also be a staleNaN, which is inserted when the target goes away.
111
if metricName == scrapeUpMetricName && val != 1.0 && !value.IsStaleNaN(val) {
112
if val == 0.0 {
113
t.logger.Warn("Failed to scrape Prometheus endpoint",
114
zap.Int64("scrape_timestamp", atMs),
115
zap.Stringer("target_labels", ls))
116
} else {
117
t.logger.Warn("The 'up' metric contains invalid value",
118
zap.Float64("value", val),
119
zap.Int64("scrape_timestamp", atMs),
120
zap.Stringer("target_labels", ls))
121
}
122
}
123
124
// For the `target_info` metric we need to convert it to resource attributes.
125
if metricName == targetMetricName {
126
return 0, t.AddTargetInfo(ls)
127
}
128
129
curMF, ok := t.families[metricName]
130
if !ok {
131
familyName := normalizeMetricName(metricName)
132
if mf, ok := t.families[familyName]; ok && mf.includesMetric(metricName) {
133
curMF = mf
134
} else {
135
curMF = newMetricFamily(metricName, t.mc, t.logger)
136
t.families[curMF.name] = curMF
137
}
138
}
139
140
return 0, curMF.Add(metricName, ls, atMs, val)
141
}
142
143
// AppendExemplar is a no-op: the exemplar is ignored and a zero series
// reference is returned with a nil error.
func (t *transaction) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	return 0, nil
}
146
147
// getMetrics returns all metrics to the given slice.
148
// The only error returned by this function is errNoDataToBuild.
149
func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, error) {
150
if len(t.families) == 0 {
151
return pmetric.Metrics{}, errNoDataToBuild
152
}
153
154
md := pmetric.NewMetrics()
155
rms := md.ResourceMetrics().AppendEmpty()
156
resource.CopyTo(rms.Resource())
157
metrics := rms.ScopeMetrics().AppendEmpty().Metrics()
158
159
for _, mf := range t.families {
160
mf.appendMetric(metrics)
161
}
162
163
return md, nil
164
}
165
166
// initTransaction prepares the transaction using the scrape target and the
// metadata store found in the transaction's context, and builds the node
// resource from the job and instance labels plus the target's discovered
// labels. On success the transaction is no longer marked as new.
//
// It returns an error when the target or the metadata store is missing from
// the context, and errNoJobInstance when either the job or the instance label
// is empty.
func (t *transaction) initTransaction(labels labels.Labels) error {
	target, found := scrape.TargetFromContext(t.ctx)
	if !found {
		return errors.New("unable to find target in context")
	}

	if t.mc, found = scrape.MetricMetadataStoreFromContext(t.ctx); !found {
		return errors.New("unable to find MetricMetadataStore in context")
	}

	job := labels.Get(model.JobLabel)
	instance := labels.Get(model.InstanceLabel)
	if job == "" || instance == "" {
		return errNoJobInstance
	}

	t.nodeResource = CreateResource(job, instance, target.DiscoveredLabels())
	t.isNew = false
	return nil
}
184
185
// Commit converts the collected metric families to pmetric.Metrics, runs the
// metrics adjuster on them and hands the result to the sink.
//
// It does nothing and returns nil while the transaction is still new, i.e.
// initTransaction has not succeeded. Errors from building the metrics, from
// the adjuster and from the sink are returned to the caller. Every path taken
// after StartMetricsOp finishes with a matching EndMetricsOp.
func (t *transaction) Commit() error {
	if t.isNew {
		return nil
	}

	ctx := t.obsrecv.StartMetricsOp(t.ctx)
	md, err := t.getMetrics(t.nodeResource)
	if err != nil {
		t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
		return err
	}

	numPoints := md.DataPointCount()
	if numPoints == 0 {
		// Nothing to forward, but the operation started above still has to be
		// ended; previously this path returned without calling EndMetricsOp.
		t.obsrecv.EndMetricsOp(ctx, dataformat, 0, nil)
		return nil
	}

	if err = t.metricAdjuster.AdjustMetrics(md); err != nil {
		t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
		return err
	}

	err = t.sink.ConsumeMetrics(ctx, md)
	t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
	return err
}
211
212
// Rollback is a no-op and always returns nil; it does not touch the samples
// collected so far.
func (t *transaction) Rollback() error {
	return nil
}
215
216
// UpdateMetadata is currently a no-op: the metadata is ignored and a zero
// series reference is returned with a nil error.
func (t *transaction) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
	// TODO: implement this func
	return 0, nil
}
220
221
// AppendHistogram is currently a no-op: the histogram sample is ignored and a
// zero series reference is returned with a nil error.
func (t *transaction) AppendHistogram(ref storage.SeriesRef, l labels.Labels, ts int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	// TODO: implement this func
	return 0, nil
}
225
226
// AddTargetInfo stores each given label as a string attribute on the
// transaction's node resource. The job, instance and metric-name labels are
// skipped. It always returns nil.
func (t *transaction) AddTargetInfo(labels labels.Labels) error {
	resourceAttrs := t.nodeResource.Attributes()

	for i := range labels {
		name := labels[i].Name
		switch name {
		case model.JobLabel, model.InstanceLabel, model.MetricNameLabel:
			// Not copied to the resource attributes.
		default:
			resourceAttrs.PutStr(name, labels[i].Value)
		}
	}

	return nil
}
239
240