Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/pkg/flow/tracing/internal/jaegerremote/sampler_remote.go
4096 views
1
// Copyright The OpenTelemetry Authors
2
// Copyright (c) 2021 The Jaeger Authors.
3
// Copyright (c) 2017 Uber Technologies, Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
17
//nolint:all
18
package jaegerremote
19
20
import (
21
"bytes"
22
"fmt"
23
"io"
24
"net/http"
25
"net/url"
26
"sync"
27
"sync/atomic"
28
"time"
29
30
"github.com/golang/protobuf/jsonpb"
31
jaeger_api_v2 "github.com/jaegertracing/jaeger/proto-gen/api_v2"
32
"go.opentelemetry.io/otel"
33
"go.opentelemetry.io/otel/sdk/trace"
34
)
35
36
// Package-level defaults for the remote sampler.
const (
	// defaultRemoteSamplingTimeout is applied as the HTTP transport's
	// response-header timeout by newHTTPSamplingStrategyFetcher.
	defaultRemoteSamplingTimeout = 10 * time.Second

	// defaultSamplingRefreshInterval is the default polling period
	// (presumably consumed by the option defaults defined elsewhere in the
	// package).
	defaultSamplingRefreshInterval = time.Minute

	// defaultSamplingMaxOperations is the default cap on tracked operations
	// (presumably for the per-operation sampler; confirm against the options).
	defaultSamplingMaxOperations = 256

	// defaultSamplingOperationNameLateBinding is the default for the
	// per-operation sampler's late-binding switch (presumably; confirm
	// against the options).
	defaultSamplingOperationNameLateBinding = true
)
42
43
// samplingStrategyFetcher retrieves the raw, still-encoded sampling strategy
// for a service from a remote server.
type samplingStrategyFetcher interface {
	// Fetch returns the undecoded strategy payload for the named service.
	Fetch(service string) ([]byte, error)
}
47
48
// samplingStrategyParser decodes a fetched sampling strategy payload. The
// value it returns must be of a type that the configured samplerUpdaters
// know how to inspect.
type samplingStrategyParser interface {
	// Parse decodes response into a strategy object.
	Parse(response []byte) (interface{}, error)
}
53
54
// samplerUpdater is used by Sampler to apply sampling strategies,
55
// retrieved from remote config server, to the current sampler. The updater can modify
56
// the sampler in-place if sampler supports it, or create a new one.
57
//
58
// If the strategy does not contain configuration for the sampler in question,
59
// updater must return modifiedSampler=nil to give other updaters a chance to inspect
60
// the sampling strategy response.
61
//
62
// Sampler invokes the updaters while holding a lock on the main sampler.
63
type samplerUpdater interface {
64
Update(sampler trace.Sampler, strategy interface{}) (modified trace.Sampler, err error)
65
}
66
67
// Sampler is a delegating sampler that polls a remote server
68
// for the appropriate sampling strategy, constructs a corresponding sampler and
69
// delegates to it for sampling decisions.
70
type Sampler struct {
71
// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
72
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
73
closed int64 // 0 - not closed, 1 - closed
74
75
sync.RWMutex // used to serialize access to samplerConfig.sampler
76
config
77
78
serviceName string
79
doneChan chan *sync.WaitGroup
80
}
81
82
// New creates a sampler that periodically pulls
83
// the sampling strategy from an HTTP sampling server (e.g. jaeger-agent).
84
func New(
85
serviceName string,
86
opts ...Option,
87
) *Sampler {
88
options := newConfig(opts...)
89
sampler := &Sampler{
90
config: options,
91
serviceName: serviceName,
92
doneChan: make(chan *sync.WaitGroup),
93
}
94
go sampler.pollController()
95
return sampler
96
}
97
98
// ShouldSample returns a sampling choice based on the passed sampling
99
// parameters.
100
func (s *Sampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
101
s.RLock()
102
defer s.RUnlock()
103
return s.sampler.ShouldSample(p)
104
}
105
106
// Close does a clean shutdown of the sampler, stopping any background
107
// go-routines it may have started.
108
func (s *Sampler) Close() {
109
if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {
110
otel.Handle(fmt.Errorf("repeated attempt to close the sampler is ignored"))
111
return
112
}
113
114
var wg sync.WaitGroup
115
wg.Add(1)
116
s.doneChan <- &wg
117
wg.Wait()
118
}
119
120
// Description returns a human-readable name for the Sampler.
121
func (s *Sampler) Description() string {
122
return "JaegerRemoteSampler{}"
123
}
124
125
// pollController runs the polling loop on a ticker that fires every
// samplingRefreshInterval, and stops the ticker once the loop returns.
//
// NOTE(review): time.NewTicker panics for a non-positive interval; whether the
// configuration guards against that is not visible from here.
func (s *Sampler) pollController() {
	refresh := time.NewTicker(s.samplingRefreshInterval)
	defer refresh.Stop()

	s.pollControllerWithTicker(refresh)
}
130
131
func (s *Sampler) pollControllerWithTicker(ticker *time.Ticker) {
132
s.UpdateSampler()
133
134
for {
135
select {
136
case <-ticker.C:
137
s.UpdateSampler()
138
case wg := <-s.doneChan:
139
wg.Done()
140
return
141
}
142
}
143
}
144
145
// setSampler installs sampler as the delegate while holding the write lock.
func (s *Sampler) setSampler(sampler trace.Sampler) {
	s.Lock()
	s.sampler = sampler
	s.Unlock()
}
150
151
// UpdateSampler forces the sampler to fetch sampling strategy from backend server.
152
// This function is called automatically on a timer, but can also be safely called manually, e.g. from tests.
153
func (s *Sampler) UpdateSampler() {
154
res, err := s.samplingFetcher.Fetch(s.serviceName)
155
if err != nil {
156
// log.Printf("failed to fetch sampling strategy: %v", err)
157
return
158
}
159
strategy, err := s.samplingParser.Parse(res)
160
if err != nil {
161
// log.Printf("failed to parse sampling strategy response: %v", err)
162
return
163
}
164
165
s.Lock()
166
defer s.Unlock()
167
168
if err := s.updateSamplerViaUpdaters(strategy); err != nil {
169
// c.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err)
170
return
171
}
172
}
173
174
// NB: this function should only be called while holding a Write lock.
175
func (s *Sampler) updateSamplerViaUpdaters(strategy interface{}) error {
176
for _, updater := range s.updaters {
177
sampler, err := updater.Update(s.sampler, strategy)
178
if err != nil {
179
return err
180
}
181
if sampler != nil {
182
s.sampler = sampler
183
return nil
184
}
185
}
186
return fmt.Errorf("unsupported sampling strategy %+v", strategy)
187
}
188
189
// -----------------------
190
191
// probabilisticSamplerUpdater is used by Sampler to parse sampling configuration.
192
type probabilisticSamplerUpdater struct{}
193
194
// Update implements Update of samplerUpdater.
195
func (u *probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy interface{}) (trace.Sampler, error) {
196
type response interface {
197
GetProbabilisticSampling() *jaeger_api_v2.ProbabilisticSamplingStrategy
198
}
199
var _ response = new(jaeger_api_v2.SamplingStrategyResponse) // sanity signature check
200
if resp, ok := strategy.(response); ok {
201
if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil {
202
if ps, ok := sampler.(*probabilisticSampler); ok {
203
if err := ps.Update(probabilistic.SamplingRate); err != nil {
204
return nil, err
205
}
206
return sampler, nil
207
}
208
return newProbabilisticSampler(probabilistic.SamplingRate), nil
209
}
210
}
211
return nil, nil
212
}
213
214
// -----------------------
215
216
// rateLimitingSamplerUpdater is used by Sampler to parse sampling configuration.
217
type rateLimitingSamplerUpdater struct{}
218
219
// Update implements Update of samplerUpdater.
220
func (u *rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy interface{}) (trace.Sampler, error) {
221
type response interface {
222
GetRateLimitingSampling() *jaeger_api_v2.RateLimitingSamplingStrategy
223
}
224
var _ response = new(jaeger_api_v2.SamplingStrategyResponse) // sanity signature check
225
if resp, ok := strategy.(response); ok {
226
if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil {
227
rateLimit := float64(rateLimiting.MaxTracesPerSecond)
228
if rl, ok := sampler.(*rateLimitingSampler); ok {
229
rl.Update(rateLimit)
230
return rl, nil
231
}
232
return newRateLimitingSampler(rateLimit), nil
233
}
234
}
235
return nil, nil
236
}
237
238
// -----------------------
239
240
// perOperationSamplerUpdater is used by Sampler to parse sampling configuration.
241
// Fields have the same meaning as in perOperationSamplerParams.
242
type perOperationSamplerUpdater struct {
243
MaxOperations int
244
OperationNameLateBinding bool
245
}
246
247
// Update implements Update of samplerUpdater.
248
func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy interface{}) (trace.Sampler, error) {
249
type response interface {
250
GetOperationSampling() *jaeger_api_v2.PerOperationSamplingStrategies
251
}
252
var _ response = new(jaeger_api_v2.SamplingStrategyResponse) // sanity signature check
253
if p, ok := strategy.(response); ok {
254
if operations := p.GetOperationSampling(); operations != nil {
255
if as, ok := sampler.(*perOperationSampler); ok {
256
as.update(operations)
257
return as, nil
258
}
259
return newPerOperationSampler(perOperationSamplerParams{
260
MaxOperations: u.MaxOperations,
261
OperationNameLateBinding: u.OperationNameLateBinding,
262
Strategies: operations,
263
}), nil
264
}
265
}
266
return nil, nil
267
}
268
269
// -----------------------
270
271
type httpSamplingStrategyFetcher struct {
272
serverURL string
273
httpClient http.Client
274
}
275
276
func newHTTPSamplingStrategyFetcher(serverURL string) *httpSamplingStrategyFetcher {
277
customTransport := http.DefaultTransport.(*http.Transport).Clone()
278
customTransport.ResponseHeaderTimeout = defaultRemoteSamplingTimeout
279
280
return &httpSamplingStrategyFetcher{
281
serverURL: serverURL,
282
httpClient: http.Client{
283
Transport: customTransport,
284
},
285
}
286
}
287
288
func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) {
289
v := url.Values{}
290
v.Set("service", serviceName)
291
uri := f.serverURL + "?" + v.Encode()
292
293
resp, err := f.httpClient.Get(uri)
294
if err != nil {
295
return nil, err
296
}
297
defer resp.Body.Close()
298
299
body, err := io.ReadAll(resp.Body)
300
if err != nil {
301
return nil, err
302
}
303
304
if resp.StatusCode >= 400 {
305
return nil, fmt.Errorf("status code: %d, body: %c", resp.StatusCode, body)
306
}
307
308
return body, nil
309
}
310
311
// -----------------------
312
313
type samplingStrategyParserImpl struct{}
314
315
func (p *samplingStrategyParserImpl) Parse(response []byte) (interface{}, error) {
316
strategy := new(jaeger_api_v2.SamplingStrategyResponse)
317
if err := jsonpb.Unmarshal(bytes.NewReader(response), strategy); err != nil {
318
return nil, err
319
}
320
return strategy, nil
321
}
322
323