Path: blob/main/pkg/flow/tracing/internal/jaegerremote/utils/rate_limiter.go
4096 views
// Copyright The OpenTelemetry Authors
// Copyright (c) 2021 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//nolint:all
package utils

import (
	"sync"
	"time"
)

// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
//

// RateLimiter is a rate limiter based on leaky bucket algorithm, formulated in terms of a
// credits balance that is replenished every time CheckCredit() method is called (tick) by the amount proportional
// to the time elapsed since the last tick, up to max of creditsPerSecond. A call to CheckCredit() takes a cost
// of an item we want to pay with the balance. If the balance exceeds the cost of the item, the item is "purchased"
// and the balance reduced, indicated by returned value of true.
// Otherwise the balance is unchanged and false is returned.
//
// This can be used to limit the rate of messages emitted by a service by instantiating the RateLimiter with the
// max number of messages a service is allowed to emit per second, and calling CheckCredit(1.0) for each message
// to determine if the message is within the rate limit.
//
// It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to the desired throughput
// as bytes/second, and calling CheckCredit() with the actual message size.
//
// CheckCredit and Update serialize access to the limiter's state through an internal mutex.
// Instances should be created with NewRateLimiter: the zero value has a nil clock function,
// which updateBalance calls whenever the balance has to be refreshed.
type RateLimiter struct {
	lock sync.Mutex

	creditsPerSecond float64   // credits added per second of elapsed time
	balance          float64   // credits currently available
	maxBalance       float64   // upper bound applied to balance on every refresh
	lastTick         time.Time // instant of the most recent balance refresh

	timeNow func() time.Time // clock used by updateBalance
}

// NewRateLimiter creates a new RateLimiter that accrues creditsPerSecond credits per second
// of elapsed time, holds at most maxBalance credits, and starts with a full balance.
func NewRateLimiter(creditsPerSecond, maxBalance float64) *RateLimiter {
	return &RateLimiter{
		creditsPerSecond: creditsPerSecond,
		balance:          maxBalance,
		maxBalance:       maxBalance,
		lastTick:         time.Now(),
		timeNow:          time.Now,
	}
}

// CheckCredit tries to reduce the current balance by itemCost provided that the current balance
// is not less than itemCost. It returns true when the item was paid for and false when the
// balance, even after being refreshed for elapsed time, cannot cover it.
func (rl *RateLimiter) CheckCredit(itemCost float64) bool {
	rl.lock.Lock()
	defer rl.lock.Unlock()

	// Fast path: the balance already covers the item, so the clock is not consulted.
	if rl.balance >= itemCost {
		rl.balance -= itemCost
		return true
	}
	// Otherwise credit the time elapsed since the last refresh and try once more.
	rl.updateBalance()
	if rl.balance >= itemCost {
		rl.balance -= itemCost
		return true
	}
	return false
}

// updateBalance recalculates the current balance based on time elapsed since lastTick and
// caps it at maxBalance. Must be called while holding the lock.
func (rl *RateLimiter) updateBalance() {
	// Measure the time since the last refresh and move the tick forward.
	currentTime := rl.timeNow()
	elapsedTime := currentTime.Sub(rl.lastTick)
	rl.lastTick = currentTime
	// Credit what accumulated over that interval, never exceeding the cap.
	rl.balance += elapsedTime.Seconds() * rl.creditsPerSecond
	if rl.balance > rl.maxBalance {
		rl.balance = rl.maxBalance
	}
}

// Update changes the main parameters of the rate limiter in-place, while retaining
// the current accumulated balance (pro-rated to the new maxBalance value). Using this method
// instead of creating a new rate limiter helps to avoid thundering herd when sampling
// strategies are updated.
//
// When the previous maxBalance is not positive there is no ratio to pro-rate by, so the
// limiter continues from an empty balance and accrues credits at the new rate.
func (rl *RateLimiter) Update(creditsPerSecond, maxBalance float64) {
	rl.lock.Lock()
	defer rl.lock.Unlock()

	rl.updateBalance() // get up-to-date balance
	if rl.maxBalance > 0 {
		rl.balance = rl.balance * maxBalance / rl.maxBalance
	} else {
		// Dividing by a zero maxBalance would yield NaN, which fails every later
		// comparison in CheckCredit and would reject all items permanently.
		rl.balance = 0
	}
	rl.creditsPerSecond = creditsPerSecond
	rl.maxBalance = maxBalance
}