Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/contrib/libevent/bufferevent_ratelim.c
39475 views
1
/*
2
* Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3
* Copyright (c) 2002-2006 Niels Provos <[email protected]>
4
* All rights reserved.
5
*
6
* Redistribution and use in source and binary forms, with or without
7
* modification, are permitted provided that the following conditions
8
* are met:
9
* 1. Redistributions of source code must retain the above copyright
10
* notice, this list of conditions and the following disclaimer.
11
* 2. Redistributions in binary form must reproduce the above copyright
12
* notice, this list of conditions and the following disclaimer in the
13
* documentation and/or other materials provided with the distribution.
14
* 3. The name of the author may not be used to endorse or promote products
15
* derived from this software without specific prior written permission.
16
*
17
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
*/
28
#include "evconfig-private.h"
29
30
#include <sys/types.h>
31
#include <limits.h>
32
#include <string.h>
33
#include <stdlib.h>
34
35
#include "event2/event.h"
36
#include "event2/event_struct.h"
37
#include "event2/util.h"
38
#include "event2/bufferevent.h"
39
#include "event2/bufferevent_struct.h"
40
#include "event2/buffer.h"
41
42
#include "ratelim-internal.h"
43
44
#include "bufferevent-internal.h"
45
#include "mm-internal.h"
46
#include "util-internal.h"
47
#include "event-internal.h"
48
49
int
50
ev_token_bucket_init_(struct ev_token_bucket *bucket,
51
const struct ev_token_bucket_cfg *cfg,
52
ev_uint32_t current_tick,
53
int reinitialize)
54
{
55
if (reinitialize) {
56
/* on reinitialization, we only clip downwards, since we've
57
already used who-knows-how-much bandwidth this tick. We
58
leave "last_updated" as it is; the next update will add the
59
appropriate amount of bandwidth to the bucket.
60
*/
61
if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62
bucket->read_limit = cfg->read_maximum;
63
if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64
bucket->write_limit = cfg->write_maximum;
65
} else {
66
bucket->read_limit = cfg->read_rate;
67
bucket->write_limit = cfg->write_rate;
68
bucket->last_updated = current_tick;
69
}
70
return 0;
71
}
72
73
int
74
ev_token_bucket_update_(struct ev_token_bucket *bucket,
75
const struct ev_token_bucket_cfg *cfg,
76
ev_uint32_t current_tick)
77
{
78
/* It's okay if the tick number overflows, since we'll just
79
* wrap around when we do the unsigned substraction. */
80
unsigned n_ticks = current_tick - bucket->last_updated;
81
82
/* Make sure some ticks actually happened, and that time didn't
83
* roll back. */
84
if (n_ticks == 0 || n_ticks > INT_MAX)
85
return 0;
86
87
/* Naively, we would say
88
bucket->limit += n_ticks * cfg->rate;
89
90
if (bucket->limit > cfg->maximum)
91
bucket->limit = cfg->maximum;
92
93
But we're worried about overflow, so we do it like this:
94
*/
95
96
if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97
bucket->read_limit = cfg->read_maximum;
98
else
99
bucket->read_limit += n_ticks * cfg->read_rate;
100
101
102
if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103
bucket->write_limit = cfg->write_maximum;
104
else
105
bucket->write_limit += n_ticks * cfg->write_rate;
106
107
108
bucket->last_updated = current_tick;
109
110
return 1;
111
}
112
113
static inline void
114
bufferevent_update_buckets(struct bufferevent_private *bev)
115
{
116
/* Must hold lock on bev. */
117
struct timeval now;
118
unsigned tick;
119
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120
tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121
if (tick != bev->rate_limiting->limit.last_updated)
122
ev_token_bucket_update_(&bev->rate_limiting->limit,
123
bev->rate_limiting->cfg, tick);
124
}
125
126
ev_uint32_t
127
ev_token_bucket_get_tick_(const struct timeval *tv,
128
const struct ev_token_bucket_cfg *cfg)
129
{
130
/* This computation uses two multiplies and a divide. We could do
131
* fewer if we knew that the tick length was an integer number of
132
* seconds, or if we knew it divided evenly into a second. We should
133
* investigate that more.
134
*/
135
136
/* We cast to an ev_uint64_t first, since we don't want to overflow
137
* before we do the final divide. */
138
ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139
return (unsigned)(msec / cfg->msec_per_tick);
140
}
141
142
struct ev_token_bucket_cfg *
143
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144
size_t write_rate, size_t write_burst,
145
const struct timeval *tick_len)
146
{
147
struct ev_token_bucket_cfg *r;
148
struct timeval g;
149
if (! tick_len) {
150
g.tv_sec = 1;
151
g.tv_usec = 0;
152
tick_len = &g;
153
}
154
if (read_rate > read_burst || write_rate > write_burst ||
155
read_rate < 1 || write_rate < 1)
156
return NULL;
157
if (read_rate > EV_RATE_LIMIT_MAX ||
158
write_rate > EV_RATE_LIMIT_MAX ||
159
read_burst > EV_RATE_LIMIT_MAX ||
160
write_burst > EV_RATE_LIMIT_MAX)
161
return NULL;
162
r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163
if (!r)
164
return NULL;
165
r->read_rate = read_rate;
166
r->write_rate = write_rate;
167
r->read_maximum = read_burst;
168
r->write_maximum = write_burst;
169
memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170
r->msec_per_tick = (tick_len->tv_sec * 1000) +
171
(tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172
return r;
173
}
174
175
/*
 * Release a configuration, handing 'cfg' to mm_free().
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
180
181
/* Defaults for the per-bufferevent max_single_read and max_single_write
 * settings. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Take / release the lock of rate-limit group 'g'. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Group-wide suspend/unsuspend helpers; defined later in this file. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
192
193
/** Helper: figure out the maximum amount we should write if is_write, or
194
the maximum amount we should read if is_read. Return that maximum, or
195
0 if our bucket is wholly exhausted.
196
*/
197
static inline ev_ssize_t
198
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
199
{
200
/* needs lock on bev. */
201
ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
202
203
#define LIM(x) \
204
(is_write ? (x).write_limit : (x).read_limit)
205
206
#define GROUP_SUSPENDED(g) \
207
(is_write ? (g)->write_suspended : (g)->read_suspended)
208
209
/* Sets max_so_far to MIN(x, max_so_far) */
210
#define CLAMPTO(x) \
211
do { \
212
if (max_so_far > (x)) \
213
max_so_far = (x); \
214
} while (0);
215
216
if (!bev->rate_limiting)
217
return max_so_far;
218
219
/* If rate-limiting is enabled at all, update the appropriate
220
bucket, and take the smaller of our rate limit and the group
221
rate limit.
222
*/
223
224
if (bev->rate_limiting->cfg) {
225
bufferevent_update_buckets(bev);
226
max_so_far = LIM(bev->rate_limiting->limit);
227
}
228
if (bev->rate_limiting->group) {
229
struct bufferevent_rate_limit_group *g =
230
bev->rate_limiting->group;
231
ev_ssize_t share;
232
LOCK_GROUP(g);
233
if (GROUP_SUSPENDED(g)) {
234
/* We can get here if we failed to lock this
235
* particular bufferevent while suspending the whole
236
* group. */
237
if (is_write)
238
bufferevent_suspend_write_(&bev->bev,
239
BEV_SUSPEND_BW_GROUP);
240
else
241
bufferevent_suspend_read_(&bev->bev,
242
BEV_SUSPEND_BW_GROUP);
243
share = 0;
244
} else {
245
/* XXXX probably we should divide among the active
246
* members, not the total members. */
247
share = LIM(g->rate_limit) / g->n_members;
248
if (share < g->min_share)
249
share = g->min_share;
250
}
251
UNLOCK_GROUP(g);
252
CLAMPTO(share);
253
}
254
255
if (max_so_far < 0)
256
max_so_far = 0;
257
return max_so_far;
258
}
259
260
ev_ssize_t
261
bufferevent_get_read_max_(struct bufferevent_private *bev)
262
{
263
return bufferevent_get_rlim_max_(bev, 0);
264
}
265
266
ev_ssize_t
267
bufferevent_get_write_max_(struct bufferevent_private *bev)
268
{
269
return bufferevent_get_rlim_max_(bev, 1);
270
}
271
272
int
273
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
274
{
275
/* XXXXX Make sure all users of this function check its return value */
276
int r = 0;
277
/* need to hold lock on bev */
278
if (!bev->rate_limiting)
279
return 0;
280
281
if (bev->rate_limiting->cfg) {
282
bev->rate_limiting->limit.read_limit -= bytes;
283
if (bev->rate_limiting->limit.read_limit <= 0) {
284
bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
285
if (event_add(&bev->rate_limiting->refill_bucket_event,
286
&bev->rate_limiting->cfg->tick_timeout) < 0)
287
r = -1;
288
} else if (bev->read_suspended & BEV_SUSPEND_BW) {
289
if (!(bev->write_suspended & BEV_SUSPEND_BW))
290
event_del(&bev->rate_limiting->refill_bucket_event);
291
bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
292
}
293
}
294
295
if (bev->rate_limiting->group) {
296
LOCK_GROUP(bev->rate_limiting->group);
297
bev->rate_limiting->group->rate_limit.read_limit -= bytes;
298
bev->rate_limiting->group->total_read += bytes;
299
if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
300
bev_group_suspend_reading_(bev->rate_limiting->group);
301
} else if (bev->rate_limiting->group->read_suspended) {
302
bev_group_unsuspend_reading_(bev->rate_limiting->group);
303
}
304
UNLOCK_GROUP(bev->rate_limiting->group);
305
}
306
307
return r;
308
}
309
310
int
311
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
312
{
313
/* XXXXX Make sure all users of this function check its return value */
314
int r = 0;
315
/* need to hold lock */
316
if (!bev->rate_limiting)
317
return 0;
318
319
if (bev->rate_limiting->cfg) {
320
bev->rate_limiting->limit.write_limit -= bytes;
321
if (bev->rate_limiting->limit.write_limit <= 0) {
322
bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
323
if (event_add(&bev->rate_limiting->refill_bucket_event,
324
&bev->rate_limiting->cfg->tick_timeout) < 0)
325
r = -1;
326
} else if (bev->write_suspended & BEV_SUSPEND_BW) {
327
if (!(bev->read_suspended & BEV_SUSPEND_BW))
328
event_del(&bev->rate_limiting->refill_bucket_event);
329
bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
330
}
331
}
332
333
if (bev->rate_limiting->group) {
334
LOCK_GROUP(bev->rate_limiting->group);
335
bev->rate_limiting->group->rate_limit.write_limit -= bytes;
336
bev->rate_limiting->group->total_written += bytes;
337
if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
338
bev_group_suspend_writing_(bev->rate_limiting->group);
339
} else if (bev->rate_limiting->group->write_suspended) {
340
bev_group_unsuspend_writing_(bev->rate_limiting->group);
341
}
342
UNLOCK_GROUP(bev->rate_limiting->group);
343
}
344
345
return r;
346
}
347
348
/** Stop reading on every bufferevent in <b>g</b> */
349
static int
350
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
351
{
352
/* Needs group lock */
353
struct bufferevent_private *bev;
354
g->read_suspended = 1;
355
g->pending_unsuspend_read = 0;
356
357
/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
358
to prevent a deadlock. (Ordinarily, the group lock nests inside
359
the bufferevent locks. If we are unable to lock any individual
360
bufferevent, it will find out later when it looks at its limit
361
and sees that its group is suspended.)
362
*/
363
LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
364
if (EVLOCK_TRY_LOCK_(bev->lock)) {
365
bufferevent_suspend_read_(&bev->bev,
366
BEV_SUSPEND_BW_GROUP);
367
EVLOCK_UNLOCK(bev->lock, 0);
368
}
369
}
370
return 0;
371
}
372
373
/** Stop writing on every bufferevent in <b>g</b> */
374
static int
375
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
376
{
377
/* Needs group lock */
378
struct bufferevent_private *bev;
379
g->write_suspended = 1;
380
g->pending_unsuspend_write = 0;
381
LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
382
if (EVLOCK_TRY_LOCK_(bev->lock)) {
383
bufferevent_suspend_write_(&bev->bev,
384
BEV_SUSPEND_BW_GROUP);
385
EVLOCK_UNLOCK(bev->lock, 0);
386
}
387
}
388
return 0;
389
}
390
391
/** Timer callback invoked on a single bufferevent with one or more exhausted
392
buckets when they are ready to refill. */
393
static void
394
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
395
{
396
unsigned tick;
397
struct timeval now;
398
struct bufferevent_private *bev = arg;
399
int again = 0;
400
BEV_LOCK(&bev->bev);
401
if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
402
BEV_UNLOCK(&bev->bev);
403
return;
404
}
405
406
/* First, update the bucket */
407
event_base_gettimeofday_cached(bev->bev.ev_base, &now);
408
tick = ev_token_bucket_get_tick_(&now,
409
bev->rate_limiting->cfg);
410
ev_token_bucket_update_(&bev->rate_limiting->limit,
411
bev->rate_limiting->cfg,
412
tick);
413
414
/* Now unsuspend any read/write operations as appropriate. */
415
if ((bev->read_suspended & BEV_SUSPEND_BW)) {
416
if (bev->rate_limiting->limit.read_limit > 0)
417
bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
418
else
419
again = 1;
420
}
421
if ((bev->write_suspended & BEV_SUSPEND_BW)) {
422
if (bev->rate_limiting->limit.write_limit > 0)
423
bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
424
else
425
again = 1;
426
}
427
if (again) {
428
/* One or more of the buckets may need another refill if they
429
started negative.
430
431
XXXX if we need to be quiet for more ticks, we should
432
maybe figure out what timeout we really want.
433
*/
434
/* XXXX Handle event_add failure somehow */
435
event_add(&bev->rate_limiting->refill_bucket_event,
436
&bev->rate_limiting->cfg->tick_timeout);
437
}
438
BEV_UNLOCK(&bev->bev);
439
}
440
441
/** Helper: grab a random element from a bufferevent group.
442
*
443
* Requires that we hold the lock on the group.
444
*/
445
static struct bufferevent_private *
446
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447
{
448
int which;
449
struct bufferevent_private *bev;
450
451
/* requires group lock */
452
453
if (!group->n_members)
454
return NULL;
455
456
EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457
458
which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459
460
bev = LIST_FIRST(&group->members);
461
while (which--)
462
bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463
464
return bev;
465
}
466
467
/** Run 'block' once for each member of rate-limiting group 'g', starting at
    a randomly chosen member: first from that member to the end of the list,
    then from the head of the list up to (but not including) that member.

    The expansion uses the caller's variables 'g', 'bev' (the current
    member) and 'first' (the random starting point); all three must be in
    scope.

    This is a half-baked attempt at fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		/* Pass 1: random start .. end of list. */		\
		for (bev = first; bev != LIST_END(&g->members);		\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
		/* Pass 2: head of list .. just before the start. */	\
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
486
487
static void
488
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
489
{
490
int again = 0;
491
struct bufferevent_private *bev, *first;
492
493
g->read_suspended = 0;
494
FOREACH_RANDOM_ORDER({
495
if (EVLOCK_TRY_LOCK_(bev->lock)) {
496
bufferevent_unsuspend_read_(&bev->bev,
497
BEV_SUSPEND_BW_GROUP);
498
EVLOCK_UNLOCK(bev->lock, 0);
499
} else {
500
again = 1;
501
}
502
});
503
g->pending_unsuspend_read = again;
504
}
505
506
static void
507
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
508
{
509
int again = 0;
510
struct bufferevent_private *bev, *first;
511
g->write_suspended = 0;
512
513
FOREACH_RANDOM_ORDER({
514
if (EVLOCK_TRY_LOCK_(bev->lock)) {
515
bufferevent_unsuspend_write_(&bev->bev,
516
BEV_SUSPEND_BW_GROUP);
517
EVLOCK_UNLOCK(bev->lock, 0);
518
} else {
519
again = 1;
520
}
521
});
522
g->pending_unsuspend_write = again;
523
}
524
525
/** Callback invoked every tick to add more elements to the group bucket
526
and unsuspend group members as needed.
527
*/
528
static void
529
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
530
{
531
struct bufferevent_rate_limit_group *g = arg;
532
unsigned tick;
533
struct timeval now;
534
535
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
536
537
LOCK_GROUP(g);
538
539
tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
540
ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
541
542
if (g->pending_unsuspend_read ||
543
(g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
544
bev_group_unsuspend_reading_(g);
545
}
546
if (g->pending_unsuspend_write ||
547
(g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
548
bev_group_unsuspend_writing_(g);
549
}
550
551
/* XXXX Rather than waiting to the next tick to unsuspend stuff
552
* with pending_unsuspend_write/read, we should do it on the
553
* next iteration of the mainloop.
554
*/
555
556
UNLOCK_GROUP(g);
557
}
558
559
int
560
bufferevent_set_rate_limit(struct bufferevent *bev,
561
struct ev_token_bucket_cfg *cfg)
562
{
563
struct bufferevent_private *bevp = BEV_UPCAST(bev);
564
int r = -1;
565
struct bufferevent_rate_limit *rlim;
566
struct timeval now;
567
ev_uint32_t tick;
568
int reinit = 0, suspended = 0;
569
/* XXX reference-count cfg */
570
571
BEV_LOCK(bev);
572
573
if (cfg == NULL) {
574
if (bevp->rate_limiting) {
575
rlim = bevp->rate_limiting;
576
rlim->cfg = NULL;
577
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
578
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
579
if (event_initialized(&rlim->refill_bucket_event))
580
event_del(&rlim->refill_bucket_event);
581
}
582
r = 0;
583
goto done;
584
}
585
586
event_base_gettimeofday_cached(bev->ev_base, &now);
587
tick = ev_token_bucket_get_tick_(&now, cfg);
588
589
if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
590
/* no-op */
591
r = 0;
592
goto done;
593
}
594
if (bevp->rate_limiting == NULL) {
595
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
596
if (!rlim)
597
goto done;
598
bevp->rate_limiting = rlim;
599
} else {
600
rlim = bevp->rate_limiting;
601
}
602
reinit = rlim->cfg != NULL;
603
604
rlim->cfg = cfg;
605
ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
606
607
if (reinit) {
608
EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
609
event_del(&rlim->refill_bucket_event);
610
}
611
event_assign(&rlim->refill_bucket_event, bev->ev_base,
612
-1, EV_FINALIZE, bev_refill_callback_, bevp);
613
614
if (rlim->limit.read_limit > 0) {
615
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
616
} else {
617
bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
618
suspended=1;
619
}
620
if (rlim->limit.write_limit > 0) {
621
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
622
} else {
623
bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
624
suspended = 1;
625
}
626
627
if (suspended)
628
event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
629
630
r = 0;
631
632
done:
633
BEV_UNLOCK(bev);
634
return r;
635
}
636
637
struct bufferevent_rate_limit_group *
638
bufferevent_rate_limit_group_new(struct event_base *base,
639
const struct ev_token_bucket_cfg *cfg)
640
{
641
struct bufferevent_rate_limit_group *g;
642
struct timeval now;
643
ev_uint32_t tick;
644
645
event_base_gettimeofday_cached(base, &now);
646
tick = ev_token_bucket_get_tick_(&now, cfg);
647
648
g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
649
if (!g)
650
return NULL;
651
memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
652
LIST_INIT(&g->members);
653
654
ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
655
656
event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
657
bev_group_refill_callback_, g);
658
/*XXXX handle event_add failure */
659
event_add(&g->master_refill_event, &cfg->tick_timeout);
660
661
EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
662
663
bufferevent_rate_limit_group_set_min_share(g, 64);
664
665
evutil_weakrand_seed_(&g->weakrand_seed,
666
(ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
667
668
return g;
669
}
670
671
int
672
bufferevent_rate_limit_group_set_cfg(
673
struct bufferevent_rate_limit_group *g,
674
const struct ev_token_bucket_cfg *cfg)
675
{
676
int same_tick;
677
if (!g || !cfg)
678
return -1;
679
680
LOCK_GROUP(g);
681
same_tick = evutil_timercmp(
682
&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
683
memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
684
685
if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
686
g->rate_limit.read_limit = cfg->read_maximum;
687
if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
688
g->rate_limit.write_limit = cfg->write_maximum;
689
690
if (!same_tick) {
691
/* This can cause a hiccup in the schedule */
692
event_add(&g->master_refill_event, &cfg->tick_timeout);
693
}
694
695
/* The new limits might force us to adjust min_share differently. */
696
bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
697
698
UNLOCK_GROUP(g);
699
return 0;
700
}
701
702
int
703
bufferevent_rate_limit_group_set_min_share(
704
struct bufferevent_rate_limit_group *g,
705
size_t share)
706
{
707
if (share > EV_SSIZE_MAX)
708
return -1;
709
710
g->configured_min_share = share;
711
712
/* Can't set share to less than the one-tick maximum. IOW, at steady
713
* state, at least one connection can go per tick. */
714
if (share > g->rate_limit_cfg.read_rate)
715
share = g->rate_limit_cfg.read_rate;
716
if (share > g->rate_limit_cfg.write_rate)
717
share = g->rate_limit_cfg.write_rate;
718
719
g->min_share = share;
720
return 0;
721
}
722
723
void
724
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
725
{
726
LOCK_GROUP(g);
727
EVUTIL_ASSERT(0 == g->n_members);
728
event_del(&g->master_refill_event);
729
UNLOCK_GROUP(g);
730
EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
731
mm_free(g);
732
}
733
734
int
735
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
736
struct bufferevent_rate_limit_group *g)
737
{
738
int wsuspend, rsuspend;
739
struct bufferevent_private *bevp = BEV_UPCAST(bev);
740
BEV_LOCK(bev);
741
742
if (!bevp->rate_limiting) {
743
struct bufferevent_rate_limit *rlim;
744
rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
745
if (!rlim) {
746
BEV_UNLOCK(bev);
747
return -1;
748
}
749
event_assign(&rlim->refill_bucket_event, bev->ev_base,
750
-1, EV_FINALIZE, bev_refill_callback_, bevp);
751
bevp->rate_limiting = rlim;
752
}
753
754
if (bevp->rate_limiting->group == g) {
755
BEV_UNLOCK(bev);
756
return 0;
757
}
758
if (bevp->rate_limiting->group)
759
bufferevent_remove_from_rate_limit_group(bev);
760
761
LOCK_GROUP(g);
762
bevp->rate_limiting->group = g;
763
++g->n_members;
764
LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
765
766
rsuspend = g->read_suspended;
767
wsuspend = g->write_suspended;
768
769
UNLOCK_GROUP(g);
770
771
if (rsuspend)
772
bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
773
if (wsuspend)
774
bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
775
776
BEV_UNLOCK(bev);
777
return 0;
778
}
779
780
/*
 * Remove 'bev' from its rate-limit group (if any) and lift any group
 * bandwidth suspension on it.  Returns the result of the internal helper.
 */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev, unsuspend);
}
785
786
int
787
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
788
int unsuspend)
789
{
790
struct bufferevent_private *bevp = BEV_UPCAST(bev);
791
BEV_LOCK(bev);
792
if (bevp->rate_limiting && bevp->rate_limiting->group) {
793
struct bufferevent_rate_limit_group *g =
794
bevp->rate_limiting->group;
795
LOCK_GROUP(g);
796
bevp->rate_limiting->group = NULL;
797
--g->n_members;
798
LIST_REMOVE(bevp, rate_limiting->next_in_group);
799
UNLOCK_GROUP(g);
800
}
801
if (unsuspend) {
802
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
803
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
804
}
805
BEV_UNLOCK(bev);
806
return 0;
807
}
808
809
/* ===
810
* API functions to expose rate limits.
811
*
812
* Don't use these from inside Libevent; they're meant to be for use by
813
* the program.
814
* === */
815
816
/* Mostly you don't want to use this function from inside libevent;
817
* bufferevent_get_read_max_() is more likely what you want*/
818
ev_ssize_t
819
bufferevent_get_read_limit(struct bufferevent *bev)
820
{
821
ev_ssize_t r;
822
struct bufferevent_private *bevp;
823
BEV_LOCK(bev);
824
bevp = BEV_UPCAST(bev);
825
if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
826
bufferevent_update_buckets(bevp);
827
r = bevp->rate_limiting->limit.read_limit;
828
} else {
829
r = EV_SSIZE_MAX;
830
}
831
BEV_UNLOCK(bev);
832
return r;
833
}
834
835
/* Mostly you don't want to use this function from inside libevent;
836
* bufferevent_get_write_max_() is more likely what you want*/
837
ev_ssize_t
838
bufferevent_get_write_limit(struct bufferevent *bev)
839
{
840
ev_ssize_t r;
841
struct bufferevent_private *bevp;
842
BEV_LOCK(bev);
843
bevp = BEV_UPCAST(bev);
844
if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
845
bufferevent_update_buckets(bevp);
846
r = bevp->rate_limiting->limit.write_limit;
847
} else {
848
r = EV_SSIZE_MAX;
849
}
850
BEV_UNLOCK(bev);
851
return r;
852
}
853
854
int
855
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
856
{
857
struct bufferevent_private *bevp;
858
BEV_LOCK(bev);
859
bevp = BEV_UPCAST(bev);
860
if (size == 0 || size > EV_SSIZE_MAX)
861
bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
862
else
863
bevp->max_single_read = size;
864
BEV_UNLOCK(bev);
865
return 0;
866
}
867
868
int
869
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
870
{
871
struct bufferevent_private *bevp;
872
BEV_LOCK(bev);
873
bevp = BEV_UPCAST(bev);
874
if (size == 0 || size > EV_SSIZE_MAX)
875
bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
876
else
877
bevp->max_single_write = size;
878
BEV_UNLOCK(bev);
879
return 0;
880
}
881
882
ev_ssize_t
883
bufferevent_get_max_single_read(struct bufferevent *bev)
884
{
885
ev_ssize_t r;
886
887
BEV_LOCK(bev);
888
r = BEV_UPCAST(bev)->max_single_read;
889
BEV_UNLOCK(bev);
890
return r;
891
}
892
893
ev_ssize_t
894
bufferevent_get_max_single_write(struct bufferevent *bev)
895
{
896
ev_ssize_t r;
897
898
BEV_LOCK(bev);
899
r = BEV_UPCAST(bev)->max_single_write;
900
BEV_UNLOCK(bev);
901
return r;
902
}
903
904
ev_ssize_t
905
bufferevent_get_max_to_read(struct bufferevent *bev)
906
{
907
ev_ssize_t r;
908
BEV_LOCK(bev);
909
r = bufferevent_get_read_max_(BEV_UPCAST(bev));
910
BEV_UNLOCK(bev);
911
return r;
912
}
913
914
ev_ssize_t
915
bufferevent_get_max_to_write(struct bufferevent *bev)
916
{
917
ev_ssize_t r;
918
BEV_LOCK(bev);
919
r = bufferevent_get_write_max_(BEV_UPCAST(bev));
920
BEV_UNLOCK(bev);
921
return r;
922
}
923
924
const struct ev_token_bucket_cfg *
925
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
926
struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
927
struct ev_token_bucket_cfg *cfg;
928
929
BEV_LOCK(bev);
930
931
if (bufev_private->rate_limiting) {
932
cfg = bufev_private->rate_limiting->cfg;
933
} else {
934
cfg = NULL;
935
}
936
937
BEV_UNLOCK(bev);
938
939
return cfg;
940
}
941
942
/* Mostly you don't want to use this function from inside libevent;
943
* bufferevent_get_read_max_() is more likely what you want*/
944
ev_ssize_t
945
bufferevent_rate_limit_group_get_read_limit(
946
struct bufferevent_rate_limit_group *grp)
947
{
948
ev_ssize_t r;
949
LOCK_GROUP(grp);
950
r = grp->rate_limit.read_limit;
951
UNLOCK_GROUP(grp);
952
return r;
953
}
954
955
/* Mostly you don't want to use this function from inside libevent;
956
* bufferevent_get_write_max_() is more likely what you want. */
957
ev_ssize_t
958
bufferevent_rate_limit_group_get_write_limit(
959
struct bufferevent_rate_limit_group *grp)
960
{
961
ev_ssize_t r;
962
LOCK_GROUP(grp);
963
r = grp->rate_limit.write_limit;
964
UNLOCK_GROUP(grp);
965
return r;
966
}
967
968
int
969
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
970
{
971
int r = 0;
972
ev_ssize_t old_limit, new_limit;
973
struct bufferevent_private *bevp;
974
BEV_LOCK(bev);
975
bevp = BEV_UPCAST(bev);
976
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
977
old_limit = bevp->rate_limiting->limit.read_limit;
978
979
new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
980
if (old_limit > 0 && new_limit <= 0) {
981
bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
982
if (event_add(&bevp->rate_limiting->refill_bucket_event,
983
&bevp->rate_limiting->cfg->tick_timeout) < 0)
984
r = -1;
985
} else if (old_limit <= 0 && new_limit > 0) {
986
if (!(bevp->write_suspended & BEV_SUSPEND_BW))
987
event_del(&bevp->rate_limiting->refill_bucket_event);
988
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
989
}
990
991
BEV_UNLOCK(bev);
992
return r;
993
}
994
995
int
996
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
997
{
998
/* XXXX this is mostly copy-and-paste from
999
* bufferevent_decrement_read_limit */
1000
int r = 0;
1001
ev_ssize_t old_limit, new_limit;
1002
struct bufferevent_private *bevp;
1003
BEV_LOCK(bev);
1004
bevp = BEV_UPCAST(bev);
1005
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1006
old_limit = bevp->rate_limiting->limit.write_limit;
1007
1008
new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1009
if (old_limit > 0 && new_limit <= 0) {
1010
bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1011
if (event_add(&bevp->rate_limiting->refill_bucket_event,
1012
&bevp->rate_limiting->cfg->tick_timeout) < 0)
1013
r = -1;
1014
} else if (old_limit <= 0 && new_limit > 0) {
1015
if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1016
event_del(&bevp->rate_limiting->refill_bucket_event);
1017
bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1018
}
1019
1020
BEV_UNLOCK(bev);
1021
return r;
1022
}
1023
1024
int
1025
bufferevent_rate_limit_group_decrement_read(
1026
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1027
{
1028
int r = 0;
1029
ev_ssize_t old_limit, new_limit;
1030
LOCK_GROUP(grp);
1031
old_limit = grp->rate_limit.read_limit;
1032
new_limit = (grp->rate_limit.read_limit -= decr);
1033
1034
if (old_limit > 0 && new_limit <= 0) {
1035
bev_group_suspend_reading_(grp);
1036
} else if (old_limit <= 0 && new_limit > 0) {
1037
bev_group_unsuspend_reading_(grp);
1038
}
1039
1040
UNLOCK_GROUP(grp);
1041
return r;
1042
}
1043
1044
int
1045
bufferevent_rate_limit_group_decrement_write(
1046
struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1047
{
1048
int r = 0;
1049
ev_ssize_t old_limit, new_limit;
1050
LOCK_GROUP(grp);
1051
old_limit = grp->rate_limit.write_limit;
1052
new_limit = (grp->rate_limit.write_limit -= decr);
1053
1054
if (old_limit > 0 && new_limit <= 0) {
1055
bev_group_suspend_writing_(grp);
1056
} else if (old_limit <= 0 && new_limit > 0) {
1057
bev_group_unsuspend_writing_(grp);
1058
}
1059
1060
UNLOCK_GROUP(grp);
1061
return r;
1062
}
1063
1064
void
1065
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1066
ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1067
{
1068
EVUTIL_ASSERT(grp != NULL);
1069
if (total_read_out)
1070
*total_read_out = grp->total_read;
1071
if (total_written_out)
1072
*total_written_out = grp->total_written;
1073
}
1074
1075
void
1076
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1077
{
1078
grp->total_read = grp->total_written = 0;
1079
}
1080
1081
int
1082
bufferevent_ratelim_init_(struct bufferevent_private *bev)
1083
{
1084
bev->rate_limiting = NULL;
1085
bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1086
bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1087
1088
return 0;
1089
}
1090
1091