Path: blob/main/contrib/libevent/bufferevent_ratelim.c
/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <[email protected]>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		 */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
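
/*
 * Illustrative numbers for the overflow-safe refill check above (a worked
 * sketch, not taken from the original sources): suppose cfg->read_rate is
 * 1000 tokens per tick, cfg->read_maximum is 4000, the bucket currently
 * holds read_limit == 2500, and n_ticks == 2.  Then
 *
 *	(cfg->read_maximum - bucket->read_limit) / n_ticks
 *	    == (4000 - 2500) / 2 == 750 < 1000 == cfg->read_rate
 *
 * so adding the full 2 * 1000 tokens would overshoot the burst cap, and we
 * clamp read_limit to read_maximum (4000), exactly what the naive version
 * would have done after its own clamp.  The comparison also avoids forming
 * n_ticks * cfg->read_rate directly, which could overflow for very large
 * tick counts or rates.
 */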

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
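
/*
 * Illustrative usage sketch (hypothetical caller code, with "bev" and
 * "handle_error" as placeholders): limit a single bufferevent to roughly
 * 25 KB per second in each direction with a 50 KB burst, using the default
 * 1-second tick that a NULL tick_len selects above.
 *
 *	struct ev_token_bucket_cfg *cfg;
 *
 *	cfg = ev_token_bucket_cfg_new(25*1024, 50*1024,
 *	    25*1024, 50*1024, NULL);
 *	if (!cfg || bufferevent_set_rate_limit(bev, cfg) < 0)
 *		handle_error();
 *
 * The bufferevent keeps a pointer to cfg rather than copying it (note the
 * "XXX reference-count cfg" comment in bufferevent_set_rate_limit below),
 * so the caller should keep cfg alive until no bufferevent is using it,
 * and only then call ev_token_bucket_cfg_free(cfg).
 */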

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)				\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)					\
	do {						\
		if (max_so_far > (x))			\
			max_so_far = (x);		\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}
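
/*
 * Sketch of the intended calling pattern for the helpers above (a
 * simplified illustration, not code copied from any backend): a bufferevent
 * implementation asks how much it may read right now, performs an I/O of at
 * most that size, and then charges the bytes actually transferred against
 * the bucket(s).
 *
 *	ev_ssize_t howmuch = bufferevent_get_read_max_(bev_private);
 *	if (howmuch > 0) {
 *		int n = evbuffer_read(input, fd, (int)howmuch);
 *		if (n > 0)
 *			bufferevent_decrement_read_buckets_(bev_private, n);
 *	}
 *
 * Here "bev_private", "input", and "fd" stand in for a backend's own state.
 * bufferevent_decrement_read_buckets_(), defined below, then suspends
 * reading and schedules the refill timer if the bucket went non-positive.
 */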

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				\
		}					\
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				\
		}					\
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
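
/*
 * Illustrative sketch (hypothetical caller code; "bev" and "faster_cfg" are
 * placeholders): the same function can later swap in a different
 * configuration, or remove rate limiting entirely by passing NULL, which,
 * as implemented above, clears the stored cfg, lifts any BEV_SUSPEND_BW
 * suspension, and cancels the refill timer.
 *
 *	(void) bufferevent_set_rate_limit(bev, faster_cfg);
 *	...
 *	(void) bufferevent_set_rate_limit(bev, NULL);
 *	ev_token_bucket_cfg_free(faster_cfg);
 *
 * Freeing the old configuration is only safe once no bufferevent still
 * points at it.
 */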

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
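
/*
 * Illustrative sketch (hypothetical caller code; "base" and "bev" are
 * placeholders): cap the aggregate bandwidth of many connections by putting
 * their bufferevents into one group.  Here the whole group shares about
 * 256 KB per second each way, and min_share keeps a member's per-tick share
 * from rounding down to nothing when the group gets large.
 *
 *	struct ev_token_bucket_cfg *gcfg;
 *	struct bufferevent_rate_limit_group *grp;
 *
 *	gcfg = ev_token_bucket_cfg_new(256*1024, 256*1024,
 *	    256*1024, 256*1024, NULL);
 *	grp = bufferevent_rate_limit_group_new(base, gcfg);
 *	bufferevent_rate_limit_group_set_min_share(grp, 256);
 *
 * and then, for each accepted connection:
 *
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 *
 * The group copies gcfg into rate_limit_cfg above, so gcfg need not outlive
 * the group.  A bufferevent can belong to at most one group at a time
 * (adding it to a new group first removes it from the old one, as
 * implemented above), and bufferevent_rate_limit_group_free() asserts that
 * every member has been removed first.
 */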

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
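
/*
 * Illustrative sketch (hypothetical caller code, "bev" is a placeholder):
 * bufferevent_get_max_to_read() is the public wrapper around
 * bufferevent_get_read_max_() above, and bufferevent_set_max_single_read()
 * adjusts the per-operation cap that the helper starts from.
 *
 *	ev_ssize_t allowed;
 *
 *	bufferevent_set_max_single_read(bev, 4096);
 *	allowed = bufferevent_get_max_to_read(bev);
 *	bufferevent_set_max_single_read(bev, 0);
 *
 * The final call passes 0, which restores MAX_SINGLE_READ_DEFAULT, as
 * implemented above; a value above EV_SSIZE_MAX does the same.
 */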

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
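
/*
 * Illustrative sketch (hypothetical caller code): the two decrement
 * functions above let an application charge traffic against a bufferevent's
 * buckets that Libevent itself did not transfer, for example bytes written
 * on a separate out-of-band socket that should still count toward the same
 * budget.
 *
 *	bufferevent_decrement_write_limit(bev, oob_bytes_sent);
 *
 * Here "bev" and "oob_bytes_sent" are placeholders.  A negative argument
 * has the opposite effect and adds capacity back, since the implementation
 * simply subtracts "decr" from the current limit.
 */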

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}