GitHub repository: torvalds/linux
Path: tools/lib/bpf/ringbuf.c
// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <time.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"
#include "str_error.h"

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring **rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

struct user_ring_buffer {
	struct epoll_event event;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	void *data;
	unsigned long mask;
	size_t page_size;
	int map_fd;
	int epoll_fd;
};

/* 8-byte ring buffer header structure */
struct ringbuf_hdr {
	__u32 len;
	__u32 pad;
};

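/* Layout overview (a reading of the mmap calls in ring_buffer__add() and
 * user_ringbuf_map() below): a ring buffer map is exposed to user space as
 * one page holding the consumer position, followed at a page_size offset by
 * the producer position page and the data area. The data area is mapped
 * twice back to back, so a record that wraps past the end of the buffer can
 * still be accessed as one contiguous chunk. max_entries is a power of two,
 * which is why a simple mask (max_entries - 1) suffices for wrapping.
 */
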
static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}

	free(r);
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	__u64 mmap_sz;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return libbpf_err(err);
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return libbpf_err(-EINVAL);
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->events = tmp;

	r = calloc(1, sizeof(*r));
	if (!r)
		return libbpf_err(-ENOMEM);
	rb->rings[rb->ring_cnt] = r;

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		err = -E2BIG;
		pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
		goto err_out;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to epoll add map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}

	rb->ring_cnt++;
	return 0;

err_out:
	ringbuf_free_ring(rb, r);
	return libbpf_err(err);
}

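/* Note on the epoll bookkeeping in ring_buffer__add() above: the ring's index
 * within rb->rings is stored in the epoll event's data.fd field (it is not a
 * real file descriptor); ring_buffer__poll() below uses that index to locate
 * the ring that became readable.
 */
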
void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_free_ring(rb, rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return errno = -err, NULL;
}

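/* Usage sketch (illustrative only, not part of the library; handle_event,
 * map_fd and the exiting flag are application-supplied placeholders):
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		// data points at one committed sample of 'size' bytes;
 *		// returning a negative value aborts consumption
 *		return 0;
 *	}
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
 *	if (!rb)
 *		return -errno;
 *	while (!exiting)
 *		ring_buffer__poll(rb, 100);	// timeout in ms
 *	ring_buffer__free(rb);
 */
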
static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}

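/* Example: a committed record carrying 100 bytes of data has len == 100 once
 * the flag bits are cleared, so roundup_len() yields 100 + 8 = 108, rounded
 * up to 112. ringbuf_process_ring() below advances the consumer position by
 * exactly that amount per record, so positions always stay 8-byte aligned.
 */
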
static int64_t ringbuf_process_ring(struct ring *r, size_t n)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);

			if (cnt >= n)
				goto done;
		}
	} while (got_new_data);
done:
	return cnt;
}

/* Consume available ring buffer(s) data without event polling, up to n
 * records.
 *
 * Returns number of records consumed across all registered ring buffers (or
 * n, whichever is less), or negative number if any of the callbacks return
 * error.
 */
int ring_buffer__consume_n(struct ring_buffer *rb, size_t n)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, n);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		n -= err;

		if (n == 0)
			break;
	}
	return res > INT_MAX ? INT_MAX : res;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers (or
 * INT_MAX, whichever is less), or negative number if any of the callbacks
 * return error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		if (res > INT_MAX) {
			res = INT_MAX;
			break;
		}
	}
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if any of the registered callbacks returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = rb->rings[ring_id];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		res = INT_MAX;
	return res;
}

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
	return rb->epoll_fd;
}

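/* A common pattern (not mandated by the API): add the fd returned by
 * ring_buffer__epoll_fd() to an application-owned event loop, and when it
 * becomes readable call ring_buffer__poll() with a zero timeout or
 * ring_buffer__consume() to drain the registered rings.
 */
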
struct ring *ring_buffer__ring(struct ring_buffer *rb, unsigned int idx)
{
	if (idx >= rb->ring_cnt)
		return errno = ERANGE, NULL;

	return rb->rings[idx];
}

unsigned long ring__consumer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in ringbuf_process_ring(). */
	return smp_load_acquire(r->consumer_pos);
}

unsigned long ring__producer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in __bpf_ringbuf_reserve() in
	 * the kernel.
	 */
	return smp_load_acquire(r->producer_pos);
}

size_t ring__avail_data_size(const struct ring *r)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = ring__consumer_pos(r);
	prod_pos = ring__producer_pos(r);
	return prod_pos - cons_pos;
}

size_t ring__size(const struct ring *r)
{
	return r->mask + 1;
}

int ring__map_fd(const struct ring *r)
{
	return r->map_fd;
}

int ring__consume_n(struct ring *r, size_t n)
{
	int64_t res;

	res = ringbuf_process_ring(r, n);
	if (res < 0)
		return libbpf_err(res);

	return res > INT_MAX ? INT_MAX : res;
}

int ring__consume(struct ring *r)
{
	return ring__consume_n(r, INT_MAX);
}

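/* The functions below implement the user-space producer side of
 * BPF_MAP_TYPE_USER_RINGBUF: user space reserves and commits samples, and a
 * BPF program consumes them in the kernel (via the bpf_user_ringbuf_drain()
 * helper), so the roles of the consumer and producer pages are reversed
 * relative to the reader-side code above.
 */
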
static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{
	if (rb->consumer_pos) {
		munmap(rb->consumer_pos, rb->page_size);
		rb->consumer_pos = NULL;
	}
	if (rb->producer_pos) {
		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
		rb->producer_pos = NULL;
	}
}

void user_ring_buffer__free(struct user_ring_buffer *rb)
{
	if (!rb)
		return;

	user_ringbuf_unmap_ring(rb);

	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb);
}

static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	__u64 mmap_sz;
	void *tmp;
	struct epoll_event *rb_epoll;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("user ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
		return -EINVAL;
	}

	rb->map_fd = map_fd;
	rb->mask = info.max_entries - 1;

	/* Map read-only consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}
	rb->consumer_pos = tmp;

	/* Map read-write the producer page and data pages. We map the data
	 * region as twice the total size of the ring buffer to allow the
	 * simple reading and writing of samples that wrap around the end of
	 * the buffer. See the kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("user ringbuf: ring buf size (%u) is too big\n", info.max_entries);
		return -E2BIG;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	rb->producer_pos = tmp;
	rb->data = tmp + rb->page_size;

	rb_epoll = &rb->event;
	rb_epoll->events = EPOLLOUT;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to epoll add map fd=%d: %s\n", map_fd, errstr(err));
		return err;
	}

	return 0;
}

struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
{
	struct user_ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, user_ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = user_ringbuf_map(rb, map_fd);
	if (err)
		goto err_out;

	return rb;

err_out:
	user_ring_buffer__free(rb);
	return errno = -err, NULL;
}

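/* Producer-side usage sketch (illustrative only; struct my_event and map_fd
 * are application-supplied placeholders):
 *
 *	struct user_ring_buffer *rb;
 *	struct my_event *e;
 *
 *	rb = user_ring_buffer__new(map_fd, NULL);
 *	if (!rb)
 *		return -errno;
 *	e = user_ring_buffer__reserve(rb, sizeof(*e));	// or __reserve_blocking()
 *	if (e) {
 *		e->value = 42;				// fill the sample in place
 *		user_ring_buffer__submit(rb, e);	// or __discard() to drop it
 *	}
 *	user_ring_buffer__free(rb);
 */
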
static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard)
{
	__u32 new_len;
	struct ringbuf_hdr *hdr;
	uintptr_t hdr_offset;

	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
	hdr = rb->data + (hdr_offset & rb->mask);

	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
}

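/* Note on the offset arithmetic in user_ringbuf_commit() above: the sample
 * pointer returned by user_ring_buffer__reserve() may have wrapped to the
 * very start of the data area while its header sits at the very end, so the
 * ring size (mask + 1) is added before subtracting BPF_RINGBUF_HDR_SZ to keep
 * the intermediate offset non-negative; masking then yields the header's
 * actual position.
 */
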
void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, true);
}

void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, false);
}

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
	__u32 avail_size, total_size, max_size;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	__u64 cons_pos, prod_pos;
	struct ringbuf_hdr *hdr;

	/* The top two bits are used as special flags */
	if (size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT))
		return errno = E2BIG, NULL;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	cons_pos = smp_load_acquire(rb->consumer_pos);
	/* Synchronizes with smp_store_release() in user_ringbuf_commit() */
	prod_pos = smp_load_acquire(rb->producer_pos);

	max_size = rb->mask + 1;
	avail_size = max_size - (prod_pos - cons_pos);
	/* Round up total size to a multiple of 8. */
	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

	if (total_size > max_size)
		return errno = E2BIG, NULL;

	if (avail_size < total_size)
		return errno = ENOSPC, NULL;

	hdr = rb->data + (prod_pos & rb->mask);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pad = 0;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	smp_store_release(rb->producer_pos, prod_pos + total_size);

	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
}

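/* Example for user_ring_buffer__reserve() above: asking for 24 bytes of data
 * consumes 24 + 8 (header) = 32 bytes of ring space, already a multiple of 8;
 * asking for 27 bytes consumes (27 + 8 + 7) / 8 * 8 = 40. E2BIG means the
 * sample can never fit in this ring, while ENOSPC means there is simply not
 * enough free space right now (retry, or use
 * user_ring_buffer__reserve_blocking()).
 */
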
static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
	__u64 start_ns, end_ns, ns_per_s = 1000000000;

	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

	return end_ns - start_ns;
}

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	void *sample;
	int err, ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < 0 && timeout_ms != -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1) {
		err = clock_gettime(CLOCK_MONOTONIC, &start);
		if (err)
			return NULL;
	}

	do {
		int cnt, ms_elapsed;
		struct timespec curr;
		__u64 ns_per_ms = 1000000;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification
		 * delivery whenever at least one sample is drained from the
		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
		 * additional events may be delivered at any time, but only one
		 * event is guaranteed per bpf_ringbuf_drain() invocation,
		 * provided that a sample is drained, and the BPF program did
		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If
		 * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a
		 * wakeup event will be delivered even if no samples are
		 * drained.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		err = clock_gettime(CLOCK_MONOTONIC, &curr);
		if (err)
			return NULL;

		ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms;
		ms_remaining = timeout_ms - ms_elapsed;
	} while (ms_remaining > 0);

	/* Try one more time to reserve a sample after the specified timeout has elapsed. */
	return user_ring_buffer__reserve(rb, size);
}