Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/tracing/internal/jaegerremote/sampler.go
4096 views
1
// Copyright The OpenTelemetry Authors
2
// Copyright (c) 2021 The Jaeger Authors.
3
// Copyright (c) 2017 Uber Technologies, Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
17
//nolint:all
18
package jaegerremote
19
20
import (
21
"encoding/binary"
22
"fmt"
23
"math"
24
"sync"
25
26
"github.com/grafana/agent/pkg/flow/tracing/internal/jaegerremote/utils"
27
jaeger_api_v2 "github.com/jaegertracing/jaeger/proto-gen/api_v2"
28
"go.opentelemetry.io/otel/sdk/trace"
29
oteltrace "go.opentelemetry.io/otel/trace"
30
)
31
32
// defaultMaxOperations is the number of tracked operations that
// newPerOperationSampler falls back to when the caller supplies a
// non-positive MaxOperations.
const defaultMaxOperations = 2000
35
36
// -----------------------
37
38
// probabilisticSampler keeps a configurable fraction of traces, deciding per
// trace from the trace ID.
type probabilisticSampler struct {
	// samplingRate is the configured probability as stored by init.
	samplingRate float64
	// samplingBoundary is samplingRate scaled onto the 63-bit trace ID range;
	// ShouldSample keeps a trace whose masked ID does not exceed it.
	samplingBoundary uint64
}
44
45
// maxRandomNumber is the largest 63-bit value (0x7fffffffffffffff). It masks
// trace IDs down to 63 bits and scales sampling rates into that range.
const maxRandomNumber uint64 = 1<<63 - 1
46
47
// newProbabilisticSampler creates a sampler that randomly samples a certain percentage of traces specified by the
48
// samplingRate, in the range between 0.0 and 1.0.
49
//
50
// It relies on the fact that new trace IDs are 63bit random numbers themselves, thus making the sampling decision
51
// without generating a new random number, but simply calculating if traceID < (samplingRate * 2^63).
52
func newProbabilisticSampler(samplingRate float64) *probabilisticSampler {
53
s := new(probabilisticSampler)
54
return s.init(samplingRate)
55
}
56
57
// init (re)configures the sampler for samplingRate and returns the receiver
// so that calls can be chained.
//
// The rate is clamped to [0, 1]. NaN is treated as 0: math.Min and math.Max
// propagate NaN, which would otherwise be stored as the rate and then be
// converted to uint64 with an implementation-dependent result.
func (s *probabilisticSampler) init(samplingRate float64) *probabilisticSampler {
	if math.IsNaN(samplingRate) {
		samplingRate = 0.0
	}
	s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))
	// Scale the rate onto the 63-bit range that ShouldSample compares against.
	s.samplingBoundary = uint64(float64(maxRandomNumber) * s.samplingRate)
	return s
}
62
63
// SamplingRate returns the sampling probability this sampled was constructed with.
64
func (s *probabilisticSampler) SamplingRate() float64 {
65
return s.samplingRate
66
}
67
68
func (s *probabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
69
psc := oteltrace.SpanContextFromContext(p.ParentContext)
70
traceID := binary.BigEndian.Uint64(p.TraceID[0:8])
71
if s.samplingBoundary >= traceID&maxRandomNumber {
72
return trace.SamplingResult{
73
Decision: trace.RecordAndSample,
74
Tracestate: psc.TraceState(),
75
}
76
}
77
return trace.SamplingResult{
78
Decision: trace.Drop,
79
Tracestate: psc.TraceState(),
80
}
81
}
82
83
// Equal compares with another sampler.
84
func (s *probabilisticSampler) Equal(other trace.Sampler) bool {
85
if o, ok := other.(*probabilisticSampler); ok {
86
return s.samplingBoundary == o.samplingBoundary
87
}
88
return false
89
}
90
91
// Update modifies in-place the sampling rate. Locking must be done externally.
92
func (s *probabilisticSampler) Update(samplingRate float64) error {
93
if samplingRate < 0.0 || samplingRate > 1.0 {
94
return fmt.Errorf("sampling rate must be between 0.0 and 1.0, received %f", samplingRate)
95
}
96
s.init(samplingRate)
97
return nil
98
}
99
100
// Description returns a fixed label identifying this sampler type.
func (s *probabilisticSampler) Description() string {
	const label = "probabilisticSampler{}"
	return label
}
103
104
// -----------------------
105
106
// rateLimitingSampler samples at most maxTracesPerSecond. The distribution of sampled traces follows
107
// burstiness of the service, i.e. a service with uniformly distributed requests will have those
108
// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a
109
// number of sequential requests can be sampled each second.
110
type rateLimitingSampler struct {
111
maxTracesPerSecond float64
112
rateLimiter *utils.RateLimiter
113
}
114
115
// newRateLimitingSampler creates new rateLimitingSampler.
116
func newRateLimitingSampler(maxTracesPerSecond float64) *rateLimitingSampler {
117
s := new(rateLimitingSampler)
118
return s.init(maxTracesPerSecond)
119
}
120
121
// init configures the sampler for maxTracesPerSecond and returns the
// receiver. An existing rate limiter is updated in place; otherwise a new one
// is created. The second limiter argument is the rate itself, but never less
// than 1.0.
func (s *rateLimitingSampler) init(maxTracesPerSecond float64) *rateLimitingSampler {
	limit := math.Max(maxTracesPerSecond, 1.0)
	if s.rateLimiter != nil {
		s.rateLimiter.Update(maxTracesPerSecond, limit)
	} else {
		s.rateLimiter = utils.NewRateLimiter(maxTracesPerSecond, limit)
	}
	s.maxTracesPerSecond = maxTracesPerSecond
	return s
}
130
131
func (s *rateLimitingSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
132
psc := oteltrace.SpanContextFromContext(p.ParentContext)
133
if s.rateLimiter.CheckCredit(1.0) {
134
return trace.SamplingResult{
135
Decision: trace.RecordAndSample,
136
Tracestate: psc.TraceState(),
137
}
138
}
139
return trace.SamplingResult{
140
Decision: trace.Drop,
141
Tracestate: psc.TraceState(),
142
}
143
}
144
145
// Update reconfigures the rate limiter, while preserving its accumulated balance.
146
// Locking must be done externally.
147
func (s *rateLimitingSampler) Update(maxTracesPerSecond float64) {
148
if s.maxTracesPerSecond != maxTracesPerSecond {
149
s.init(maxTracesPerSecond)
150
}
151
}
152
153
// Equal compares with another sampler.
154
func (s *rateLimitingSampler) Equal(other trace.Sampler) bool {
155
if o, ok := other.(*rateLimitingSampler); ok {
156
return s.maxTracesPerSecond == o.maxTracesPerSecond
157
}
158
return false
159
}
160
161
// Description returns a fixed label identifying this sampler type.
func (s *rateLimitingSampler) Description() string {
	const label = "rateLimitingSampler{}"
	return label
}
164
165
// -----------------------
166
167
// guaranteedThroughputProbabilisticSampler is a sampler that leverages both probabilisticSampler and
168
// rateLimitingSampler. The rateLimitingSampler is used as a guaranteed lower bound sampler such that
169
// every operation is sampled at least once in a time interval defined by the lowerBound. ie a lowerBound
170
// of 1.0 / (60 * 10) will sample an operation at least once every 10 minutes.
171
//
172
// The probabilisticSampler is given higher priority when tags are emitted, i.e. if IsSampled() for both
173
// samplers return true, the tags for probabilisticSampler will be used.
174
type guaranteedThroughputProbabilisticSampler struct {
175
probabilisticSampler *probabilisticSampler
176
lowerBoundSampler *rateLimitingSampler
177
samplingRate float64
178
lowerBound float64
179
}
180
181
func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64) *guaranteedThroughputProbabilisticSampler {
182
s := &guaranteedThroughputProbabilisticSampler{
183
lowerBoundSampler: newRateLimitingSampler(lowerBound),
184
lowerBound: lowerBound,
185
}
186
s.setProbabilisticSampler(samplingRate)
187
return s
188
}
189
190
// setProbabilisticSampler creates the probabilistic sampler on first use, or
// re-initialises the existing one when samplingRate differs from the stored
// rate.
func (s *guaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) {
	switch {
	case s.probabilisticSampler == nil:
		s.probabilisticSampler = newProbabilisticSampler(samplingRate)
	case s.samplingRate != samplingRate:
		s.probabilisticSampler.init(samplingRate)
	}
	// samplingRate is not validated here, so read back what the sampler
	// actually stored: it may have clamped the value to [0, 1].
	s.samplingRate = s.probabilisticSampler.SamplingRate()
}
199
200
// ShouldSample returns the probabilistic sampler's result when that sampler
// decides to sample, and the lower-bound sampler's result otherwise.
func (s *guaranteedThroughputProbabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
	probabilistic := s.probabilisticSampler.ShouldSample(p)
	if probabilistic.Decision != trace.RecordAndSample {
		return s.lowerBoundSampler.ShouldSample(p)
	}
	// The lower-bound sampler is still invoked so that its credit check runs
	// for this trace; its result is intentionally discarded.
	s.lowerBoundSampler.ShouldSample(p)
	return probabilistic
}
208
209
// this function should only be called while holding a Write lock.
210
func (s *guaranteedThroughputProbabilisticSampler) update(lowerBound, samplingRate float64) {
211
s.setProbabilisticSampler(samplingRate)
212
if s.lowerBound != lowerBound {
213
s.lowerBoundSampler.Update(lowerBound)
214
s.lowerBound = lowerBound
215
}
216
}
217
218
// Description returns a fixed label identifying this sampler type.
func (s *guaranteedThroughputProbabilisticSampler) Description() string {
	const label = "guaranteedThroughputProbabilisticSampler{}"
	return label
}
221
222
// -----------------------
223
224
// perOperationSampler is a delegating sampler that applies guaranteedThroughputProbabilisticSampler
225
// on a per-operation basis.
226
type perOperationSampler struct {
227
sync.RWMutex
228
229
samplers map[string]*guaranteedThroughputProbabilisticSampler
230
defaultSampler *probabilisticSampler
231
lowerBound float64
232
maxOperations int
233
234
// see description in perOperationSamplerParams
235
operationNameLateBinding bool
236
}
237
238
// perOperationSamplerParams defines parameters when creating perOperationSampler.
239
type perOperationSamplerParams struct {
240
// Max number of operations that will be tracked. Other operations will be given default strategy.
241
MaxOperations int
242
243
// Opt-in feature for applications that require late binding of span name via explicit call to SetOperationName.
244
// When this feature is enabled, the sampler will return retryable=true from OnCreateSpan(), thus leaving
245
// the sampling decision as non-final (and the span as writeable). This may lead to degraded performance
246
// in applications that always provide the correct span name on oteltrace creation.
247
//
248
// For backwards compatibility this option is off by default.
249
OperationNameLateBinding bool
250
251
// Initial configuration of the sampling strategies (usually retrieved from the backend by Remote Sampler).
252
Strategies *jaeger_api_v2.PerOperationSamplingStrategies
253
}
254
255
// newPerOperationSampler returns a new perOperationSampler.
256
func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampler {
257
if params.MaxOperations <= 0 {
258
params.MaxOperations = defaultMaxOperations
259
}
260
samplers := make(map[string]*guaranteedThroughputProbabilisticSampler)
261
for _, strategy := range params.Strategies.PerOperationStrategies {
262
sampler := newGuaranteedThroughputProbabilisticSampler(
263
params.Strategies.DefaultLowerBoundTracesPerSecond,
264
strategy.ProbabilisticSampling.SamplingRate,
265
)
266
samplers[strategy.Operation] = sampler
267
}
268
return &perOperationSampler{
269
samplers: samplers,
270
defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability),
271
lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond,
272
maxOperations: params.MaxOperations,
273
operationNameLateBinding: params.OperationNameLateBinding,
274
}
275
}
276
277
// ShouldSample delegates the decision to the sampler selected for the span
// name in p.
func (s *perOperationSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
	return s.getSamplerForOperation(p.Name).ShouldSample(p)
}
281
282
// getSamplerForOperation returns the sampler registered for operation. When
// none exists it creates and registers one from the current lower bound and
// default sampling rate, unless maxOperations samplers are already tracked,
// in which case the shared default sampler is returned.
func (s *perOperationSampler) getSamplerForOperation(operation string) trace.Sampler {
	// Look the operation up under the read lock first.
	s.RLock()
	existing, found := s.samplers[operation]
	s.RUnlock()
	if found {
		return existing
	}

	s.Lock()
	defer s.Unlock()

	// The sampler may have been created between releasing the read lock and
	// acquiring the write lock, so look again.
	if existing, found = s.samplers[operation]; found {
		return existing
	}
	// Store only up to maxOperations unique operations.
	if len(s.samplers) >= s.maxOperations {
		return s.defaultSampler
	}
	created := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())
	s.samplers[operation] = created
	return created
}
306
307
// Description returns a fixed label identifying this sampler type.
func (s *perOperationSampler) Description() string {
	const label = "perOperationSampler{}"
	return label
}
310
311
func (s *perOperationSampler) update(strategies *jaeger_api_v2.PerOperationSamplingStrategies) {
312
s.Lock()
313
defer s.Unlock()
314
newSamplers := map[string]*guaranteedThroughputProbabilisticSampler{}
315
for _, strategy := range strategies.PerOperationStrategies {
316
operation := strategy.Operation
317
samplingRate := strategy.ProbabilisticSampling.SamplingRate
318
lowerBound := strategies.DefaultLowerBoundTracesPerSecond
319
if sampler, ok := s.samplers[operation]; ok {
320
sampler.update(lowerBound, samplingRate)
321
newSamplers[operation] = sampler
322
} else {
323
sampler := newGuaranteedThroughputProbabilisticSampler(
324
lowerBound,
325
samplingRate,
326
)
327
newSamplers[operation] = sampler
328
}
329
}
330
s.lowerBound = strategies.DefaultLowerBoundTracesPerSecond
331
if s.defaultSampler.SamplingRate() != strategies.DefaultSamplingProbability {
332
s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability)
333
}
334
s.samplers = newSamplers
335
}
336
337