Path: blob/main/pkg/flow/tracing/internal/jaegerremote/sampler_remote.go
4096 views
// Copyright The OpenTelemetry Authors1// Copyright (c) 2021 The Jaeger Authors.2// Copyright (c) 2017 Uber Technologies, Inc.3//4// Licensed under the Apache License, Version 2.0 (the "License");5// you may not use this file except in compliance with the License.6// You may obtain a copy of the License at7//8// http://www.apache.org/licenses/LICENSE-2.09//10// Unless required by applicable law or agreed to in writing, software11// distributed under the License is distributed on an "AS IS" BASIS,12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13// See the License for the specific language governing permissions and14// limitations under the License.1516//nolint:all17package jaegerremote1819import (20"bytes"21"fmt"22"io"23"net/http"24"net/url"25"sync"26"sync/atomic"27"time"2829"github.com/golang/protobuf/jsonpb"30jaeger_api_v2 "github.com/jaegertracing/jaeger/proto-gen/api_v2"31"go.opentelemetry.io/otel"32"go.opentelemetry.io/otel/sdk/trace"33)3435const (36defaultRemoteSamplingTimeout = 10 * time.Second37defaultSamplingRefreshInterval = time.Minute38defaultSamplingMaxOperations = 25639defaultSamplingOperationNameLateBinding = true40)4142// samplingStrategyFetcher is used to fetch sampling strategy updates from remote server.43type samplingStrategyFetcher interface {44Fetch(service string) ([]byte, error)45}4647// samplingStrategyParser is used to parse sampling strategy updates. The output object48// should be of the type that is recognized by the SamplerUpdaters.49type samplingStrategyParser interface {50Parse(response []byte) (interface{}, error)51}5253// samplerUpdater is used by Sampler to apply sampling strategies,54// retrieved from remote config server, to the current sampler. The updater can modify55// the sampler in-place if sampler supports it, or create a new one.56//57// If the strategy does not contain configuration for the sampler in question,58// updater must return modifiedSampler=nil to give other updaters a chance to inspect59// the sampling strategy response.60//61// Sampler invokes the updaters while holding a lock on the main sampler.62type samplerUpdater interface {63Update(sampler trace.Sampler, strategy interface{}) (modified trace.Sampler, err error)64}6566// Sampler is a delegating sampler that polls a remote server67// for the appropriate sampling strategy, constructs a corresponding sampler and68// delegates to it for sampling decisions.69type Sampler struct {70// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.71// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq72closed int64 // 0 - not closed, 1 - closed7374sync.RWMutex // used to serialize access to samplerConfig.sampler75config7677serviceName string78doneChan chan *sync.WaitGroup79}8081// New creates a sampler that periodically pulls82// the sampling strategy from an HTTP sampling server (e.g. jaeger-agent).83func New(84serviceName string,85opts ...Option,86) *Sampler {87options := newConfig(opts...)88sampler := &Sampler{89config: options,90serviceName: serviceName,91doneChan: make(chan *sync.WaitGroup),92}93go sampler.pollController()94return sampler95}9697// ShouldSample returns a sampling choice based on the passed sampling98// parameters.99func (s *Sampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {100s.RLock()101defer s.RUnlock()102return s.sampler.ShouldSample(p)103}104105// Close does a clean shutdown of the sampler, stopping any background106// go-routines it may have started.107func (s *Sampler) Close() {108if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {109otel.Handle(fmt.Errorf("repeated attempt to close the sampler is ignored"))110return111}112113var wg sync.WaitGroup114wg.Add(1)115s.doneChan <- &wg116wg.Wait()117}118119// Description returns a human-readable name for the Sampler.120func (s *Sampler) Description() string {121return "JaegerRemoteSampler{}"122}123124func (s *Sampler) pollController() {125ticker := time.NewTicker(s.samplingRefreshInterval)126defer ticker.Stop()127s.pollControllerWithTicker(ticker)128}129130func (s *Sampler) pollControllerWithTicker(ticker *time.Ticker) {131s.UpdateSampler()132133for {134select {135case <-ticker.C:136s.UpdateSampler()137case wg := <-s.doneChan:138wg.Done()139return140}141}142}143144func (s *Sampler) setSampler(sampler trace.Sampler) {145s.Lock()146defer s.Unlock()147s.sampler = sampler148}149150// UpdateSampler forces the sampler to fetch sampling strategy from backend server.151// This function is called automatically on a timer, but can also be safely called manually, e.g. from tests.152func (s *Sampler) UpdateSampler() {153res, err := s.samplingFetcher.Fetch(s.serviceName)154if err != nil {155// log.Printf("failed to fetch sampling strategy: %v", err)156return157}158strategy, err := s.samplingParser.Parse(res)159if err != nil {160// log.Printf("failed to parse sampling strategy response: %v", err)161return162}163164s.Lock()165defer s.Unlock()166167if err := s.updateSamplerViaUpdaters(strategy); err != nil {168// c.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err)169return170}171}172173// NB: this function should only be called while holding a Write lock.174func (s *Sampler) updateSamplerViaUpdaters(strategy interface{}) error {175for _, updater := range s.updaters {176sampler, err := updater.Update(s.sampler, strategy)177if err != nil {178return err179}180if sampler != nil {181s.sampler = sampler182return nil183}184}185return fmt.Errorf("unsupported sampling strategy %+v", strategy)186}187188// -----------------------189190// probabilisticSamplerUpdater is used by Sampler to parse sampling configuration.191type probabilisticSamplerUpdater struct{}192193// Update implements Update of samplerUpdater.194func (u *probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy interface{}) (trace.Sampler, error) {195type response interface {196GetProbabilisticSampling() *jaeger_api_v2.ProbabilisticSamplingStrategy197}198var _ response = new(jaeger_api_v2.SamplingStrategyResponse) // sanity signature check199if resp, ok := strategy.(response); ok {200if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil {201if ps, ok := sampler.(*probabilisticSampler); ok {202if err := ps.Update(probabilistic.SamplingRate); err != nil {203return nil, err204}205return sampler, nil206}207return newProbabilisticSampler(probabilistic.SamplingRate), nil208}209}210return nil, nil211}212213// -----------------------214215// rateLimitingSamplerUpdater is used by Sampler to parse sampling configuration.216type rateLimitingSamplerUpdater struct{}217218// Update implements Update of samplerUpdater.219func (u *rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy interface{}) (trace.Sampler, error) {220type response interface {221GetRateLimitingSampling() *jaeger_api_v2.RateLimitingSamplingStrategy222}223var _ response = new(jaeger_api_v2.SamplingStrategyResponse) // sanity signature check224if resp, ok := strategy.(response); ok {225if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil {226rateLimit := float64(rateLimiting.MaxTracesPerSecond)227if rl, ok := sampler.(*rateLimitingSampler); ok {228rl.Update(rateLimit)229return rl, nil230}231return newRateLimitingSampler(rateLimit), nil232}233}234return nil, nil235}236237// -----------------------238239// perOperationSamplerUpdater is used by Sampler to parse sampling configuration.240// Fields have the same meaning as in perOperationSamplerParams.241type perOperationSamplerUpdater struct {242MaxOperations int243OperationNameLateBinding bool244}245246// Update implements Update of samplerUpdater.247func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy interface{}) (trace.Sampler, error) {248type response interface {249GetOperationSampling() *jaeger_api_v2.PerOperationSamplingStrategies250}251var _ response = new(jaeger_api_v2.SamplingStrategyResponse) // sanity signature check252if p, ok := strategy.(response); ok {253if operations := p.GetOperationSampling(); operations != nil {254if as, ok := sampler.(*perOperationSampler); ok {255as.update(operations)256return as, nil257}258return newPerOperationSampler(perOperationSamplerParams{259MaxOperations: u.MaxOperations,260OperationNameLateBinding: u.OperationNameLateBinding,261Strategies: operations,262}), nil263}264}265return nil, nil266}267268// -----------------------269270type httpSamplingStrategyFetcher struct {271serverURL string272httpClient http.Client273}274275func newHTTPSamplingStrategyFetcher(serverURL string) *httpSamplingStrategyFetcher {276customTransport := http.DefaultTransport.(*http.Transport).Clone()277customTransport.ResponseHeaderTimeout = defaultRemoteSamplingTimeout278279return &httpSamplingStrategyFetcher{280serverURL: serverURL,281httpClient: http.Client{282Transport: customTransport,283},284}285}286287func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) {288v := url.Values{}289v.Set("service", serviceName)290uri := f.serverURL + "?" + v.Encode()291292resp, err := f.httpClient.Get(uri)293if err != nil {294return nil, err295}296defer resp.Body.Close()297298body, err := io.ReadAll(resp.Body)299if err != nil {300return nil, err301}302303if resp.StatusCode >= 400 {304return nil, fmt.Errorf("status code: %d, body: %c", resp.StatusCode, body)305}306307return body, nil308}309310// -----------------------311312type samplingStrategyParserImpl struct{}313314func (p *samplingStrategyParserImpl) Parse(response []byte) (interface{}, error) {315strategy := new(jaeger_api_v2.SamplingStrategyResponse)316if err := jsonpb.Unmarshal(bytes.NewReader(response), strategy); err != nil {317return nil, err318}319return strategy, nil320}321322323