Path: blob/main/component/otelcol/receiver/prometheus/internal/transaction.go
5417 views
// Copyright The OpenTelemetry Authors1//2// Licensed under the Apache License, Version 2.0 (the "License");3// you may not use this file except in compliance with the License.4// You may obtain a copy of the License at5//6// http://www.apache.org/licenses/LICENSE-2.07//8// Unless required by applicable law or agreed to in writing, software9// distributed under the License is distributed on an "AS IS" BASIS,10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11// See the License for the specific language governing permissions and12// limitations under the License.1314package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal"1516import (17"context"18"errors"19"fmt"20"sort"2122"github.com/prometheus/common/model"23"github.com/prometheus/prometheus/model/exemplar"24"github.com/prometheus/prometheus/model/histogram"25"github.com/prometheus/prometheus/model/labels"26"github.com/prometheus/prometheus/model/metadata"27"github.com/prometheus/prometheus/model/value"28"github.com/prometheus/prometheus/scrape"29"github.com/prometheus/prometheus/storage"30"go.opentelemetry.io/collector/component"31"go.opentelemetry.io/collector/consumer"32"go.opentelemetry.io/collector/obsreport"33"go.opentelemetry.io/collector/pdata/pcommon"34"go.opentelemetry.io/collector/pdata/pmetric"35"go.uber.org/zap"36)3738const (39targetMetricName = "target_info"40)4142type transaction struct {43isNew bool44ctx context.Context45families map[string]*metricFamily46mc scrape.MetricMetadataStore47sink consumer.Metrics48externalLabels labels.Labels49nodeResource pcommon.Resource50logger *zap.Logger51metricAdjuster MetricsAdjuster52obsrecv *obsreport.Receiver53}5455func newTransaction(56ctx context.Context,57metricAdjuster MetricsAdjuster,58sink consumer.Metrics,59externalLabels labels.Labels,60settings component.ReceiverCreateSettings,61obsrecv *obsreport.Receiver) *transaction {6263return &transaction{64ctx: ctx,65families: make(map[string]*metricFamily),66isNew: true,67sink: sink,68metricAdjuster: metricAdjuster,69externalLabels: externalLabels,70logger: settings.Logger,71obsrecv: obsrecv,72}73}7475// Append always returns 0 to disable label caching.76func (t *transaction) Append(ref storage.SeriesRef, ls labels.Labels, atMs int64, val float64) (storage.SeriesRef, error) {77select {78case <-t.ctx.Done():79return 0, errTransactionAborted80default:81}8283if len(t.externalLabels) != 0 {84ls = append(ls, t.externalLabels...)85sort.Sort(ls)86}8788if t.isNew {89if err := t.initTransaction(ls); err != nil {90return 0, err91}92}9394// Any datapoint with duplicate labels MUST be rejected per:95// * https://github.com/open-telemetry/wg-prometheus/issues/4496// * https://github.com/open-telemetry/opentelemetry-collector/issues/340797// as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13.98if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {99return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)100}101102metricName := ls.Get(model.MetricNameLabel)103if metricName == "" {104return 0, errMetricNameNotFound105}106107// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series108// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.109// But it can also be a staleNaN, which is inserted when the target goes away.110if metricName == scrapeUpMetricName && val != 1.0 && !value.IsStaleNaN(val) {111if val == 0.0 {112t.logger.Warn("Failed to scrape Prometheus endpoint",113zap.Int64("scrape_timestamp", atMs),114zap.Stringer("target_labels", ls))115} else {116t.logger.Warn("The 'up' metric contains invalid value",117zap.Float64("value", val),118zap.Int64("scrape_timestamp", atMs),119zap.Stringer("target_labels", ls))120}121}122123// For the `target_info` metric we need to convert it to resource attributes.124if metricName == targetMetricName {125return 0, t.AddTargetInfo(ls)126}127128curMF, ok := t.families[metricName]129if !ok {130familyName := normalizeMetricName(metricName)131if mf, ok := t.families[familyName]; ok && mf.includesMetric(metricName) {132curMF = mf133} else {134curMF = newMetricFamily(metricName, t.mc, t.logger)135t.families[curMF.name] = curMF136}137}138139return 0, curMF.Add(metricName, ls, atMs, val)140}141142func (t *transaction) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {143return 0, nil144}145146// getMetrics returns all metrics to the given slice.147// The only error returned by this function is errNoDataToBuild.148func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, error) {149if len(t.families) == 0 {150return pmetric.Metrics{}, errNoDataToBuild151}152153md := pmetric.NewMetrics()154rms := md.ResourceMetrics().AppendEmpty()155resource.CopyTo(rms.Resource())156metrics := rms.ScopeMetrics().AppendEmpty().Metrics()157158for _, mf := range t.families {159mf.appendMetric(metrics)160}161162return md, nil163}164165func (t *transaction) initTransaction(labels labels.Labels) error {166target, ok := scrape.TargetFromContext(t.ctx)167if !ok {168return errors.New("unable to find target in context")169}170t.mc, ok = scrape.MetricMetadataStoreFromContext(t.ctx)171if !ok {172return errors.New("unable to find MetricMetadataStore in context")173}174175job, instance := labels.Get(model.JobLabel), labels.Get(model.InstanceLabel)176if job == "" || instance == "" {177return errNoJobInstance178}179t.nodeResource = CreateResource(job, instance, target.DiscoveredLabels())180t.isNew = false181return nil182}183184func (t *transaction) Commit() error {185if t.isNew {186return nil187}188189ctx := t.obsrecv.StartMetricsOp(t.ctx)190md, err := t.getMetrics(t.nodeResource)191if err != nil {192t.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)193return err194}195196numPoints := md.DataPointCount()197if numPoints == 0 {198return nil199}200201if err = t.metricAdjuster.AdjustMetrics(md); err != nil {202t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)203return err204}205206err = t.sink.ConsumeMetrics(ctx, md)207t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)208return err209}210211func (t *transaction) Rollback() error {212return nil213}214215func (t *transaction) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {216//TODO: implement this func217return 0, nil218}219220func (t *transaction) AppendHistogram(ref storage.SeriesRef, l labels.Labels, ts int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {221//TODO: implement this func222return 0, nil223}224225func (t *transaction) AddTargetInfo(labels labels.Labels) error {226attrs := t.nodeResource.Attributes()227228for _, lbl := range labels {229if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel {230continue231}232233attrs.PutStr(lbl.Name, lbl.Value)234}235236return nil237}238239240