// Path: blob/main/pkg/flow/tracing/internal/jaegerremote/sampler.go
// 4096 views
// Copyright The OpenTelemetry Authors1// Copyright (c) 2021 The Jaeger Authors.2// Copyright (c) 2017 Uber Technologies, Inc.3//4// Licensed under the Apache License, Version 2.0 (the "License");5// you may not use this file except in compliance with the License.6// You may obtain a copy of the License at7//8// http://www.apache.org/licenses/LICENSE-2.09//10// Unless required by applicable law or agreed to in writing, software11// distributed under the License is distributed on an "AS IS" BASIS,12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13// See the License for the specific language governing permissions and14// limitations under the License.1516//nolint:all17package jaegerremote1819import (20"encoding/binary"21"fmt"22"math"23"sync"2425"github.com/grafana/agent/pkg/flow/tracing/internal/jaegerremote/utils"26jaeger_api_v2 "github.com/jaegertracing/jaeger/proto-gen/api_v2"27"go.opentelemetry.io/otel/sdk/trace"28oteltrace "go.opentelemetry.io/otel/trace"29)3031const (32defaultMaxOperations = 200033)3435// -----------------------3637// probabilisticSampler is a sampler that randomly samples a certain percentage38// of traces.39type probabilisticSampler struct {40samplingRate float6441samplingBoundary uint6442}4344const maxRandomNumber = ^(uint64(1) << 63) // i.e. 
0x7fffffffffffffff4546// newProbabilisticSampler creates a sampler that randomly samples a certain percentage of traces specified by the47// samplingRate, in the range between 0.0 and 1.0.48//49// It relies on the fact that new trace IDs are 63bit random numbers themselves, thus making the sampling decision50// without generating a new random number, but simply calculating if traceID < (samplingRate * 2^63).51func newProbabilisticSampler(samplingRate float64) *probabilisticSampler {52s := new(probabilisticSampler)53return s.init(samplingRate)54}5556func (s *probabilisticSampler) init(samplingRate float64) *probabilisticSampler {57s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))58s.samplingBoundary = uint64(float64(maxRandomNumber) * s.samplingRate)59return s60}6162// SamplingRate returns the sampling probability this sampled was constructed with.63func (s *probabilisticSampler) SamplingRate() float64 {64return s.samplingRate65}6667func (s *probabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {68psc := oteltrace.SpanContextFromContext(p.ParentContext)69traceID := binary.BigEndian.Uint64(p.TraceID[0:8])70if s.samplingBoundary >= traceID&maxRandomNumber {71return trace.SamplingResult{72Decision: trace.RecordAndSample,73Tracestate: psc.TraceState(),74}75}76return trace.SamplingResult{77Decision: trace.Drop,78Tracestate: psc.TraceState(),79}80}8182// Equal compares with another sampler.83func (s *probabilisticSampler) Equal(other trace.Sampler) bool {84if o, ok := other.(*probabilisticSampler); ok {85return s.samplingBoundary == o.samplingBoundary86}87return false88}8990// Update modifies in-place the sampling rate. 
Locking must be done externally.91func (s *probabilisticSampler) Update(samplingRate float64) error {92if samplingRate < 0.0 || samplingRate > 1.0 {93return fmt.Errorf("sampling rate must be between 0.0 and 1.0, received %f", samplingRate)94}95s.init(samplingRate)96return nil97}9899func (s *probabilisticSampler) Description() string {100return "probabilisticSampler{}"101}102103// -----------------------104105// rateLimitingSampler samples at most maxTracesPerSecond. The distribution of sampled traces follows106// burstiness of the service, i.e. a service with uniformly distributed requests will have those107// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a108// number of sequential requests can be sampled each second.109type rateLimitingSampler struct {110maxTracesPerSecond float64111rateLimiter *utils.RateLimiter112}113114// newRateLimitingSampler creates new rateLimitingSampler.115func newRateLimitingSampler(maxTracesPerSecond float64) *rateLimitingSampler {116s := new(rateLimitingSampler)117return s.init(maxTracesPerSecond)118}119120func (s *rateLimitingSampler) init(maxTracesPerSecond float64) *rateLimitingSampler {121if s.rateLimiter == nil {122s.rateLimiter = utils.NewRateLimiter(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))123} else {124s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))125}126s.maxTracesPerSecond = maxTracesPerSecond127return s128}129130func (s *rateLimitingSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {131psc := oteltrace.SpanContextFromContext(p.ParentContext)132if s.rateLimiter.CheckCredit(1.0) {133return trace.SamplingResult{134Decision: trace.RecordAndSample,135Tracestate: psc.TraceState(),136}137}138return trace.SamplingResult{139Decision: trace.Drop,140Tracestate: psc.TraceState(),141}142}143144// Update reconfigures the rate limiter, while preserving its accumulated balance.145// Locking must be done externally.146func (s 
*rateLimitingSampler) Update(maxTracesPerSecond float64) {147if s.maxTracesPerSecond != maxTracesPerSecond {148s.init(maxTracesPerSecond)149}150}151152// Equal compares with another sampler.153func (s *rateLimitingSampler) Equal(other trace.Sampler) bool {154if o, ok := other.(*rateLimitingSampler); ok {155return s.maxTracesPerSecond == o.maxTracesPerSecond156}157return false158}159160func (s *rateLimitingSampler) Description() string {161return "rateLimitingSampler{}"162}163164// -----------------------165166// guaranteedThroughputProbabilisticSampler is a sampler that leverages both probabilisticSampler and167// rateLimitingSampler. The rateLimitingSampler is used as a guaranteed lower bound sampler such that168// every operation is sampled at least once in a time interval defined by the lowerBound. ie a lowerBound169// of 1.0 / (60 * 10) will sample an operation at least once every 10 minutes.170//171// The probabilisticSampler is given higher priority when tags are emitted, i.e. if IsSampled() for both172// samplers return true, the tags for probabilisticSampler will be used.173type guaranteedThroughputProbabilisticSampler struct {174probabilisticSampler *probabilisticSampler175lowerBoundSampler *rateLimitingSampler176samplingRate float64177lowerBound float64178}179180func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64) *guaranteedThroughputProbabilisticSampler {181s := &guaranteedThroughputProbabilisticSampler{182lowerBoundSampler: newRateLimitingSampler(lowerBound),183lowerBound: lowerBound,184}185s.setProbabilisticSampler(samplingRate)186return s187}188189func (s *guaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) {190if s.probabilisticSampler == nil {191s.probabilisticSampler = newProbabilisticSampler(samplingRate)192} else if s.samplingRate != samplingRate {193s.probabilisticSampler.init(samplingRate)194}195// since we don't validate samplingRate, sampler may have clamped it to [0, 1] 
interval196s.samplingRate = s.probabilisticSampler.SamplingRate()197}198199func (s *guaranteedThroughputProbabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {200if result := s.probabilisticSampler.ShouldSample(p); result.Decision == trace.RecordAndSample {201s.lowerBoundSampler.ShouldSample(p)202return result203}204result := s.lowerBoundSampler.ShouldSample(p)205return result206}207208// this function should only be called while holding a Write lock.209func (s *guaranteedThroughputProbabilisticSampler) update(lowerBound, samplingRate float64) {210s.setProbabilisticSampler(samplingRate)211if s.lowerBound != lowerBound {212s.lowerBoundSampler.Update(lowerBound)213s.lowerBound = lowerBound214}215}216217func (s *guaranteedThroughputProbabilisticSampler) Description() string {218return "guaranteedThroughputProbabilisticSampler{}"219}220221// -----------------------222223// perOperationSampler is a delegating sampler that applies guaranteedThroughputProbabilisticSampler224// on a per-operation basis.225type perOperationSampler struct {226sync.RWMutex227228samplers map[string]*guaranteedThroughputProbabilisticSampler229defaultSampler *probabilisticSampler230lowerBound float64231maxOperations int232233// see description in perOperationSamplerParams234operationNameLateBinding bool235}236237// perOperationSamplerParams defines parameters when creating perOperationSampler.238type perOperationSamplerParams struct {239// Max number of operations that will be tracked. Other operations will be given default strategy.240MaxOperations int241242// Opt-in feature for applications that require late binding of span name via explicit call to SetOperationName.243// When this feature is enabled, the sampler will return retryable=true from OnCreateSpan(), thus leaving244// the sampling decision as non-final (and the span as writeable). 
This may lead to degraded performance245// in applications that always provide the correct span name on oteltrace creation.246//247// For backwards compatibility this option is off by default.248OperationNameLateBinding bool249250// Initial configuration of the sampling strategies (usually retrieved from the backend by Remote Sampler).251Strategies *jaeger_api_v2.PerOperationSamplingStrategies252}253254// newPerOperationSampler returns a new perOperationSampler.255func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampler {256if params.MaxOperations <= 0 {257params.MaxOperations = defaultMaxOperations258}259samplers := make(map[string]*guaranteedThroughputProbabilisticSampler)260for _, strategy := range params.Strategies.PerOperationStrategies {261sampler := newGuaranteedThroughputProbabilisticSampler(262params.Strategies.DefaultLowerBoundTracesPerSecond,263strategy.ProbabilisticSampling.SamplingRate,264)265samplers[strategy.Operation] = sampler266}267return &perOperationSampler{268samplers: samplers,269defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability),270lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond,271maxOperations: params.MaxOperations,272operationNameLateBinding: params.OperationNameLateBinding,273}274}275276func (s *perOperationSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {277sampler := s.getSamplerForOperation(p.Name)278return sampler.ShouldSample(p)279}280281func (s *perOperationSampler) getSamplerForOperation(operation string) trace.Sampler {282s.RLock()283sampler, ok := s.samplers[operation]284if ok {285defer s.RUnlock()286return sampler287}288s.RUnlock()289s.Lock()290defer s.Unlock()291292// Check if sampler has already been created293sampler, ok = s.samplers[operation]294if ok {295return sampler296}297// Store only up to maxOperations of unique ops.298if len(s.samplers) >= s.maxOperations {299return s.defaultSampler300}301newSampler := 
newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())302s.samplers[operation] = newSampler303return newSampler304}305306func (s *perOperationSampler) Description() string {307return "perOperationSampler{}"308}309310func (s *perOperationSampler) update(strategies *jaeger_api_v2.PerOperationSamplingStrategies) {311s.Lock()312defer s.Unlock()313newSamplers := map[string]*guaranteedThroughputProbabilisticSampler{}314for _, strategy := range strategies.PerOperationStrategies {315operation := strategy.Operation316samplingRate := strategy.ProbabilisticSampling.SamplingRate317lowerBound := strategies.DefaultLowerBoundTracesPerSecond318if sampler, ok := s.samplers[operation]; ok {319sampler.update(lowerBound, samplingRate)320newSamplers[operation] = sampler321} else {322sampler := newGuaranteedThroughputProbabilisticSampler(323lowerBound,324samplingRate,325)326newSamplers[operation] = sampler327}328}329s.lowerBound = strategies.DefaultLowerBoundTracesPerSecond330if s.defaultSampler.SamplingRate() != strategies.DefaultSamplingProbability {331s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability)332}333s.samplers = newSamplers334}335336337