GitHub Repository: torvalds/linux
Path: blob/master/kernel/bpf/ringbuf.c
#include <linux/bpf.h>
#include <linux/btf.h>
#include <linux/err.h>
#include <linux/irq_work.h>
#include <linux/slab.h>
#include <linux/filter.h>
#include <linux/mm.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/poll.h>
#include <linux/kmemleak.h>
#include <uapi/linux/btf.h>
#include <linux/btf_ids.h>
#include <asm/rqspinlock.h>

#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_RB_OVERWRITE)

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
	(offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
/* consumer page and producer page */
#define RINGBUF_POS_PAGES 2
#define RINGBUF_NR_META_PAGES (RINGBUF_PGOFF + RINGBUF_POS_PAGES)

#define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)

struct bpf_ringbuf {
	wait_queue_head_t waitq;
	struct irq_work work;
	u64 mask;
	struct page **pages;
	int nr_pages;
	bool overwrite_mode;
	rqspinlock_t spinlock ____cacheline_aligned_in_smp;
	/* For user-space producer ring buffers, an atomic_t busy bit is used
	 * to synchronize access to the ring buffers in the kernel, rather than
	 * the spinlock that is used for kernel-producer ring buffers. This is
	 * done because the ring buffer must hold a lock across a BPF program's
	 * callback:
	 *
	 *    __bpf_user_ringbuf_peek() // lock acquired
	 * -> program callback_fn()
	 * -> __bpf_user_ringbuf_sample_release() // lock released
	 *
	 * It is unsafe and incorrect to hold an IRQ spinlock across what could
	 * be a long execution window, so we instead simply disallow concurrent
	 * access to the ring buffer by kernel consumers, and return -EBUSY from
	 * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
	 */
	atomic_t busy ____cacheline_aligned_in_smp;
	/* Consumer and producer counters are put into separate pages to
	 * allow each position to be mapped with different permissions.
	 * This prevents a user-space application from modifying the
	 * position and ruining in-kernel tracking. The permissions of the
	 * pages depend on who is producing samples: user-space or the
	 * kernel. Note that the pending counter is placed in the same
	 * page as the producer, so that it shares the same cache line.
	 *
	 * Kernel-producer
	 * ---------------
	 * The producer position and data pages are mapped as r/o in
	 * userspace. For this approach, bits in the header of samples are
	 * used to signal to user-space, and to other producers, whether a
	 * sample is currently being written.
	 *
	 * User-space producer
	 * -------------------
	 * Only the page containing the consumer position is mapped r/o in
	 * user-space. User-space producers also use bits of the header to
	 * communicate to the kernel, but the kernel must carefully check and
	 * validate each sample to ensure that they're correctly formatted, and
	 * fully contained within the ring buffer.
	 */
	unsigned long consumer_pos __aligned(PAGE_SIZE);
	unsigned long producer_pos __aligned(PAGE_SIZE);
	unsigned long pending_pos;
	unsigned long overwrite_pos; /* position after the last overwritten record */
	char data[] __aligned(PAGE_SIZE);
};
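
/* Illustrative sketch (not part of this file): how user space typically maps
 * a kernel-producer ring buffer, following the permission scheme described
 * above. The consumer page is mapped writable at page offset 0; the producer
 * page plus the double-mapped data area are mapped read-only starting at page
 * offset 1. This roughly mirrors what libbpf's ring_buffer implementation
 * does; map_fd and data_sz (== max_entries) are assumed to come from the
 * caller.
 *
 *	long page_size = sysconf(_SC_PAGESIZE);
 *	void *cons = mmap(NULL, page_size, PROT_READ | PROT_WRITE,
 *			  MAP_SHARED, map_fd, 0);
 *	void *prod = mmap(NULL, page_size + 2 * data_sz, PROT_READ,
 *			  MAP_SHARED, map_fd, page_size);
 *	// consumer_pos lives at cons, producer_pos at prod,
 *	// sample data starts at prod + page_size.
 */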

struct bpf_ringbuf_map {
	struct bpf_map map;
	struct bpf_ringbuf *rb;
};

/* 8-byte ring buffer record header structure */
struct bpf_ringbuf_hdr {
	u32 len;
	u32 pg_off;
};

static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
{
	const gfp_t flags = GFP_KERNEL_ACCOUNT | __GFP_RETRY_MAYFAIL |
			    __GFP_NOWARN | __GFP_ZERO;
	int nr_meta_pages = RINGBUF_NR_META_PAGES;
	int nr_data_pages = data_sz >> PAGE_SHIFT;
	int nr_pages = nr_meta_pages + nr_data_pages;
	struct page **pages, *page;
	struct bpf_ringbuf *rb;
	size_t array_size;
	int i;

	/* Each data page is mapped twice to allow "virtual"
	 * continuous read of samples wrapping around the end of ring
	 * buffer area:
	 * ------------------------------------------------------
	 * | meta pages |  real data pages  |  same data pages  |
	 * ------------------------------------------------------
	 * |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
	 * ------------------------------------------------------
	 * |            | TA             DA | TA             DA |
	 * ------------------------------------------------------
	 *                               ^^^^^^^
	 *                                  |
	 * Here, no need to worry about special handling of wrapped-around
	 * data due to double-mapped data pages. This works both in kernel and
	 * when mmap()'ed in user-space, simplifying both kernel and
	 * user-space implementations significantly.
	 */
	array_size = (nr_meta_pages + 2 * nr_data_pages) * sizeof(*pages);
	pages = bpf_map_area_alloc(array_size, numa_node);
	if (!pages)
		return NULL;

	for (i = 0; i < nr_pages; i++) {
		page = alloc_pages_node(numa_node, flags, 0);
		if (!page) {
			nr_pages = i;
			goto err_free_pages;
		}
		pages[i] = page;
		if (i >= nr_meta_pages)
			pages[nr_data_pages + i] = page;
	}

	rb = vmap(pages, nr_meta_pages + 2 * nr_data_pages,
		  VM_MAP | VM_USERMAP, PAGE_KERNEL);
	if (rb) {
		kmemleak_not_leak(pages);
		rb->pages = pages;
		rb->nr_pages = nr_pages;
		return rb;
	}

err_free_pages:
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	bpf_map_area_free(pages);
	return NULL;
}
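
/* Illustrative sketch (not part of this file): with the double mapping set up
 * above, a consumer can read a sample that wraps past the end of the data
 * area with one contiguous access. Assuming a user-space consumer where data
 * points at the start of the (double-mapped) data area and mask == data_sz - 1:
 *
 *	// hdr points at the record header; even if the record starts near
 *	// the end of the data area, the bytes past data_sz are the same
 *	// physical pages mapped a second time.
 *	__u32 *hdr = data + (cons_pos & mask);
 *	__u32 len = __atomic_load_n(hdr, __ATOMIC_ACQUIRE);
 *	void *sample = (void *)hdr + BPF_RINGBUF_HDR_SZ;
 *	memcpy(out, sample, len);	// no wrap-around handling needed
 */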

static void bpf_ringbuf_notify(struct irq_work *work)
{
	struct bpf_ringbuf *rb = container_of(work, struct bpf_ringbuf, work);

	wake_up_all(&rb->waitq);
}

/* Maximum size of ring buffer area is limited by 32-bit page offset within
 * record header, counted in pages. Reserve 8 bits for extensibility, and
 * take into account a few extra pages for consumer/producer pages and
 * non-mmap()'able parts; the current maximum size would be:
 *
 *     (((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)
 *
 * This gives a 64GB limit, which seems plenty for a single ring buffer. Now
 * considering that the maximum value of data_sz is (4GB - 1), there
 * will be no overflow, so just note the size limit in the comments.
 */
static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node, bool overwrite_mode)
{
	struct bpf_ringbuf *rb;

	rb = bpf_ringbuf_area_alloc(data_sz, numa_node);
	if (!rb)
		return NULL;

	raw_res_spin_lock_init(&rb->spinlock);
	atomic_set(&rb->busy, 0);
	init_waitqueue_head(&rb->waitq);
	init_irq_work(&rb->work, bpf_ringbuf_notify);

	rb->mask = data_sz - 1;
	rb->consumer_pos = 0;
	rb->producer_pos = 0;
	rb->pending_pos = 0;
	rb->overwrite_mode = overwrite_mode;

	return rb;
}

static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
	bool overwrite_mode = false;
	struct bpf_ringbuf_map *rb_map;

	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
		return ERR_PTR(-EINVAL);

	if (attr->map_flags & BPF_F_RB_OVERWRITE) {
		if (attr->map_type != BPF_MAP_TYPE_RINGBUF)
			return ERR_PTR(-EINVAL);
		overwrite_mode = true;
	}

	if (attr->key_size || attr->value_size ||
	    !is_power_of_2(attr->max_entries) ||
	    !PAGE_ALIGNED(attr->max_entries))
		return ERR_PTR(-EINVAL);

	rb_map = bpf_map_area_alloc(sizeof(*rb_map), NUMA_NO_NODE);
	if (!rb_map)
		return ERR_PTR(-ENOMEM);

	bpf_map_init_from_attr(&rb_map->map, attr);

	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node, overwrite_mode);
	if (!rb_map->rb) {
		bpf_map_area_free(rb_map);
		return ERR_PTR(-ENOMEM);
	}

	return &rb_map->map;
}
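
/* Illustrative sketch (not part of this file): creating this map type from
 * user space with libbpf. key_size and value_size must be 0 and max_entries
 * must be a power-of-2 multiple of the page size, matching the checks in
 * ringbuf_map_alloc() above; the 256 KiB size below is an arbitrary example.
 * BPF_F_RB_OVERWRITE would be passed via the opts' map_flags.
 *
 *	// key_size = 0, value_size = 0, max_entries = 256 KiB
 *	int map_fd = bpf_map_create(BPF_MAP_TYPE_RINGBUF, "events",
 *				    0, 0, 256 * 1024, NULL);
 *	if (map_fd < 0)
 *		// handle error
 */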

static void bpf_ringbuf_free(struct bpf_ringbuf *rb)
{
	irq_work_sync(&rb->work);

	/* copy pages pointer and nr_pages to local variable, as we are going
	 * to unmap rb itself with vunmap() below
	 */
	struct page **pages = rb->pages;
	int i, nr_pages = rb->nr_pages;

	vunmap(rb);
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	bpf_map_area_free(pages);
}

static void ringbuf_map_free(struct bpf_map *map)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	bpf_ringbuf_free(rb_map->rb);
	bpf_map_area_free(rb_map);
}

static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key)
{
	return ERR_PTR(-ENOTSUPP);
}

static long ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value,
				    u64 flags)
{
	return -ENOTSUPP;
}

static long ringbuf_map_delete_elem(struct bpf_map *map, void *key)
{
	return -ENOTSUPP;
}

static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
				    void *next_key)
{
	return -ENOTSUPP;
}

static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);

	if (vma->vm_flags & VM_WRITE) {
		/* allow writable mapping for the consumer_pos only */
		if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
			return -EPERM;
	}
	/* remap_vmalloc_range() checks size and offset constraints */
	return remap_vmalloc_range(vma, rb_map->rb,
				   vma->vm_pgoff + RINGBUF_PGOFF);
}

static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);

	if (vma->vm_flags & VM_WRITE) {
		if (vma->vm_pgoff == 0)
			/* Disallow writable mappings to the consumer pointer,
			 * and allow writable mappings to both the producer
			 * position and the ring buffer data itself.
			 */
			return -EPERM;
	}
	/* remap_vmalloc_range() checks size and offset constraints */
	return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
}

/*
 * Return an estimate of the available data in the ring buffer.
 * Note: the returned value can exceed the actual ring buffer size because the
 * function is not synchronized with the producer. The producer acquires the
 * ring buffer's spinlock, but this function does not.
 */
static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
	unsigned long cons_pos, prod_pos, over_pos;

	cons_pos = smp_load_acquire(&rb->consumer_pos);

	if (unlikely(rb->overwrite_mode)) {
		over_pos = smp_load_acquire(&rb->overwrite_pos);
		prod_pos = smp_load_acquire(&rb->producer_pos);
		return prod_pos - max(cons_pos, over_pos);
	} else {
		prod_pos = smp_load_acquire(&rb->producer_pos);
		return prod_pos - cons_pos;
	}
}

static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
{
	return rb->mask + 1;
}

static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
				      struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	if (ringbuf_avail_data_sz(rb_map->rb))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}
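
/* Illustrative sketch (not part of this file): the EPOLLIN behaviour above is
 * what lets a user-space consumer block until the kernel producer wakes it up
 * via the irq_work queued in bpf_ringbuf_commit(). With libbpf this is
 * typically driven as below; map_fd is assumed to refer to a
 * BPF_MAP_TYPE_RINGBUF map.
 *
 *	static int handle_event(void *ctx, void *data, size_t len)
 *	{
 *		// process one committed sample
 *		return 0;
 *	}
 *
 *	struct ring_buffer *rb = ring_buffer__new(map_fd, handle_event,
 *						  NULL, NULL);
 *	while (ring_buffer__poll(rb, 100) >= 0)	// 100 ms timeout
 *		;
 *	ring_buffer__free(rb);
 */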

static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
				      struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb))
		return EPOLLOUT | EPOLLWRNORM;
	return 0;
}

static u64 ringbuf_map_mem_usage(const struct bpf_map *map)
{
	struct bpf_ringbuf *rb;
	int nr_data_pages;
	int nr_meta_pages;
	u64 usage = sizeof(struct bpf_ringbuf_map);

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
	usage += (u64)rb->nr_pages << PAGE_SHIFT;
	nr_meta_pages = RINGBUF_NR_META_PAGES;
	nr_data_pages = map->max_entries >> PAGE_SHIFT;
	usage += (nr_meta_pages + 2 * nr_data_pages) * sizeof(struct page *);
	return usage;
}

BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
const struct bpf_map_ops ringbuf_map_ops = {
	.map_meta_equal = bpf_map_meta_equal,
	.map_alloc = ringbuf_map_alloc,
	.map_free = ringbuf_map_free,
	.map_mmap = ringbuf_map_mmap_kern,
	.map_poll = ringbuf_map_poll_kern,
	.map_lookup_elem = ringbuf_map_lookup_elem,
	.map_update_elem = ringbuf_map_update_elem,
	.map_delete_elem = ringbuf_map_delete_elem,
	.map_get_next_key = ringbuf_map_get_next_key,
	.map_mem_usage = ringbuf_map_mem_usage,
	.map_btf_id = &ringbuf_map_btf_ids[0],
};

BTF_ID_LIST_SINGLE(user_ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
const struct bpf_map_ops user_ringbuf_map_ops = {
	.map_meta_equal = bpf_map_meta_equal,
	.map_alloc = ringbuf_map_alloc,
	.map_free = ringbuf_map_free,
	.map_mmap = ringbuf_map_mmap_user,
	.map_poll = ringbuf_map_poll_user,
	.map_lookup_elem = ringbuf_map_lookup_elem,
	.map_update_elem = ringbuf_map_update_elem,
	.map_delete_elem = ringbuf_map_delete_elem,
	.map_get_next_key = ringbuf_map_get_next_key,
	.map_mem_usage = ringbuf_map_mem_usage,
	.map_btf_id = &user_ringbuf_map_btf_ids[0],
};

/* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
 * calculate offset from record metadata to ring buffer in pages, rounded
 * down. This page offset is stored as part of record metadata and allows
 * restoring struct bpf_ringbuf * from record pointer. This page offset is
 * stored at offset 4 of record metadata header.
 */
static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf *rb,
				     struct bpf_ringbuf_hdr *hdr)
{
	return ((void *)hdr - (void *)rb) >> PAGE_SHIFT;
}

/* Given pointer to ring buffer record header, restore pointer to struct
 * bpf_ringbuf itself by using page offset stored at offset 4
 */
static struct bpf_ringbuf *
bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
{
	unsigned long addr = (unsigned long)(void *)hdr;
	unsigned long off = (unsigned long)hdr->pg_off << PAGE_SHIFT;

	return (void*)((addr & PAGE_MASK) - off);
}

static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
				  unsigned long new_prod_pos,
				  unsigned long cons_pos,
				  unsigned long pend_pos)
{
	/*
	 * No space if the span from the oldest not-yet-committed record to
	 * the newest record exceeds (ringbuf_size - 1).
	 */
	if (new_prod_pos - pend_pos > rb->mask)
		return false;

	/* Ok, we have space in overwrite mode */
	if (unlikely(rb->overwrite_mode))
		return true;

	/*
	 * No space if producer position advances more than (ringbuf_size - 1)
	 * ahead of consumer position when not in overwrite mode.
	 */
	if (new_prod_pos - cons_pos > rb->mask)
		return false;

	return true;
}
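
/* Worked example (illustrative, not part of this file): with a 4096-byte data
 * area, mask == 4095. If pend_pos == 0 (the oldest record was never
 * committed) and a producer wants to advance producer_pos from 4088 to 4104
 * with a 16-byte record, then new_prod_pos - pend_pos == 4104 > mask, so
 * there is no space in either mode. If instead all earlier records were
 * committed (pend_pos == 4088) and cons_pos == 512, both spans (16 and 3592)
 * stay within mask and the reservation succeeds.
 */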

static u32 bpf_ringbuf_round_up_hdr_len(u32 hdr_len)
{
	hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
	return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
}

static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos, flags;
	struct bpf_ringbuf_hdr *hdr;
	u32 len, pg_off, hdr_len;

	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
		return NULL;

	len = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
	if (len > ringbuf_total_data_sz(rb))
		return NULL;

	cons_pos = smp_load_acquire(&rb->consumer_pos);

	if (raw_res_spin_lock_irqsave(&rb->spinlock, flags))
		return NULL;

	pend_pos = rb->pending_pos;
	prod_pos = rb->producer_pos;
	new_prod_pos = prod_pos + len;

	while (pend_pos < prod_pos) {
		hdr = (void *)rb->data + (pend_pos & rb->mask);
		hdr_len = READ_ONCE(hdr->len);
		if (hdr_len & BPF_RINGBUF_BUSY_BIT)
			break;
		pend_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
	}
	rb->pending_pos = pend_pos;

	if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
		raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
		return NULL;
	}

	/*
	 * In overwrite mode, advance overwrite_pos when the ring buffer is full.
	 * The key points are to stay on record boundaries and consume enough records
	 * to fit the new one.
	 */
	if (unlikely(rb->overwrite_mode)) {
		over_pos = rb->overwrite_pos;
		while (new_prod_pos - over_pos > rb->mask) {
			hdr = (void *)rb->data + (over_pos & rb->mask);
			hdr_len = READ_ONCE(hdr->len);
			/*
			 * The bpf_ringbuf_has_space() check above ensures we won't
			 * step over a record currently being worked on by another
			 * producer.
			 */
			over_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
		}
		/*
		 * smp_store_release(&rb->producer_pos, new_prod_pos) at
		 * the end of the function ensures that when the consumer sees
		 * the updated rb->producer_pos, it always sees the updated
		 * rb->overwrite_pos, so when the consumer reads overwrite_pos
		 * after smp_load_acquire(&rb->producer_pos), the overwrite_pos
		 * will always be valid.
		 */
		WRITE_ONCE(rb->overwrite_pos, over_pos);
	}

	hdr = (void *)rb->data + (prod_pos & rb->mask);
	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pg_off = pg_off;

	/* pairs with consumer's smp_load_acquire() */
	smp_store_release(&rb->producer_pos, new_prod_pos);

	raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);

	return (void *)hdr + BPF_RINGBUF_HDR_SZ;
}
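
/* Illustrative sketch (not part of this file): the reserve/commit API as seen
 * from a BPF program. "events" is an assumed BPF_MAP_TYPE_RINGBUF map and
 * struct event is an assumed sample layout; error handling is minimal.
 *
 *	struct event *e;
 *
 *	e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
 *	if (!e)
 *		return 0;	// ring buffer full, or record too large
 *	e->pid = bpf_get_current_pid_tgid() >> 32;
 *	bpf_ringbuf_submit(e, 0);	// or bpf_ringbuf_discard(e, 0)
 */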

BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
{
	struct bpf_ringbuf_map *rb_map;

	if (unlikely(flags))
		return 0;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
}

const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
	.func		= bpf_ringbuf_reserve,
	.ret_type	= RET_PTR_TO_RINGBUF_MEM_OR_NULL,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_CONST_ALLOC_SIZE_OR_ZERO,
	.arg3_type	= ARG_ANYTHING,
};

static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
{
	unsigned long rec_pos, cons_pos;
	struct bpf_ringbuf_hdr *hdr;
	struct bpf_ringbuf *rb;
	u32 new_len;

	hdr = sample - BPF_RINGBUF_HDR_SZ;
	rb = bpf_ringbuf_restore_from_rec(hdr);
	new_len = hdr->len ^ BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* update record header with correct final size prefix */
	xchg(&hdr->len, new_len);

	/* if consumer caught up and is waiting for our record, notify about
	 * new data availability
	 */
	rec_pos = (void *)hdr - (void *)rb->data;
	cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;

	if (flags & BPF_RB_FORCE_WAKEUP)
		irq_work_queue(&rb->work);
	else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);
}

BPF_CALL_2(bpf_ringbuf_submit, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_submit_proto = {
	.func		= bpf_ringbuf_submit,
	.ret_type	= RET_VOID,
	.arg1_type	= ARG_PTR_TO_RINGBUF_MEM | OBJ_RELEASE,
	.arg2_type	= ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_discard, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, true /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_discard_proto = {
	.func		= bpf_ringbuf_discard,
	.ret_type	= RET_VOID,
	.arg1_type	= ARG_PTR_TO_RINGBUF_MEM | OBJ_RELEASE,
	.arg2_type	= ARG_ANYTHING,
};

BPF_CALL_4(bpf_ringbuf_output, struct bpf_map *, map, void *, data, u64, size,
	   u64, flags)
{
	struct bpf_ringbuf_map *rb_map;
	void *rec;

	if (unlikely(flags & ~(BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP)))
		return -EINVAL;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	rec = __bpf_ringbuf_reserve(rb_map->rb, size);
	if (!rec)
		return -EAGAIN;

	memcpy(rec, data, size);
	bpf_ringbuf_commit(rec, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_output_proto = {
	.func		= bpf_ringbuf_output,
	.ret_type	= RET_INTEGER,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_PTR_TO_MEM | MEM_RDONLY,
	.arg3_type	= ARG_CONST_SIZE_OR_ZERO,
	.arg4_type	= ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
{
	struct bpf_ringbuf *rb;

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

	switch (flags) {
	case BPF_RB_AVAIL_DATA:
		return ringbuf_avail_data_sz(rb);
	case BPF_RB_RING_SIZE:
		return ringbuf_total_data_sz(rb);
	case BPF_RB_CONS_POS:
		return smp_load_acquire(&rb->consumer_pos);
	case BPF_RB_PROD_POS:
		return smp_load_acquire(&rb->producer_pos);
	case BPF_RB_OVERWRITE_POS:
		return smp_load_acquire(&rb->overwrite_pos);
	default:
		return 0;
	}
}
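
/* Illustrative sketch (not part of this file): a BPF program can use
 * bpf_ringbuf_query() to apply backpressure before reserving, e.g. by
 * dropping low-priority events once the buffer is more than half full.
 * "events" is an assumed BPF_MAP_TYPE_RINGBUF map, and the value returned
 * for BPF_RB_AVAIL_DATA is only an estimate, as noted above.
 *
 *	__u64 avail = bpf_ringbuf_query(&events, BPF_RB_AVAIL_DATA);
 *	__u64 size  = bpf_ringbuf_query(&events, BPF_RB_RING_SIZE);
 *	if (avail * 2 > size)
 *		return 0;	// skip this event
 */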

const struct bpf_func_proto bpf_ringbuf_query_proto = {
	.func		= bpf_ringbuf_query,
	.ret_type	= RET_INTEGER,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_ANYTHING,
};

BPF_CALL_4(bpf_ringbuf_reserve_dynptr, struct bpf_map *, map, u32, size, u64, flags,
	   struct bpf_dynptr_kern *, ptr)
{
	struct bpf_ringbuf_map *rb_map;
	void *sample;
	int err;

	if (unlikely(flags)) {
		bpf_dynptr_set_null(ptr);
		return -EINVAL;
	}

	err = bpf_dynptr_check_size(size);
	if (err) {
		bpf_dynptr_set_null(ptr);
		return err;
	}

	rb_map = container_of(map, struct bpf_ringbuf_map, map);

	sample = __bpf_ringbuf_reserve(rb_map->rb, size);
	if (!sample) {
		bpf_dynptr_set_null(ptr);
		return -EINVAL;
	}

	bpf_dynptr_init(ptr, sample, BPF_DYNPTR_TYPE_RINGBUF, 0, size);

	return 0;
}

const struct bpf_func_proto bpf_ringbuf_reserve_dynptr_proto = {
	.func		= bpf_ringbuf_reserve_dynptr,
	.ret_type	= RET_INTEGER,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_ANYTHING,
	.arg3_type	= ARG_ANYTHING,
	.arg4_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | MEM_UNINIT | MEM_WRITE,
};

BPF_CALL_2(bpf_ringbuf_submit_dynptr, struct bpf_dynptr_kern *, ptr, u64, flags)
{
	if (!ptr->data)
		return 0;

	bpf_ringbuf_commit(ptr->data, flags, false /* discard */);

	bpf_dynptr_set_null(ptr);

	return 0;
}

const struct bpf_func_proto bpf_ringbuf_submit_dynptr_proto = {
	.func		= bpf_ringbuf_submit_dynptr,
	.ret_type	= RET_VOID,
	.arg1_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
	.arg2_type	= ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_discard_dynptr, struct bpf_dynptr_kern *, ptr, u64, flags)
{
	if (!ptr->data)
		return 0;

	bpf_ringbuf_commit(ptr->data, flags, true /* discard */);

	bpf_dynptr_set_null(ptr);

	return 0;
}

const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
	.func		= bpf_ringbuf_discard_dynptr,
	.ret_type	= RET_VOID,
	.arg1_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
	.arg2_type	= ARG_ANYTHING,
};
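
/* Illustrative sketch (not part of this file): the dynptr variants above as
 * used from a BPF program. A verifier-tracked bpf_dynptr replaces the raw
 * sample pointer, and a reserved record must be released through the dynptr
 * even when the reserve call fails. "events" and struct event are assumed,
 * as before.
 *
 *	struct bpf_dynptr ptr;
 *	struct event *e;
 *
 *	if (bpf_ringbuf_reserve_dynptr(&events, sizeof(*e), 0, &ptr)) {
 *		bpf_ringbuf_discard_dynptr(&ptr, 0);
 *		return 0;
 *	}
 *	e = bpf_dynptr_data(&ptr, 0, sizeof(*e));
 *	if (e)
 *		e->pid = bpf_get_current_pid_tgid() >> 32;
 *	bpf_ringbuf_submit_dynptr(&ptr, 0);
 */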

static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)
{
	int err;
	u32 hdr_len, sample_len, total_len, flags, *hdr;
	u64 cons_pos, prod_pos;

	/* Synchronizes with smp_store_release() in user-space producer. */
	prod_pos = smp_load_acquire(&rb->producer_pos);
	if (prod_pos % 8)
		return -EINVAL;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
	cons_pos = smp_load_acquire(&rb->consumer_pos);
	if (cons_pos >= prod_pos)
		return -ENODATA;

	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
	/* Synchronizes with smp_store_release() in user-space producer. */
	hdr_len = smp_load_acquire(hdr);
	flags = hdr_len & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);
	sample_len = hdr_len & ~flags;
	total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8);

	/* The sample must fit within the region advertised by the producer position. */
	if (total_len > prod_pos - cons_pos)
		return -EINVAL;

	/* The sample must fit within the data region of the ring buffer. */
	if (total_len > ringbuf_total_data_sz(rb))
		return -E2BIG;

	/* The sample must fit into a struct bpf_dynptr. */
	err = bpf_dynptr_check_size(sample_len);
	if (err)
		return -E2BIG;

	if (flags & BPF_RINGBUF_DISCARD_BIT) {
		/* If the discard bit is set, the sample should be skipped.
		 *
		 * Update the consumer pos, and return -EAGAIN so the caller
		 * knows to skip this sample and try to read the next one.
		 */
		smp_store_release(&rb->consumer_pos, cons_pos + total_len);
		return -EAGAIN;
	}

	if (flags & BPF_RINGBUF_BUSY_BIT)
		return -ENODATA;

	*sample = (void *)((uintptr_t)rb->data +
			   (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
	*size = sample_len;
	return 0;
}

static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
{
	u64 consumer_pos;
	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);

	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
	 * prevents another task from writing to consumer_pos after it was read
	 * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
	 */
	consumer_pos = rb->consumer_pos;
	/* Synchronizes with smp_load_acquire() in user-space producer. */
	smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size);
}

BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
	   void *, callback_fn, void *, callback_ctx, u64, flags)
{
	struct bpf_ringbuf *rb;
	long samples, discarded_samples = 0, ret = 0;
	bpf_callback_t callback = (bpf_callback_t)callback_fn;
	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
	int busy = 0;

	if (unlikely(flags & ~wakeup_flags))
		return -EINVAL;

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

	/* If another consumer is already consuming a sample, wait for them to finish. */
	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
		return -EBUSY;

	for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) {
		int err;
		u32 size;
		void *sample;
		struct bpf_dynptr_kern dynptr;

		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
		if (err) {
			if (err == -ENODATA) {
				break;
			} else if (err == -EAGAIN) {
				discarded_samples++;
				continue;
			} else {
				ret = err;
				goto schedule_work_return;
			}
		}

		bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
		ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx, 0, 0, 0);
		__bpf_user_ringbuf_sample_release(rb, size, flags);
	}
	ret = samples - discarded_samples;

schedule_work_return:
	/* Prevent the clearing of the busy-bit from being reordered before the
	 * storing of any rb consumer or producer positions.
	 */
	atomic_set_release(&rb->busy, 0);

	if (flags & BPF_RB_FORCE_WAKEUP)
		irq_work_queue(&rb->work);
	else if (!(flags & BPF_RB_NO_WAKEUP) && samples > 0)
		irq_work_queue(&rb->work);
	return ret;
}
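
/* Illustrative sketch (not part of this file): the two ends of a
 * BPF_MAP_TYPE_USER_RINGBUF map. The BPF side drains with a callback that
 * receives each sample as a dynptr; the user side produces with libbpf's
 * user_ring_buffer API. "user_events", struct msg and map_fd are assumed.
 *
 * BPF program:
 *	static long handle_msg(struct bpf_dynptr *dynptr, void *ctx)
 *	{
 *		const struct msg *m = bpf_dynptr_data(dynptr, 0, sizeof(*m));
 *
 *		if (m)
 *			; // consume *m
 *		return 0;	// non-zero stops draining early
 *	}
 *
 *	long n = bpf_user_ringbuf_drain(&user_events, handle_msg, NULL, 0);
 *
 * User space:
 *	struct user_ring_buffer *urb = user_ring_buffer__new(map_fd, NULL);
 *	struct msg *m = user_ring_buffer__reserve(urb, sizeof(*m));
 *
 *	if (m) {
 *		m->value = 42;
 *		user_ring_buffer__submit(urb, m);
 *	}
 */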

const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
	.func		= bpf_user_ringbuf_drain,
	.ret_type	= RET_INTEGER,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_PTR_TO_FUNC,
	.arg3_type	= ARG_PTR_TO_STACK_OR_NULL,
	.arg4_type	= ARG_ANYTHING,
};