Path: blob/main/pkg/traces/remotewriteexporter/exporter.go
4096 views
package remotewriteexporter

import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	util "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/agent/pkg/metrics/instance"
	"github.com/grafana/agent/pkg/traces/contextkeys"
	"github.com/prometheus/prometheus/model/labels"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

const (
	nameLabelKey = "__name__"
	sumSuffix    = "sum"
	countSuffix  = "count"
	bucketSuffix = "bucket"
	leStr        = "le"
	infBucket    = "+Inf"
	noSuffix     = ""
)

// datapoint holds the most recently observed value for a single series.
type datapoint struct {
	ts int64 // observation timestamp, milliseconds since the Unix epoch
	v  float64
	l  labels.Labels
}

// remoteWriteExporter accumulates the latest datapoint per series and
// periodically flushes them to a metrics instance obtained from an
// instance.Manager.
type remoteWriteExporter struct {
	mtx sync.Mutex // guards seriesMap and lastFlush

	close  chan struct{} // closed by Shutdown to stop appenderLoop
	closed chan struct{} // closed by appenderLoop once it has exited

	manager      instance.Manager
	promInstance string

	constLabels labels.Labels
	namespace   string

	seriesMap    map[uint64]*datapoint // keyed by labels.Labels.Hash()
	staleTime    int64                 // ms after which an un-updated series is removed
	lastFlush    int64                 // ms timestamp of the previous flush
	loopInterval time.Duration

	logger log.Logger
}

// newRemoteWriteExporter builds an exporter from cfg. The instance.Manager is
// injected later, in Start, via the context.
func newRemoteWriteExporter(cfg *Config) (component.MetricsExporter, error) {
	logger := log.With(util.Logger, "component", "traces remote write exporter")

	ls := make(labels.Labels, 0, len(cfg.ConstLabels))
	for name, value := range cfg.ConstLabels {
		ls = append(ls, labels.Label{Name: name, Value: value})
	}

	// Series not updated for this long are dropped; default 15 minutes.
	staleTime := (15 * time.Minute).Milliseconds()
	if cfg.StaleTime > 0 {
		staleTime = cfg.StaleTime.Milliseconds()
	}

	loopInterval := time.Second
	if cfg.LoopInterval > 0 {
		loopInterval = cfg.LoopInterval
	}

	return &remoteWriteExporter{
		mtx:          sync.Mutex{},
		close:        make(chan struct{}),
		closed:       make(chan struct{}),
		constLabels:  ls,
		namespace:    cfg.Namespace,
		promInstance: cfg.PromInstance,
		seriesMap:    make(map[uint64]*datapoint),
		staleTime:    staleTime,
		loopInterval: loopInterval,
		logger:       logger,
	}, nil
}

// Start extracts the instance.Manager from ctx and launches the background
// flush loop. It returns an error if the context does not carry a manager.
func (e *remoteWriteExporter) Start(ctx context.Context, _ component.Host) error {
	manager, ok := ctx.Value(contextkeys.Metrics).(instance.Manager)
	if !ok || manager == nil {
		return fmt.Errorf("key does not contain an InstanceManager instance")
	}
	e.manager = manager

	go e.appenderLoop()

	return nil
}

// Shutdown signals the flush loop to stop and waits for it to exit or for ctx
// to be canceled, whichever happens first.
func (e *remoteWriteExporter) Shutdown(ctx context.Context) error {
	close(e.close)

	select {
	case <-e.closed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Capabilities reports that this exporter does not mutate the metrics it
// consumes.
func (e *remoteWriteExporter) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{}
}

// ConsumeMetrics records the latest value of every supported metric in md.
// Gauges, cumulative sums, and cumulative histograms are supported; summaries
// and unknown types yield an error. Non-cumulative data is silently skipped.
func (e *remoteWriteExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	select {
	case <-e.closed:
		return nil
	default:
	}

	resourceMetrics := md.ResourceMetrics()
	for i := 0; i < resourceMetrics.Len(); i++ {
		resourceMetric := resourceMetrics.At(i)
		scopeMetricsSlice := resourceMetric.ScopeMetrics()
		for j := 0; j < scopeMetricsSlice.Len(); j++ {
			metricSlice := scopeMetricsSlice.At(j).Metrics()
			for k := 0; k < metricSlice.Len(); k++ {
				switch metric := metricSlice.At(k); metric.Type() {
				case pmetric.MetricTypeGauge:
					// BUGFIX: gauge datapoints must be read via Gauge(), not
					// Sum() — the Sum accessor is invalid for gauge metrics.
					dataPoints := metric.Gauge().DataPoints()
					if err := e.handleNumberDataPoints(metric.Name(), dataPoints); err != nil {
						return err
					}
				case pmetric.MetricTypeSum:
					if metric.Sum().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
						continue // Only cumulative metrics are supported
					}
					dataPoints := metric.Sum().DataPoints()
					if err := e.handleNumberDataPoints(metric.Name(), dataPoints); err != nil {
						return err
					}
				case pmetric.MetricTypeHistogram:
					if metric.Histogram().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
						continue // Only cumulative metrics are supported
					}
					dataPoints := metric.Histogram().DataPoints()
					e.handleHistogramDataPoints(metric.Name(), dataPoints)
				case pmetric.MetricTypeSummary:
					return fmt.Errorf("unsupported metric data type %s", metric.Type())
				default:
					return fmt.Errorf("unsupported metric data type %s", metric.Type())
				}
			}
		}
	}

	return nil
}

// handleNumberDataPoints stores each numeric datapoint under the series
// derived from name plus the datapoint's attributes.
func (e *remoteWriteExporter) handleNumberDataPoints(name string, dataPoints pmetric.NumberDataPointSlice) error {
	for ix := 0; ix < dataPoints.Len(); ix++ {
		dataPoint := dataPoints.At(ix)
		lbls := e.createLabelSet(name, noSuffix, dataPoint.Attributes(), labels.Labels{})
		if err := e.appendNumberDataPoint(dataPoint, lbls); err != nil {
			return fmt.Errorf("failed to process datapoints %s", err)
		}
	}
	return nil
}

// appendNumberDataPoint converts an int or double datapoint to float64 and
// records it with the exporter's current timestamp.
func (e *remoteWriteExporter) appendNumberDataPoint(dataPoint pmetric.NumberDataPoint, labels labels.Labels) error {
	var val float64
	switch dataPoint.ValueType() {
	case pmetric.NumberDataPointValueTypeDouble:
		val = dataPoint.DoubleValue()
	case pmetric.NumberDataPointValueTypeInt:
		val = float64(dataPoint.IntValue())
	default:
		return fmt.Errorf("unknown data point type: %s", dataPoint.ValueType())
	}
	ts := e.timestamp()

	e.appendDatapointForSeries(labels, ts, val)

	return nil
}

// handleHistogramDataPoints expands each histogram datapoint into the
// conventional Prometheus series: _sum, _count, and cumulative _bucket series
// (one per explicit bound, plus le="+Inf").
func (e *remoteWriteExporter) handleHistogramDataPoints(name string, dataPoints pmetric.HistogramDataPointSlice) {
	for ix := 0; ix < dataPoints.Len(); ix++ {
		dataPoint := dataPoints.At(ix)
		ts := e.timestamp()

		// Append sum value
		sumLabels := e.createLabelSet(name, sumSuffix, dataPoint.Attributes(), labels.Labels{})
		e.appendDatapointForSeries(sumLabels, ts, dataPoint.Sum())

		// Append count value
		countLabels := e.createLabelSet(name, countSuffix, dataPoint.Attributes(), labels.Labels{})
		e.appendDatapointForSeries(countLabels, ts, float64(dataPoint.Count()))

		// Bucket counts are per-bucket; accumulate them so each le series is
		// cumulative, as Prometheus histograms require.
		var cumulativeCount uint64
		for ix := 0; ix < dataPoint.ExplicitBounds().Len(); ix++ {
			eb := dataPoint.ExplicitBounds().At(ix)

			if ix >= dataPoint.BucketCounts().Len() {
				break
			}
			cumulativeCount += dataPoint.BucketCounts().At(ix)
			boundStr := strconv.FormatFloat(eb, 'f', -1, 64)
			bucketLabels := e.createLabelSet(name, bucketSuffix, dataPoint.Attributes(), labels.Labels{{Name: leStr, Value: boundStr}})
			e.appendDatapointForSeries(bucketLabels, ts, float64(cumulativeCount))
		}

		// add le=+Inf bucket (the overflow bucket is the final count entry).
		// BUGFIX: guard the index — an empty BucketCounts slice previously
		// caused an out-of-range panic here.
		if bucketCounts := dataPoint.BucketCounts(); bucketCounts.Len() > 0 {
			cumulativeCount += bucketCounts.At(bucketCounts.Len() - 1)
		}
		infBucketLabels := e.createLabelSet(name, bucketSuffix, dataPoint.Attributes(), labels.Labels{{Name: leStr, Value: infBucket}})
		e.appendDatapointForSeries(infBucketLabels, ts, float64(cumulativeCount))
	}
}

// appendDatapointForSeries upserts (ts, v) as the latest value for the series
// identified by l, keeping only the newest sample per series.
func (e *remoteWriteExporter) appendDatapointForSeries(l labels.Labels, ts int64, v float64) {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	series := l.Hash()
	if lastDatapoint, ok := e.seriesMap[series]; ok {
		if lastDatapoint.ts >= ts {
			return // never move a series backwards in time
		}
		lastDatapoint.ts = ts
		lastDatapoint.v = v
		return
	}

	e.seriesMap[series] = &datapoint{l: l, ts: ts, v: v}
}

// appenderLoop flushes the series map every loopInterval until Shutdown
// closes e.close, then signals completion via e.closed.
func (e *remoteWriteExporter) appenderLoop() {
	t := time.NewTicker(e.loopInterval)
	defer t.Stop() // BUGFIX: the ticker was previously never stopped

	for {
		select {
		case <-t.C:
			e.flush()
		case <-e.close:
			close(e.closed)
			return
		}
	}
}

// flush appends every series updated since the last flush to the configured
// metrics instance and prunes series that have gone stale.
//
// BUGFIX: extracted from appenderLoop so the mutex is released via defer. The
// previous inline version did `continue` after Lock() when GetInstance failed,
// skipping Unlock() and deadlocking the exporter permanently.
func (e *remoteWriteExporter) flush() {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	inst, err := e.manager.GetInstance(e.promInstance)
	if err != nil {
		level.Error(e.logger).Log("msg", "failed to get prom instance", "err", err)
		return
	}
	appender := inst.Appender(context.Background())

	now := time.Now().UnixMilli()
	for _, dp := range e.seriesMap {
		// If the datapoint hasn't been updated since the last loop, don't append it
		if dp.ts < e.lastFlush {
			// If the datapoint is older than now - staleTime, it is stale and gets removed.
			if now-dp.ts > e.staleTime {
				delete(e.seriesMap, dp.l.Hash())
			}
			continue
		}

		if _, err := appender.Append(0, dp.l, dp.ts, dp.v); err != nil {
			level.Error(e.logger).Log("msg", "failed to append datapoint", "err", err)
		}
	}

	if err := appender.Commit(); err != nil {
		level.Error(e.logger).Log("msg", "failed to commit appender", "err", err)
	}

	e.lastFlush = now
}

// createLabelSet assembles the full label set for a series: the datapoint
// attributes (dots mapped to underscores), the __name__ label, the configured
// constant labels, and any caller-supplied custom labels (e.g. le).
func (e *remoteWriteExporter) createLabelSet(name, suffix string, labelMap pcommon.Map, customLabels labels.Labels) labels.Labels {
	ls := make(labels.Labels, 0, labelMap.Len()+1+len(e.constLabels)+len(customLabels))
	// Labels from spanmetrics processor
	labelMap.Range(func(k string, v pcommon.Value) bool {
		ls = append(ls, labels.Label{
			Name:  strings.Replace(k, ".", "_", -1),
			Value: v.Str(),
		})
		return true
	})
	// Metric name label
	ls = append(ls, labels.Label{
		Name:  nameLabelKey,
		Value: metricName(e.namespace, name, suffix),
	})
	// Const labels
	ls = append(ls, e.constLabels...)
	// Custom labels
	ls = append(ls, customLabels...)
	return ls
}

// timestamp returns the current wall-clock time in milliseconds.
func (e *remoteWriteExporter) timestamp() int64 {
	return time.Now().UnixMilli()
}

// metricName joins namespace, metric, and an optional suffix with underscores.
func metricName(namespace, metric, suffix string) string {
	if len(suffix) != 0 {
		return fmt.Sprintf("%s_%s_%s", namespace, metric, suffix)
	}
	return fmt.Sprintf("%s_%s", namespace, metric)
}