GitHub Repository: torvalds/linux
Path: blob/master/io_uring/io_uring.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared application/kernel submission and completion ring pairs, for
 * supporting fast/efficient IO.
 *
 * A note on the read/write ordering memory barriers that are matched between
 * the application and kernel side.
 *
 * After the application reads the CQ ring tail, it must use an
 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 * before writing the tail (using smp_load_acquire to read the tail will
 * do). It also needs a smp_mb() before updating CQ head (ordering the
 * entry load(s) with the head store), pairing with an implicit barrier
 * through a control-dependency in io_get_cqe (smp_store_release to
 * store head will do). Failure to do so could lead to reading invalid
 * CQ entries.
 *
 * Likewise, the application must use an appropriate smp_wmb() before
 * writing the SQ tail (ordering SQ entry stores with the tail store),
 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 * to store the tail will do). And it needs a barrier ordering the SQ
 * head load before writing new SQ entries (smp_load_acquire to read
 * head will do).
 *
 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 * updating the SQ tail; a full memory barrier smp_mb() is needed
 * between.
 *
 * Also see the examples in the liburing library:
 *
 *	git://git.kernel.org/pub/scm/linux/kernel/git/axboe/liburing.git
 *
 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 * from data shared between the kernel and application. This is done both
 * for ordering purposes, but also to ensure that once a value is loaded from
 * data that the application could potentially modify, it remains stable.
 *
 * Copyright (C) 2018-2019 Jens Axboe
 * Copyright (c) 2018-2019 Christoph Hellwig
 */
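
/*
 * Rough userspace sketch of the pairing described above, for orientation
 * only: it is not part of this file, it assumes a liburing-style ring
 * layout with no SQ index array, and khead/ktail/kring_mask are
 * illustrative names rather than a definitive API. The smp_* names stand
 * in for the equivalent userspace acquire/release barriers.
 *
 *	// submission side: fill the SQE first, then publish the new tail
 *	unsigned tail = *sq->ktail;
 *	sq->sqes[tail & *sq->kring_mask] = *sqe;
 *	smp_store_release(sq->ktail, tail + 1);
 *
 *	// completion side: acquire-load the tail, consume, release the head
 *	unsigned head = *cq->khead;
 *	if (head != smp_load_acquire(cq->ktail)) {
 *		struct io_uring_cqe *cqe = &cq->cqes[head & *cq->kring_mask];
 *		// ... use cqe ...
 *		smp_store_release(cq->khead, head + 1);
 *	}
 */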
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
#include <net/compat.h>
#include <linux/refcount.h>
#include <linux/uio.h>
#include <linux/bits.h>

#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
#include <linux/fsnotify.h>
#include <linux/fadvise.h>
#include <linux/task_work.h>
#include <linux/io_uring.h>
#include <linux/io_uring/cmd.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/jump_label.h>
#include <asm/shmparam.h>

#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#include "filetable.h"
#include "io_uring.h"
#include "opdef.h"
#include "refs.h"
#include "tctx.h"
#include "register.h"
#include "sqpoll.h"
#include "fdinfo.h"
#include "kbuf.h"
#include "rsrc.h"
#include "cancel.h"
#include "net.h"
#include "notif.h"
#include "waitid.h"
#include "futex.h"
#include "napi.h"
#include "uring_cmd.h"
#include "msg_ring.h"
#include "memmap.h"
#include "zcrx.h"

#include "timeout.h"
#include "poll.h"
#include "rw.h"
#include "alloc_cache.h"
#include "eventfd.h"

#define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
			  IOSQE_IO_HARDLINK | IOSQE_ASYNC)

#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)

#define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
			    REQ_F_INFLIGHT | REQ_F_CREDS | REQ_F_ASYNC_DATA)

#define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | IO_REQ_LINK_FLAGS | \
				 REQ_F_REISSUE | REQ_F_POLLED | \
				 IO_REQ_CLEAN_FLAGS)

#define IO_TCTX_REFS_CACHE_NR (1U << 10)

#define IO_COMPL_BATCH 32
#define IO_REQ_ALLOC_BATCH 8
#define IO_LOCAL_TW_DEFAULT_MAX 20

/* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)

/*
 * No waiters. It's larger than any valid value of the tw counter
 * so that tests against ->cq_wait_nr would fail and skip wake_up().
 */
#define IO_CQ_WAKE_INIT (-1U)
/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)

static void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags);
static void __io_req_caches_free(struct io_ring_ctx *ctx);

static __read_mostly DEFINE_STATIC_KEY_FALSE(io_key_has_sqarray);

struct kmem_cache *req_cachep;
static struct workqueue_struct *iou_wq __ro_after_init;

static int __read_mostly sysctl_io_uring_disabled;
static int __read_mostly sysctl_io_uring_group = -1;

#ifdef CONFIG_SYSCTL
static const struct ctl_table kernel_io_uring_disabled_table[] = {
	{
		.procname = "io_uring_disabled",
		.data = &sysctl_io_uring_disabled,
		.maxlen = sizeof(sysctl_io_uring_disabled),
		.mode = 0644,
		.proc_handler = proc_dointvec_minmax,
		.extra1 = SYSCTL_ZERO,
		.extra2 = SYSCTL_TWO,
	},
	{
		.procname = "io_uring_group",
		.data = &sysctl_io_uring_group,
		.maxlen = sizeof(gid_t),
		.mode = 0644,
		.proc_handler = proc_dointvec,
	},
};
#endif

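/*
 * Usage note for the table above, as a sketch of the intended semantics
 * rather than authoritative documentation: io_uring_disabled is bounded
 * to 0-2 by SYSCTL_ZERO/SYSCTL_TWO. 0 leaves io_uring available to
 * everyone, 2 refuses new ring creation entirely, and 1 restricts
 * creation based on membership of the group named by io_uring_group
 * (checked at ring setup time), e.g.:
 *
 *	# sysctl -w kernel.io_uring_disabled=1
 *	# sysctl -w kernel.io_uring_group=1001
 */
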
static void io_poison_cached_req(struct io_kiocb *req)
171
{
172
req->ctx = IO_URING_PTR_POISON;
173
req->tctx = IO_URING_PTR_POISON;
174
req->file = IO_URING_PTR_POISON;
175
req->creds = IO_URING_PTR_POISON;
176
req->io_task_work.func = IO_URING_PTR_POISON;
177
req->apoll = IO_URING_PTR_POISON;
178
}
179
180
static void io_poison_req(struct io_kiocb *req)
181
{
182
io_poison_cached_req(req);
183
req->async_data = IO_URING_PTR_POISON;
184
req->kbuf = IO_URING_PTR_POISON;
185
req->comp_list.next = IO_URING_PTR_POISON;
186
req->file_node = IO_URING_PTR_POISON;
187
req->link = IO_URING_PTR_POISON;
188
}
189
190
static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
191
{
192
return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
193
}
194
195
static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
196
{
197
return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
198
}
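
/*
 * Note on the two helpers above: head and tail are free-running u32
 * counters, so plain unsigned subtraction yields the number of pending
 * CQEs even across wraparound. Illustrative values:
 *
 *	head = 0xfffffffe, tail = 0x00000002  ->  tail - head == 4
 */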
199
200
static inline void req_fail_link_node(struct io_kiocb *req, int res)
201
{
202
req_set_fail(req);
203
io_req_set_res(req, res, 0);
204
}
205
206
static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
207
{
208
if (IS_ENABLED(CONFIG_KASAN))
209
io_poison_cached_req(req);
210
wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
211
}
212
213
static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
214
{
215
struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
216
217
complete(&ctx->ref_comp);
218
}
219
220
/*
221
* Terminate the request if any of these conditions is true:
222
*
223
* 1) It's being executed by the original task, but that task is marked
224
* with PF_EXITING as it's exiting.
225
* 2) PF_KTHREAD is set, in which case the invoker of the task_work is
226
* our fallback task_work.
227
* 3) The ring has been closed and is going away.
228
*/
229
static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
230
{
231
return (current->flags & (PF_EXITING | PF_KTHREAD)) || percpu_ref_is_dying(&ctx->refs);
232
}
233
234
static __cold void io_fallback_req_func(struct work_struct *work)
235
{
236
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
237
fallback_work.work);
238
struct llist_node *node = llist_del_all(&ctx->fallback_llist);
239
struct io_kiocb *req, *tmp;
240
struct io_tw_state ts = {};
241
242
percpu_ref_get(&ctx->refs);
243
mutex_lock(&ctx->uring_lock);
244
ts.cancel = io_should_terminate_tw(ctx);
245
llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
246
req->io_task_work.func((struct io_tw_req){req}, ts);
247
io_submit_flush_completions(ctx);
248
mutex_unlock(&ctx->uring_lock);
249
percpu_ref_put(&ctx->refs);
250
}
251
252
static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
253
{
254
unsigned int hash_buckets;
255
int i;
256
257
do {
258
hash_buckets = 1U << bits;
259
table->hbs = kvmalloc_array(hash_buckets, sizeof(table->hbs[0]),
260
GFP_KERNEL_ACCOUNT);
261
if (table->hbs)
262
break;
263
if (bits == 1)
264
return -ENOMEM;
265
bits--;
266
} while (1);
267
268
table->hash_bits = bits;
269
for (i = 0; i < hash_buckets; i++)
270
INIT_HLIST_HEAD(&table->hbs[i].list);
271
return 0;
272
}
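
/*
 * Sizing sketch for the helper above (numbers purely illustrative): with
 * bits == 8 the first attempt is a 256-bucket table; every allocation
 * failure halves the request (128, 64, ...) down to a 2-bucket table at
 * bits == 1, and only if that last attempt also fails is -ENOMEM returned.
 */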
273
274
static void io_free_alloc_caches(struct io_ring_ctx *ctx)
275
{
276
io_alloc_cache_free(&ctx->apoll_cache, kfree);
277
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
278
io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
279
io_alloc_cache_free(&ctx->cmd_cache, io_cmd_cache_free);
280
io_futex_cache_free(ctx);
281
io_rsrc_cache_free(ctx);
282
}
283
284
static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
285
{
286
struct io_ring_ctx *ctx;
287
int hash_bits;
288
bool ret;
289
290
ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
291
if (!ctx)
292
return NULL;
293
294
xa_init(&ctx->io_bl_xa);
295
296
/*
297
* Use 5 bits less than the max cq entries, that should give us around
298
* 32 entries per hash list if totally full and uniformly spread, but
299
* don't keep too many buckets to not overconsume memory.
300
*/
301
hash_bits = ilog2(p->cq_entries) - 5;
302
hash_bits = clamp(hash_bits, 1, 8);
303
if (io_alloc_hash_table(&ctx->cancel_table, hash_bits))
304
goto err;
305
if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
306
0, GFP_KERNEL))
307
goto err;
308
309
ctx->flags = p->flags;
310
ctx->hybrid_poll_time = LLONG_MAX;
311
atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
312
init_waitqueue_head(&ctx->sqo_sq_wait);
313
INIT_LIST_HEAD(&ctx->sqd_list);
314
INIT_LIST_HEAD(&ctx->cq_overflow_list);
315
ret = io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX,
316
sizeof(struct async_poll), 0);
317
ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
318
sizeof(struct io_async_msghdr),
319
offsetof(struct io_async_msghdr, clear));
320
ret |= io_alloc_cache_init(&ctx->rw_cache, IO_ALLOC_CACHE_MAX,
321
sizeof(struct io_async_rw),
322
offsetof(struct io_async_rw, clear));
323
ret |= io_alloc_cache_init(&ctx->cmd_cache, IO_ALLOC_CACHE_MAX,
324
sizeof(struct io_async_cmd),
325
sizeof(struct io_async_cmd));
326
ret |= io_futex_cache_init(ctx);
327
ret |= io_rsrc_cache_init(ctx);
328
if (ret)
329
goto free_ref;
330
init_completion(&ctx->ref_comp);
331
xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
332
mutex_init(&ctx->uring_lock);
333
init_waitqueue_head(&ctx->cq_wait);
334
init_waitqueue_head(&ctx->poll_wq);
335
spin_lock_init(&ctx->completion_lock);
336
raw_spin_lock_init(&ctx->timeout_lock);
337
INIT_WQ_LIST(&ctx->iopoll_list);
338
INIT_LIST_HEAD(&ctx->defer_list);
339
INIT_LIST_HEAD(&ctx->timeout_list);
340
INIT_LIST_HEAD(&ctx->ltimeout_list);
341
init_llist_head(&ctx->work_llist);
342
INIT_LIST_HEAD(&ctx->tctx_list);
343
mutex_init(&ctx->tctx_lock);
344
ctx->submit_state.free_list.next = NULL;
345
INIT_HLIST_HEAD(&ctx->waitid_list);
346
xa_init_flags(&ctx->zcrx_ctxs, XA_FLAGS_ALLOC);
347
#ifdef CONFIG_FUTEX
348
INIT_HLIST_HEAD(&ctx->futex_list);
349
#endif
350
INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
351
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
352
INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
353
io_napi_init(ctx);
354
mutex_init(&ctx->mmap_lock);
355
356
return ctx;
357
358
free_ref:
359
percpu_ref_exit(&ctx->refs);
360
err:
361
io_free_alloc_caches(ctx);
362
kvfree(ctx->cancel_table.hbs);
363
xa_destroy(&ctx->io_bl_xa);
364
kfree(ctx);
365
return NULL;
366
}
367
368
static void io_clean_op(struct io_kiocb *req)
369
{
370
if (unlikely(req->flags & REQ_F_BUFFER_SELECTED))
371
io_kbuf_drop_legacy(req);
372
373
if (req->flags & REQ_F_NEED_CLEANUP) {
374
const struct io_cold_def *def = &io_cold_defs[req->opcode];
375
376
if (def->cleanup)
377
def->cleanup(req);
378
}
379
if (req->flags & REQ_F_INFLIGHT)
380
atomic_dec(&req->tctx->inflight_tracked);
381
if (req->flags & REQ_F_CREDS)
382
put_cred(req->creds);
383
if (req->flags & REQ_F_ASYNC_DATA) {
384
kfree(req->async_data);
385
req->async_data = NULL;
386
}
387
req->flags &= ~IO_REQ_CLEAN_FLAGS;
388
}
389
390
/*
391
* Mark the request as inflight, so that file cancelation will find it.
392
* Can be used if the file is an io_uring instance, or if the request itself
393
* relies on ->mm being alive for the duration of the request.
394
*/
395
inline void io_req_track_inflight(struct io_kiocb *req)
396
{
397
if (!(req->flags & REQ_F_INFLIGHT)) {
398
req->flags |= REQ_F_INFLIGHT;
399
atomic_inc(&req->tctx->inflight_tracked);
400
}
401
}
402
403
static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
404
{
405
if (WARN_ON_ONCE(!req->link))
406
return NULL;
407
408
req->flags &= ~REQ_F_ARM_LTIMEOUT;
409
req->flags |= REQ_F_LINK_TIMEOUT;
410
411
/* linked timeouts should have two refs once prep'ed */
412
io_req_set_refcount(req);
413
__io_req_set_refcount(req->link, 2);
414
return req->link;
415
}
416
417
static void io_prep_async_work(struct io_kiocb *req)
418
{
419
const struct io_issue_def *def = &io_issue_defs[req->opcode];
420
struct io_ring_ctx *ctx = req->ctx;
421
422
if (!(req->flags & REQ_F_CREDS)) {
423
req->flags |= REQ_F_CREDS;
424
req->creds = get_current_cred();
425
}
426
427
req->work.list.next = NULL;
428
atomic_set(&req->work.flags, 0);
429
if (req->flags & REQ_F_FORCE_ASYNC)
430
atomic_or(IO_WQ_WORK_CONCURRENT, &req->work.flags);
431
432
if (req->file && !(req->flags & REQ_F_FIXED_FILE))
433
req->flags |= io_file_get_flags(req->file);
434
435
if (req->file && (req->flags & REQ_F_ISREG)) {
436
bool should_hash = def->hash_reg_file;
437
438
/* don't serialize this request if the fs doesn't need it */
439
if (should_hash && (req->file->f_flags & O_DIRECT) &&
440
(req->file->f_op->fop_flags & FOP_DIO_PARALLEL_WRITE))
441
should_hash = false;
442
if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
443
io_wq_hash_work(&req->work, file_inode(req->file));
444
} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
445
if (def->unbound_nonreg_file)
446
atomic_or(IO_WQ_WORK_UNBOUND, &req->work.flags);
447
}
448
}
449
450
static void io_prep_async_link(struct io_kiocb *req)
451
{
452
struct io_kiocb *cur;
453
454
if (req->flags & REQ_F_LINK_TIMEOUT) {
455
struct io_ring_ctx *ctx = req->ctx;
456
457
raw_spin_lock_irq(&ctx->timeout_lock);
458
io_for_each_link(cur, req)
459
io_prep_async_work(cur);
460
raw_spin_unlock_irq(&ctx->timeout_lock);
461
} else {
462
io_for_each_link(cur, req)
463
io_prep_async_work(cur);
464
}
465
}
466
467
static void io_queue_iowq(struct io_kiocb *req)
468
{
469
struct io_uring_task *tctx = req->tctx;
470
471
BUG_ON(!tctx);
472
473
if ((current->flags & PF_KTHREAD) || !tctx->io_wq) {
474
io_req_task_queue_fail(req, -ECANCELED);
475
return;
476
}
477
478
/* init ->work of the whole link before punting */
479
io_prep_async_link(req);
480
481
/*
482
* Not expected to happen, but if we do have a bug where this _can_
483
* happen, catch it here and ensure the request is marked as
484
* canceled. That will make io-wq go through the usual work cancel
485
* procedure rather than attempt to run this request (or create a new
486
* worker for it).
487
*/
488
if (WARN_ON_ONCE(!same_thread_group(tctx->task, current)))
489
atomic_or(IO_WQ_WORK_CANCEL, &req->work.flags);
490
491
trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
492
io_wq_enqueue(tctx->io_wq, &req->work);
493
}
494
495
static void io_req_queue_iowq_tw(struct io_tw_req tw_req, io_tw_token_t tw)
496
{
497
io_queue_iowq(tw_req.req);
498
}
499
500
void io_req_queue_iowq(struct io_kiocb *req)
501
{
502
req->io_task_work.func = io_req_queue_iowq_tw;
503
io_req_task_work_add(req);
504
}
505
506
unsigned io_linked_nr(struct io_kiocb *req)
507
{
508
struct io_kiocb *tmp;
509
unsigned nr = 0;
510
511
io_for_each_link(tmp, req)
512
nr++;
513
return nr;
514
}
515
516
static __cold noinline void io_queue_deferred(struct io_ring_ctx *ctx)
517
{
518
bool drain_seen = false, first = true;
519
520
lockdep_assert_held(&ctx->uring_lock);
521
__io_req_caches_free(ctx);
522
523
while (!list_empty(&ctx->defer_list)) {
524
struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
525
struct io_defer_entry, list);
526
527
drain_seen |= de->req->flags & REQ_F_IO_DRAIN;
528
if ((drain_seen || first) && ctx->nr_req_allocated != ctx->nr_drained)
529
return;
530
531
list_del_init(&de->list);
532
ctx->nr_drained -= io_linked_nr(de->req);
533
io_req_task_queue(de->req);
534
kfree(de);
535
first = false;
536
}
537
}
538
539
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
540
{
541
if (ctx->poll_activated)
542
io_poll_wq_wake(ctx);
543
if (ctx->off_timeout_used)
544
io_flush_timeouts(ctx);
545
if (ctx->has_evfd)
546
io_eventfd_signal(ctx, true);
547
}
548
549
static inline void __io_cq_lock(struct io_ring_ctx *ctx)
550
{
551
if (!ctx->lockless_cq)
552
spin_lock(&ctx->completion_lock);
553
}
554
555
static inline void io_cq_lock(struct io_ring_ctx *ctx)
556
__acquires(ctx->completion_lock)
557
{
558
spin_lock(&ctx->completion_lock);
559
}
560
561
static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
562
{
563
io_commit_cqring(ctx);
564
if (!ctx->task_complete) {
565
if (!ctx->lockless_cq)
566
spin_unlock(&ctx->completion_lock);
567
/* IOPOLL rings only need to wake up if it's also SQPOLL */
568
if (!ctx->syscall_iopoll)
569
io_cqring_wake(ctx);
570
}
571
io_commit_cqring_flush(ctx);
572
}
573
574
static void io_cq_unlock_post(struct io_ring_ctx *ctx)
575
__releases(ctx->completion_lock)
576
{
577
io_commit_cqring(ctx);
578
spin_unlock(&ctx->completion_lock);
579
io_cqring_wake(ctx);
580
io_commit_cqring_flush(ctx);
581
}
582
583
static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
584
{
585
lockdep_assert_held(&ctx->uring_lock);
586
587
/* don't abort if we're dying, entries must get freed */
588
if (!dying && __io_cqring_events(ctx) == ctx->cq_entries)
589
return;
590
591
io_cq_lock(ctx);
592
while (!list_empty(&ctx->cq_overflow_list)) {
593
size_t cqe_size = sizeof(struct io_uring_cqe);
594
struct io_uring_cqe *cqe;
595
struct io_overflow_cqe *ocqe;
596
bool is_cqe32 = false;
597
598
ocqe = list_first_entry(&ctx->cq_overflow_list,
599
struct io_overflow_cqe, list);
600
if (ocqe->cqe.flags & IORING_CQE_F_32 ||
601
ctx->flags & IORING_SETUP_CQE32) {
602
is_cqe32 = true;
603
cqe_size <<= 1;
604
}
605
if (ctx->flags & IORING_SETUP_CQE32)
606
is_cqe32 = false;
607
608
if (!dying) {
609
if (!io_get_cqe_overflow(ctx, &cqe, true, is_cqe32))
610
break;
611
memcpy(cqe, &ocqe->cqe, cqe_size);
612
}
613
list_del(&ocqe->list);
614
kfree(ocqe);
615
616
/*
617
* For silly syzbot cases that deliberately overflow by huge
618
* amounts, check if we need to resched and drop and
619
* reacquire the locks if so. Nothing real would ever hit this.
620
* Ideally we'd have a non-posting unlock for this, but hard
621
* to care for a non-real case.
622
*/
623
if (need_resched()) {
624
ctx->cqe_sentinel = ctx->cqe_cached;
625
io_cq_unlock_post(ctx);
626
mutex_unlock(&ctx->uring_lock);
627
cond_resched();
628
mutex_lock(&ctx->uring_lock);
629
io_cq_lock(ctx);
630
}
631
}
632
633
if (list_empty(&ctx->cq_overflow_list)) {
634
clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
635
atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
636
}
637
io_cq_unlock_post(ctx);
638
}
639
640
static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
641
{
642
if (ctx->rings)
643
__io_cqring_overflow_flush(ctx, true);
644
}
645
646
static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
647
{
648
mutex_lock(&ctx->uring_lock);
649
__io_cqring_overflow_flush(ctx, false);
650
mutex_unlock(&ctx->uring_lock);
651
}
652
653
/* must be called somewhat shortly after putting a request */
654
static inline void io_put_task(struct io_kiocb *req)
655
{
656
struct io_uring_task *tctx = req->tctx;
657
658
if (likely(tctx->task == current)) {
659
tctx->cached_refs++;
660
} else {
661
percpu_counter_sub(&tctx->inflight, 1);
662
if (unlikely(atomic_read(&tctx->in_cancel)))
663
wake_up(&tctx->wait);
664
put_task_struct(tctx->task);
665
}
666
}
667
668
void io_task_refs_refill(struct io_uring_task *tctx)
669
{
670
unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
671
672
percpu_counter_add(&tctx->inflight, refill);
673
refcount_add(refill, &current->usage);
674
tctx->cached_refs += refill;
675
}
676
677
__cold void io_uring_drop_tctx_refs(struct task_struct *task)
678
{
679
struct io_uring_task *tctx = task->io_uring;
680
unsigned int refs = tctx->cached_refs;
681
682
if (refs) {
683
tctx->cached_refs = 0;
684
percpu_counter_sub(&tctx->inflight, refs);
685
put_task_struct_many(task, refs);
686
}
687
}
688
689
static __cold bool io_cqring_add_overflow(struct io_ring_ctx *ctx,
690
struct io_overflow_cqe *ocqe)
691
{
692
lockdep_assert_held(&ctx->completion_lock);
693
694
if (!ocqe) {
695
struct io_rings *r = ctx->rings;
696
697
/*
698
* If we're in ring overflow flush mode, or in task cancel mode,
699
* or cannot allocate an overflow entry, then we need to drop it
700
* on the floor.
701
*/
702
WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
703
set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
704
return false;
705
}
706
if (list_empty(&ctx->cq_overflow_list)) {
707
set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
708
atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
709
710
}
711
list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
712
return true;
713
}
714
715
static struct io_overflow_cqe *io_alloc_ocqe(struct io_ring_ctx *ctx,
716
struct io_cqe *cqe,
717
struct io_big_cqe *big_cqe, gfp_t gfp)
718
{
719
struct io_overflow_cqe *ocqe;
720
size_t ocq_size = sizeof(struct io_overflow_cqe);
721
bool is_cqe32 = false;
722
723
if (cqe->flags & IORING_CQE_F_32 || ctx->flags & IORING_SETUP_CQE32) {
724
is_cqe32 = true;
725
ocq_size += sizeof(struct io_uring_cqe);
726
}
727
728
ocqe = kzalloc(ocq_size, gfp | __GFP_ACCOUNT);
729
trace_io_uring_cqe_overflow(ctx, cqe->user_data, cqe->res, cqe->flags, ocqe);
730
if (ocqe) {
731
ocqe->cqe.user_data = cqe->user_data;
732
ocqe->cqe.res = cqe->res;
733
ocqe->cqe.flags = cqe->flags;
734
if (is_cqe32 && big_cqe) {
735
ocqe->cqe.big_cqe[0] = big_cqe->extra1;
736
ocqe->cqe.big_cqe[1] = big_cqe->extra2;
737
}
738
}
739
if (big_cqe)
740
big_cqe->extra1 = big_cqe->extra2 = 0;
741
return ocqe;
742
}
743
744
/*
745
* Fill an empty dummy CQE, in case alignment is off for posting a 32b CQE
746
* because the ring is a single 16b entry away from wrapping.
747
*/
748
static bool io_fill_nop_cqe(struct io_ring_ctx *ctx, unsigned int off)
749
{
750
if (__io_cqring_events(ctx) < ctx->cq_entries) {
751
struct io_uring_cqe *cqe = &ctx->rings->cqes[off];
752
753
cqe->user_data = 0;
754
cqe->res = 0;
755
cqe->flags = IORING_CQE_F_SKIP;
756
ctx->cached_cq_tail++;
757
return true;
758
}
759
return false;
760
}
761
762
/*
763
* writes to the cq entry need to come after reading head; the
764
* control dependency is enough as we're using WRITE_ONCE to
765
* fill the cq entry
766
*/
767
bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow, bool cqe32)
768
{
769
struct io_rings *rings = ctx->rings;
770
unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
771
unsigned int free, queued, len;
772
773
/*
774
* Posting into the CQ when there are pending overflowed CQEs may break
775
* ordering guarantees, which will affect links, F_MORE users and more.
776
* Force overflow the completion.
777
*/
778
if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
779
return false;
780
781
/*
782
* Post dummy CQE if a 32b CQE is needed and there's only room for a
783
* 16b CQE before the ring wraps.
784
*/
785
if (cqe32 && off + 1 == ctx->cq_entries) {
786
if (!io_fill_nop_cqe(ctx, off))
787
return false;
788
off = 0;
789
}
790
791
/* userspace may cheat modifying the tail, be safe and do min */
792
queued = min(__io_cqring_events(ctx), ctx->cq_entries);
793
free = ctx->cq_entries - queued;
794
/* we need a contiguous range, limit based on the current array offset */
795
len = min(free, ctx->cq_entries - off);
796
if (len < (cqe32 + 1))
797
return false;
798
799
if (ctx->flags & IORING_SETUP_CQE32) {
800
off <<= 1;
801
len <<= 1;
802
}
803
804
ctx->cqe_cached = &rings->cqes[off];
805
ctx->cqe_sentinel = ctx->cqe_cached + len;
806
return true;
807
}
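
/*
 * Worked example for the range math above (illustrative numbers, 16b CQEs):
 * with cq_entries == 8, cached_cq_tail == 6 and cq head == 2, off == 6,
 * queued == 4 and free == 4, but the contiguous run is capped at
 * len == min(4, 8 - 6) == 2, so cqe_cached/cqe_sentinel cover slots 6 and 7
 * and the refill after the wrap starts again at offset 0.
 */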
808
809
static bool io_fill_cqe_aux32(struct io_ring_ctx *ctx,
810
struct io_uring_cqe src_cqe[2])
811
{
812
struct io_uring_cqe *cqe;
813
814
if (WARN_ON_ONCE(!(ctx->flags & (IORING_SETUP_CQE32|IORING_SETUP_CQE_MIXED))))
815
return false;
816
if (unlikely(!io_get_cqe(ctx, &cqe, true)))
817
return false;
818
819
memcpy(cqe, src_cqe, 2 * sizeof(*cqe));
820
trace_io_uring_complete(ctx, NULL, cqe);
821
return true;
822
}
823
824
static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
825
u32 cflags)
826
{
827
bool cqe32 = cflags & IORING_CQE_F_32;
828
struct io_uring_cqe *cqe;
829
830
if (likely(io_get_cqe(ctx, &cqe, cqe32))) {
831
WRITE_ONCE(cqe->user_data, user_data);
832
WRITE_ONCE(cqe->res, res);
833
WRITE_ONCE(cqe->flags, cflags);
834
835
if (cqe32) {
836
WRITE_ONCE(cqe->big_cqe[0], 0);
837
WRITE_ONCE(cqe->big_cqe[1], 0);
838
}
839
840
trace_io_uring_complete(ctx, NULL, cqe);
841
return true;
842
}
843
return false;
844
}
845
846
static inline struct io_cqe io_init_cqe(u64 user_data, s32 res, u32 cflags)
847
{
848
return (struct io_cqe) { .user_data = user_data, .res = res, .flags = cflags };
849
}
850
851
static __cold void io_cqe_overflow(struct io_ring_ctx *ctx, struct io_cqe *cqe,
852
struct io_big_cqe *big_cqe)
853
{
854
struct io_overflow_cqe *ocqe;
855
856
ocqe = io_alloc_ocqe(ctx, cqe, big_cqe, GFP_KERNEL);
857
spin_lock(&ctx->completion_lock);
858
io_cqring_add_overflow(ctx, ocqe);
859
spin_unlock(&ctx->completion_lock);
860
}
861
862
static __cold bool io_cqe_overflow_locked(struct io_ring_ctx *ctx,
863
struct io_cqe *cqe,
864
struct io_big_cqe *big_cqe)
865
{
866
struct io_overflow_cqe *ocqe;
867
868
ocqe = io_alloc_ocqe(ctx, cqe, big_cqe, GFP_NOWAIT);
869
return io_cqring_add_overflow(ctx, ocqe);
870
}
871
872
bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
873
{
874
bool filled;
875
876
io_cq_lock(ctx);
877
filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
878
if (unlikely(!filled)) {
879
struct io_cqe cqe = io_init_cqe(user_data, res, cflags);
880
881
filled = io_cqe_overflow_locked(ctx, &cqe, NULL);
882
}
883
io_cq_unlock_post(ctx);
884
return filled;
885
}
886
887
/*
888
* Must be called from inline task_work so we know a flush will happen later,
889
* and obviously with ctx->uring_lock held (tw always has that).
890
*/
891
void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
892
{
893
lockdep_assert_held(&ctx->uring_lock);
894
lockdep_assert(ctx->lockless_cq);
895
896
if (!io_fill_cqe_aux(ctx, user_data, res, cflags)) {
897
struct io_cqe cqe = io_init_cqe(user_data, res, cflags);
898
899
io_cqe_overflow(ctx, &cqe, NULL);
900
}
901
ctx->submit_state.cq_flush = true;
902
}
903
904
/*
905
* A helper for multishot requests posting additional CQEs.
906
* Should only be used from a task_work including IO_URING_F_MULTISHOT.
907
*/
908
bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags)
909
{
910
struct io_ring_ctx *ctx = req->ctx;
911
bool posted;
912
913
/*
914
* If multishot has already posted deferred completions, ensure that
915
* those are flushed first before posting this one. If not, CQEs
916
* could get reordered.
917
*/
918
if (!wq_list_empty(&ctx->submit_state.compl_reqs))
919
__io_submit_flush_completions(ctx);
920
921
lockdep_assert(!io_wq_current_is_worker());
922
lockdep_assert_held(&ctx->uring_lock);
923
924
if (!ctx->lockless_cq) {
925
spin_lock(&ctx->completion_lock);
926
posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags);
927
spin_unlock(&ctx->completion_lock);
928
} else {
929
posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags);
930
}
931
932
ctx->submit_state.cq_flush = true;
933
return posted;
934
}
935
936
/*
937
* A helper for multishot requests posting additional CQEs.
938
* Should only be used from a task_work including IO_URING_F_MULTISHOT.
939
*/
940
bool io_req_post_cqe32(struct io_kiocb *req, struct io_uring_cqe cqe[2])
941
{
942
struct io_ring_ctx *ctx = req->ctx;
943
bool posted;
944
945
lockdep_assert(!io_wq_current_is_worker());
946
lockdep_assert_held(&ctx->uring_lock);
947
948
cqe[0].user_data = req->cqe.user_data;
949
if (!ctx->lockless_cq) {
950
spin_lock(&ctx->completion_lock);
951
posted = io_fill_cqe_aux32(ctx, cqe);
952
spin_unlock(&ctx->completion_lock);
953
} else {
954
posted = io_fill_cqe_aux32(ctx, cqe);
955
}
956
957
ctx->submit_state.cq_flush = true;
958
return posted;
959
}
960
961
static void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
962
{
963
struct io_ring_ctx *ctx = req->ctx;
964
bool completed = true;
965
966
/*
967
* All execution paths but io-wq use the deferred completions by
968
* passing IO_URING_F_COMPLETE_DEFER and thus should not end up here.
969
*/
970
if (WARN_ON_ONCE(!(issue_flags & IO_URING_F_IOWQ)))
971
return;
972
973
/*
974
* Handle special CQ sync cases via task_work. DEFER_TASKRUN requires
975
* the submitter task context, IOPOLL protects with uring_lock.
976
*/
977
if (ctx->lockless_cq || (req->flags & REQ_F_REISSUE)) {
978
defer_complete:
979
req->io_task_work.func = io_req_task_complete;
980
io_req_task_work_add(req);
981
return;
982
}
983
984
io_cq_lock(ctx);
985
if (!(req->flags & REQ_F_CQE_SKIP))
986
completed = io_fill_cqe_req(ctx, req);
987
io_cq_unlock_post(ctx);
988
989
if (!completed)
990
goto defer_complete;
991
992
/*
993
* We don't free the request here because we know it's called from
994
* io-wq only, which holds a reference, so it cannot be the last put.
995
*/
996
req_ref_put(req);
997
}
998
999
void io_req_defer_failed(struct io_kiocb *req, s32 res)
1000
__must_hold(&ctx->uring_lock)
1001
{
1002
const struct io_cold_def *def = &io_cold_defs[req->opcode];
1003
1004
lockdep_assert_held(&req->ctx->uring_lock);
1005
1006
req_set_fail(req);
1007
io_req_set_res(req, res, io_put_kbuf(req, res, NULL));
1008
if (def->fail)
1009
def->fail(req);
1010
io_req_complete_defer(req);
1011
}
1012
1013
/*
1014
* A request might get retired back into the request caches even before opcode
1015
* handlers and io_issue_sqe() are done with it, e.g. inline completion path.
1016
* Because of that, io_alloc_req() should be called only under ->uring_lock
1017
* and with extra caution to not get a request that is still worked on.
1018
*/
1019
__cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
1020
__must_hold(&ctx->uring_lock)
1021
{
1022
gfp_t gfp = GFP_KERNEL | __GFP_NOWARN | __GFP_ZERO;
1023
void *reqs[IO_REQ_ALLOC_BATCH];
1024
int ret;
1025
1026
ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
1027
1028
/*
1029
* Bulk alloc is all-or-nothing. If we fail to get a batch,
1030
* retry single alloc to be on the safe side.
1031
*/
1032
if (unlikely(ret <= 0)) {
1033
reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1034
if (!reqs[0])
1035
return false;
1036
ret = 1;
1037
}
1038
1039
percpu_ref_get_many(&ctx->refs, ret);
1040
ctx->nr_req_allocated += ret;
1041
1042
while (ret--) {
1043
struct io_kiocb *req = reqs[ret];
1044
1045
io_req_add_to_cache(req, ctx);
1046
}
1047
return true;
1048
}
1049
1050
__cold void io_free_req(struct io_kiocb *req)
1051
{
1052
/* refs were already put, restore them for io_req_task_complete() */
1053
req->flags &= ~REQ_F_REFCOUNT;
1054
/* we only want to free it, don't post CQEs */
1055
req->flags |= REQ_F_CQE_SKIP;
1056
req->io_task_work.func = io_req_task_complete;
1057
io_req_task_work_add(req);
1058
}
1059
1060
static void __io_req_find_next_prep(struct io_kiocb *req)
1061
{
1062
struct io_ring_ctx *ctx = req->ctx;
1063
1064
spin_lock(&ctx->completion_lock);
1065
io_disarm_next(req);
1066
spin_unlock(&ctx->completion_lock);
1067
}
1068
1069
static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
1070
{
1071
struct io_kiocb *nxt;
1072
1073
/*
1074
* If LINK is set, we have dependent requests in this chain. If we
1075
* didn't fail this request, queue the first one up, moving any other
1076
* dependencies to the next request. In case of failure, fail the rest
1077
* of the chain.
1078
*/
1079
if (unlikely(req->flags & IO_DISARM_MASK))
1080
__io_req_find_next_prep(req);
1081
nxt = req->link;
1082
req->link = NULL;
1083
return nxt;
1084
}
1085
1086
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
1087
{
1088
if (!ctx)
1089
return;
1090
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1091
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1092
1093
io_submit_flush_completions(ctx);
1094
mutex_unlock(&ctx->uring_lock);
1095
percpu_ref_put(&ctx->refs);
1096
}
1097
1098
/*
1099
* Run queued task_work, returning the number of entries processed in *count.
1100
* If more entries than max_entries are available, stop processing once this
1101
* is reached and return the rest of the list.
1102
*/
1103
struct llist_node *io_handle_tw_list(struct llist_node *node,
1104
unsigned int *count,
1105
unsigned int max_entries)
1106
{
1107
struct io_ring_ctx *ctx = NULL;
1108
struct io_tw_state ts = { };
1109
1110
do {
1111
struct llist_node *next = node->next;
1112
struct io_kiocb *req = container_of(node, struct io_kiocb,
1113
io_task_work.node);
1114
1115
if (req->ctx != ctx) {
1116
ctx_flush_and_put(ctx, ts);
1117
ctx = req->ctx;
1118
mutex_lock(&ctx->uring_lock);
1119
percpu_ref_get(&ctx->refs);
1120
ts.cancel = io_should_terminate_tw(ctx);
1121
}
1122
INDIRECT_CALL_2(req->io_task_work.func,
1123
io_poll_task_func, io_req_rw_complete,
1124
(struct io_tw_req){req}, ts);
1125
node = next;
1126
(*count)++;
1127
if (unlikely(need_resched())) {
1128
ctx_flush_and_put(ctx, ts);
1129
ctx = NULL;
1130
cond_resched();
1131
}
1132
} while (node && *count < max_entries);
1133
1134
ctx_flush_and_put(ctx, ts);
1135
return node;
1136
}
1137
1138
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
1139
{
1140
struct io_ring_ctx *last_ctx = NULL;
1141
struct io_kiocb *req;
1142
1143
while (node) {
1144
req = container_of(node, struct io_kiocb, io_task_work.node);
1145
node = node->next;
1146
if (last_ctx != req->ctx) {
1147
if (last_ctx) {
1148
if (sync)
1149
flush_delayed_work(&last_ctx->fallback_work);
1150
percpu_ref_put(&last_ctx->refs);
1151
}
1152
last_ctx = req->ctx;
1153
percpu_ref_get(&last_ctx->refs);
1154
}
1155
if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
1156
schedule_delayed_work(&last_ctx->fallback_work, 1);
1157
}
1158
1159
if (last_ctx) {
1160
if (sync)
1161
flush_delayed_work(&last_ctx->fallback_work);
1162
percpu_ref_put(&last_ctx->refs);
1163
}
1164
}
1165
1166
static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
1167
{
1168
struct llist_node *node = llist_del_all(&tctx->task_list);
1169
1170
__io_fallback_tw(node, sync);
1171
}
1172
1173
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
1174
unsigned int max_entries,
1175
unsigned int *count)
1176
{
1177
struct llist_node *node;
1178
1179
node = llist_del_all(&tctx->task_list);
1180
if (node) {
1181
node = llist_reverse_order(node);
1182
node = io_handle_tw_list(node, count, max_entries);
1183
}
1184
1185
/* relaxed read is enough as only the task itself sets ->in_cancel */
1186
if (unlikely(atomic_read(&tctx->in_cancel)))
1187
io_uring_drop_tctx_refs(current);
1188
1189
trace_io_uring_task_work_run(tctx, *count);
1190
return node;
1191
}
1192
1193
void tctx_task_work(struct callback_head *cb)
1194
{
1195
struct io_uring_task *tctx;
1196
struct llist_node *ret;
1197
unsigned int count = 0;
1198
1199
tctx = container_of(cb, struct io_uring_task, task_work);
1200
ret = tctx_task_work_run(tctx, UINT_MAX, &count);
1201
/* can't happen */
1202
WARN_ON_ONCE(ret);
1203
}
1204
1205
static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
1206
{
1207
struct io_ring_ctx *ctx = req->ctx;
1208
unsigned nr_wait, nr_tw, nr_tw_prev;
1209
struct llist_node *head;
1210
1211
/* See comment above IO_CQ_WAKE_INIT */
1212
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
1213
1214
/*
1215
* We don't know how many requests there are in the link and whether
1216
* they can even be queued lazily, so fall back to non-lazy.
1217
*/
1218
if (req->flags & IO_REQ_LINK_FLAGS)
1219
flags &= ~IOU_F_TWQ_LAZY_WAKE;
1220
1221
guard(rcu)();
1222
1223
head = READ_ONCE(ctx->work_llist.first);
1224
do {
1225
nr_tw_prev = 0;
1226
if (head) {
1227
struct io_kiocb *first_req = container_of(head,
1228
struct io_kiocb,
1229
io_task_work.node);
1230
/*
1231
* Might be executed at any moment, rely on
1232
* SLAB_TYPESAFE_BY_RCU to keep it alive.
1233
*/
1234
nr_tw_prev = READ_ONCE(first_req->nr_tw);
1235
}
1236
1237
/*
1238
* Theoretically, it can overflow, but that's fine as one of
1239
* previous adds should've tried to wake the task.
1240
*/
1241
nr_tw = nr_tw_prev + 1;
1242
if (!(flags & IOU_F_TWQ_LAZY_WAKE))
1243
nr_tw = IO_CQ_WAKE_FORCE;
1244
1245
req->nr_tw = nr_tw;
1246
req->io_task_work.node.next = head;
1247
} while (!try_cmpxchg(&ctx->work_llist.first, &head,
1248
&req->io_task_work.node));
1249
1250
/*
1251
* cmpxchg implies a full barrier, which pairs with the barrier
1252
* in set_current_state() on the io_cqring_wait() side. It's used
1253
* to ensure that either we see updated ->cq_wait_nr, or waiters
1254
* going to sleep will observe the work added to the list, which
1255
* is similar to the wait/wake task state sync.
1256
*/
1257
1258
if (!head) {
1259
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1260
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1261
if (ctx->has_evfd)
1262
io_eventfd_signal(ctx, false);
1263
}
1264
1265
nr_wait = atomic_read(&ctx->cq_wait_nr);
1266
/* not enough or no one is waiting */
1267
if (nr_tw < nr_wait)
1268
return;
1269
/* the previous add has already woken it up */
1270
if (nr_tw_prev >= nr_wait)
1271
return;
1272
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
1273
}
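
/*
 * Wake-up sketch for the lazy path above (illustrative): a waiter asking
 * for N completions publishes ->cq_wait_nr = N; each LAZY_WAKE add bumps
 * nr_tw by one, and the task is woken only by the add that first reaches
 * N (nr_tw >= N while nr_tw_prev < N). Non-lazy adds use IO_CQ_WAKE_FORCE,
 * which the BUILD_BUG_ON above guarantees exceeds any valid waiter count,
 * so they always wake; IO_CQ_WAKE_INIT is larger still, so with no waiters
 * even a forced add skips the wake_up.
 */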
1274
1275
static void io_req_normal_work_add(struct io_kiocb *req)
1276
{
1277
struct io_uring_task *tctx = req->tctx;
1278
struct io_ring_ctx *ctx = req->ctx;
1279
1280
/* task_work already pending, we're done */
1281
if (!llist_add(&req->io_task_work.node, &tctx->task_list))
1282
return;
1283
1284
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1285
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1286
1287
/* SQPOLL doesn't need the task_work added, it'll run it itself */
1288
if (ctx->flags & IORING_SETUP_SQPOLL) {
1289
__set_notify_signal(tctx->task);
1290
return;
1291
}
1292
1293
if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
1294
return;
1295
1296
io_fallback_tw(tctx, false);
1297
}
1298
1299
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
1300
{
1301
if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)
1302
io_req_local_work_add(req, flags);
1303
else
1304
io_req_normal_work_add(req);
1305
}
1306
1307
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
1308
{
1309
if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
1310
return;
1311
__io_req_task_work_add(req, flags);
1312
}
1313
1314
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
1315
{
1316
struct llist_node *node = llist_del_all(&ctx->work_llist);
1317
1318
__io_fallback_tw(node, false);
1319
node = llist_del_all(&ctx->retry_llist);
1320
__io_fallback_tw(node, false);
1321
}
1322
1323
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
1324
int min_events)
1325
{
1326
if (!io_local_work_pending(ctx))
1327
return false;
1328
if (events < min_events)
1329
return true;
1330
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1331
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1332
return false;
1333
}
1334
1335
static int __io_run_local_work_loop(struct llist_node **node,
1336
io_tw_token_t tw,
1337
int events)
1338
{
1339
int ret = 0;
1340
1341
while (*node) {
1342
struct llist_node *next = (*node)->next;
1343
struct io_kiocb *req = container_of(*node, struct io_kiocb,
1344
io_task_work.node);
1345
INDIRECT_CALL_2(req->io_task_work.func,
1346
io_poll_task_func, io_req_rw_complete,
1347
(struct io_tw_req){req}, tw);
1348
*node = next;
1349
if (++ret >= events)
1350
break;
1351
}
1352
1353
return ret;
1354
}
1355
1356
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
1357
int min_events, int max_events)
1358
{
1359
struct llist_node *node;
1360
unsigned int loops = 0;
1361
int ret = 0;
1362
1363
if (WARN_ON_ONCE(ctx->submitter_task != current))
1364
return -EEXIST;
1365
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1366
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1367
again:
1368
tw.cancel = io_should_terminate_tw(ctx);
1369
min_events -= ret;
1370
ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
1371
if (ctx->retry_llist.first)
1372
goto retry_done;
1373
1374
/*
1375
* llists are in reverse order, flip it back the right way before
1376
* running the pending items.
1377
*/
1378
node = llist_reverse_order(llist_del_all(&ctx->work_llist));
1379
ret += __io_run_local_work_loop(&node, tw, max_events - ret);
1380
ctx->retry_llist.first = node;
1381
loops++;
1382
1383
if (io_run_local_work_continue(ctx, ret, min_events))
1384
goto again;
1385
retry_done:
1386
io_submit_flush_completions(ctx);
1387
if (io_run_local_work_continue(ctx, ret, min_events))
1388
goto again;
1389
1390
trace_io_uring_local_work_run(ctx, ret, loops);
1391
return ret;
1392
}
1393
1394
static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
1395
int min_events)
1396
{
1397
struct io_tw_state ts = {};
1398
1399
if (!io_local_work_pending(ctx))
1400
return 0;
1401
return __io_run_local_work(ctx, ts, min_events,
1402
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
1403
}
1404
1405
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
1406
{
1407
struct io_tw_state ts = {};
1408
int ret;
1409
1410
mutex_lock(&ctx->uring_lock);
1411
ret = __io_run_local_work(ctx, ts, min_events, max_events);
1412
mutex_unlock(&ctx->uring_lock);
1413
return ret;
1414
}
1415
1416
static void io_req_task_cancel(struct io_tw_req tw_req, io_tw_token_t tw)
1417
{
1418
struct io_kiocb *req = tw_req.req;
1419
1420
io_tw_lock(req->ctx, tw);
1421
io_req_defer_failed(req, req->cqe.res);
1422
}
1423
1424
void io_req_task_submit(struct io_tw_req tw_req, io_tw_token_t tw)
1425
{
1426
struct io_kiocb *req = tw_req.req;
1427
struct io_ring_ctx *ctx = req->ctx;
1428
1429
io_tw_lock(ctx, tw);
1430
if (unlikely(tw.cancel))
1431
io_req_defer_failed(req, -EFAULT);
1432
else if (req->flags & REQ_F_FORCE_ASYNC)
1433
io_queue_iowq(req);
1434
else
1435
io_queue_sqe(req, 0);
1436
}
1437
1438
void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1439
{
1440
io_req_set_res(req, ret, 0);
1441
req->io_task_work.func = io_req_task_cancel;
1442
io_req_task_work_add(req);
1443
}
1444
1445
void io_req_task_queue(struct io_kiocb *req)
1446
{
1447
req->io_task_work.func = io_req_task_submit;
1448
io_req_task_work_add(req);
1449
}
1450
1451
void io_queue_next(struct io_kiocb *req)
1452
{
1453
struct io_kiocb *nxt = io_req_find_next(req);
1454
1455
if (nxt)
1456
io_req_task_queue(nxt);
1457
}
1458
1459
static inline void io_req_put_rsrc_nodes(struct io_kiocb *req)
1460
{
1461
if (req->file_node) {
1462
io_put_rsrc_node(req->ctx, req->file_node);
1463
req->file_node = NULL;
1464
}
1465
if (req->flags & REQ_F_BUF_NODE)
1466
io_put_rsrc_node(req->ctx, req->buf_node);
1467
}
1468
1469
static void io_free_batch_list(struct io_ring_ctx *ctx,
1470
struct io_wq_work_node *node)
1471
__must_hold(&ctx->uring_lock)
1472
{
1473
do {
1474
struct io_kiocb *req = container_of(node, struct io_kiocb,
1475
comp_list);
1476
1477
if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
1478
if (req->flags & REQ_F_REISSUE) {
1479
node = req->comp_list.next;
1480
req->flags &= ~REQ_F_REISSUE;
1481
io_queue_iowq(req);
1482
continue;
1483
}
1484
if (req->flags & REQ_F_REFCOUNT) {
1485
node = req->comp_list.next;
1486
if (!req_ref_put_and_test(req))
1487
continue;
1488
}
1489
if ((req->flags & REQ_F_POLLED) && req->apoll) {
1490
struct async_poll *apoll = req->apoll;
1491
1492
if (apoll->double_poll)
1493
kfree(apoll->double_poll);
1494
io_cache_free(&ctx->apoll_cache, apoll);
1495
req->flags &= ~REQ_F_POLLED;
1496
}
1497
if (req->flags & IO_REQ_LINK_FLAGS)
1498
io_queue_next(req);
1499
if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
1500
io_clean_op(req);
1501
}
1502
io_put_file(req);
1503
io_req_put_rsrc_nodes(req);
1504
io_put_task(req);
1505
1506
node = req->comp_list.next;
1507
io_req_add_to_cache(req, ctx);
1508
} while (node);
1509
}
1510
1511
void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1512
__must_hold(&ctx->uring_lock)
1513
{
1514
struct io_submit_state *state = &ctx->submit_state;
1515
struct io_wq_work_node *node;
1516
1517
__io_cq_lock(ctx);
1518
__wq_list_for_each(node, &state->compl_reqs) {
1519
struct io_kiocb *req = container_of(node, struct io_kiocb,
1520
comp_list);
1521
1522
/*
1523
* Requests marked with REQUEUE should not post a CQE, they
1524
* will go through the io-wq retry machinery and post one
1525
* later.
1526
*/
1527
if (!(req->flags & (REQ_F_CQE_SKIP | REQ_F_REISSUE)) &&
1528
unlikely(!io_fill_cqe_req(ctx, req))) {
1529
if (ctx->lockless_cq)
1530
io_cqe_overflow(ctx, &req->cqe, &req->big_cqe);
1531
else
1532
io_cqe_overflow_locked(ctx, &req->cqe, &req->big_cqe);
1533
}
1534
}
1535
__io_cq_unlock_post(ctx);
1536
1537
if (!wq_list_empty(&state->compl_reqs)) {
1538
io_free_batch_list(ctx, state->compl_reqs.first);
1539
INIT_WQ_LIST(&state->compl_reqs);
1540
}
1541
1542
if (unlikely(ctx->drain_active))
1543
io_queue_deferred(ctx);
1544
1545
ctx->submit_state.cq_flush = false;
1546
}
1547
1548
static unsigned io_cqring_events(struct io_ring_ctx *ctx)
1549
{
1550
/* See comment at the top of this file */
1551
smp_rmb();
1552
return __io_cqring_events(ctx);
1553
}
1554
1555
/*
1556
* We can't just wait for polled events to come to us, we have to actively
1557
* find and complete them.
1558
*/
1559
__cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
1560
{
1561
if (!(ctx->flags & IORING_SETUP_IOPOLL))
1562
return;
1563
1564
mutex_lock(&ctx->uring_lock);
1565
while (!wq_list_empty(&ctx->iopoll_list)) {
1566
/* let it sleep and repeat later if can't complete a request */
1567
if (io_do_iopoll(ctx, true) == 0)
1568
break;
1569
/*
1570
* Ensure we allow local-to-the-cpu processing to take place,
1571
* in this case we need to ensure that we reap all events.
1572
* Also let task_work, etc. progress by releasing the mutex
1573
*/
1574
if (need_resched()) {
1575
mutex_unlock(&ctx->uring_lock);
1576
cond_resched();
1577
mutex_lock(&ctx->uring_lock);
1578
}
1579
}
1580
mutex_unlock(&ctx->uring_lock);
1581
1582
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
1583
io_move_task_work_from_local(ctx);
1584
}
1585
1586
static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned int min_events)
1587
{
1588
unsigned int nr_events = 0;
1589
unsigned long check_cq;
1590
1591
min_events = min(min_events, ctx->cq_entries);
1592
1593
lockdep_assert_held(&ctx->uring_lock);
1594
1595
if (!io_allowed_run_tw(ctx))
1596
return -EEXIST;
1597
1598
check_cq = READ_ONCE(ctx->check_cq);
1599
if (unlikely(check_cq)) {
1600
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
1601
__io_cqring_overflow_flush(ctx, false);
1602
/*
1603
* Similarly do not spin if we have not informed the user of any
1604
* dropped CQE.
1605
*/
1606
if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
1607
return -EBADR;
1608
}
1609
/*
1610
* Don't enter poll loop if we already have events pending.
1611
* If we do, we can potentially be spinning for commands that
1612
* already triggered a CQE (eg in error).
1613
*/
1614
if (io_cqring_events(ctx))
1615
return 0;
1616
1617
do {
1618
int ret = 0;
1619
1620
/*
1621
* If a submit got punted to a workqueue, we can have the
1622
* application entering polling for a command before it gets
1623
* issued. That app will hold the uring_lock for the duration
1624
* of the poll right here, so we need to take a breather every
1625
* now and then to ensure that the issue has a chance to add
1626
* the poll to the issued list. Otherwise we can spin here
1627
* forever, while the workqueue is stuck trying to acquire the
1628
* very same mutex.
1629
*/
1630
if (wq_list_empty(&ctx->iopoll_list) ||
1631
io_task_work_pending(ctx)) {
1632
u32 tail = ctx->cached_cq_tail;
1633
1634
(void) io_run_local_work_locked(ctx, min_events);
1635
1636
if (task_work_pending(current) ||
1637
wq_list_empty(&ctx->iopoll_list)) {
1638
mutex_unlock(&ctx->uring_lock);
1639
io_run_task_work();
1640
mutex_lock(&ctx->uring_lock);
1641
}
1642
/* some requests don't go through iopoll_list */
1643
if (tail != ctx->cached_cq_tail ||
1644
wq_list_empty(&ctx->iopoll_list))
1645
break;
1646
}
1647
ret = io_do_iopoll(ctx, !min_events);
1648
if (unlikely(ret < 0))
1649
return ret;
1650
1651
if (task_sigpending(current))
1652
return -EINTR;
1653
if (need_resched())
1654
break;
1655
1656
nr_events += ret;
1657
} while (nr_events < min_events);
1658
1659
return 0;
1660
}
1661
1662
void io_req_task_complete(struct io_tw_req tw_req, io_tw_token_t tw)
1663
{
1664
io_req_complete_defer(tw_req.req);
1665
}
1666
1667
/*
1668
* After the iocb has been issued, it's safe to be found on the poll list.
1669
* Adding the kiocb to the list AFTER submission ensures that we don't
1670
* find it from an io_do_iopoll() thread before the issuer is done
1671
* accessing the kiocb cookie.
1672
*/
1673
static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
1674
{
1675
struct io_ring_ctx *ctx = req->ctx;
1676
const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
1677
1678
/* workqueue context doesn't hold uring_lock, grab it now */
1679
if (unlikely(needs_lock))
1680
mutex_lock(&ctx->uring_lock);
1681
1682
/*
1683
* Track whether we have multiple files in our lists. This will impact
1684
* how we do polling eventually, not spinning if we're on potentially
1685
* different devices.
1686
*/
1687
if (wq_list_empty(&ctx->iopoll_list)) {
1688
ctx->poll_multi_queue = false;
1689
} else if (!ctx->poll_multi_queue) {
1690
struct io_kiocb *list_req;
1691
1692
list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
1693
comp_list);
1694
if (list_req->file != req->file)
1695
ctx->poll_multi_queue = true;
1696
}
1697
1698
/*
1699
* For fast devices, IO may have already completed. If it has, add
1700
* it to the front so we find it first.
1701
*/
1702
if (READ_ONCE(req->iopoll_completed))
1703
wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
1704
else
1705
wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
1706
1707
if (unlikely(needs_lock)) {
1708
/*
1709
* If IORING_SETUP_SQPOLL is enabled, sqes are either handled
1710
* in sq thread task context or in io worker task context. If
1711
* current task context is sq thread, we don't need to check
1712
* whether should wake up sq thread.
1713
*/
1714
if ((ctx->flags & IORING_SETUP_SQPOLL) &&
1715
wq_has_sleeper(&ctx->sq_data->wait))
1716
wake_up(&ctx->sq_data->wait);
1717
1718
mutex_unlock(&ctx->uring_lock);
1719
}
1720
}
1721
1722
io_req_flags_t io_file_get_flags(struct file *file)
1723
{
1724
io_req_flags_t res = 0;
1725
1726
BUILD_BUG_ON(REQ_F_ISREG_BIT != REQ_F_SUPPORT_NOWAIT_BIT + 1);
1727
1728
if (S_ISREG(file_inode(file)->i_mode))
1729
res |= REQ_F_ISREG;
1730
if ((file->f_flags & O_NONBLOCK) || (file->f_mode & FMODE_NOWAIT))
1731
res |= REQ_F_SUPPORT_NOWAIT;
1732
return res;
1733
}
1734
1735
static __cold void io_drain_req(struct io_kiocb *req)
1736
__must_hold(&ctx->uring_lock)
1737
{
1738
struct io_ring_ctx *ctx = req->ctx;
1739
bool drain = req->flags & IOSQE_IO_DRAIN;
1740
struct io_defer_entry *de;
1741
1742
de = kmalloc(sizeof(*de), GFP_KERNEL_ACCOUNT);
1743
if (!de) {
1744
io_req_defer_failed(req, -ENOMEM);
1745
return;
1746
}
1747
1748
io_prep_async_link(req);
1749
trace_io_uring_defer(req);
1750
de->req = req;
1751
1752
ctx->nr_drained += io_linked_nr(req);
1753
list_add_tail(&de->list, &ctx->defer_list);
1754
io_queue_deferred(ctx);
1755
if (!drain && list_empty(&ctx->defer_list))
1756
ctx->drain_active = false;
1757
}
1758
1759
static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def,
1760
unsigned int issue_flags)
1761
{
1762
if (req->file || !def->needs_file)
1763
return true;
1764
1765
if (req->flags & REQ_F_FIXED_FILE)
1766
req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
1767
else
1768
req->file = io_file_get_normal(req, req->cqe.fd);
1769
1770
return !!req->file;
1771
}
1772
1773
#define REQ_ISSUE_SLOW_FLAGS (REQ_F_CREDS | REQ_F_ARM_LTIMEOUT)
1774
1775
static inline int __io_issue_sqe(struct io_kiocb *req,
1776
unsigned int issue_flags,
1777
const struct io_issue_def *def)
1778
{
1779
const struct cred *creds = NULL;
1780
struct io_kiocb *link = NULL;
1781
int ret;
1782
1783
if (unlikely(req->flags & REQ_ISSUE_SLOW_FLAGS)) {
1784
if ((req->flags & REQ_F_CREDS) && req->creds != current_cred())
1785
creds = override_creds(req->creds);
1786
if (req->flags & REQ_F_ARM_LTIMEOUT)
1787
link = __io_prep_linked_timeout(req);
1788
}
1789
1790
if (!def->audit_skip)
1791
audit_uring_entry(req->opcode);
1792
1793
ret = def->issue(req, issue_flags);
1794
1795
if (!def->audit_skip)
1796
audit_uring_exit(!ret, ret);
1797
1798
if (unlikely(creds || link)) {
1799
if (creds)
1800
revert_creds(creds);
1801
if (link)
1802
io_queue_linked_timeout(link);
1803
}
1804
1805
return ret;
1806
}
1807
1808
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
1809
{
1810
const struct io_issue_def *def = &io_issue_defs[req->opcode];
1811
int ret;
1812
1813
if (unlikely(!io_assign_file(req, def, issue_flags)))
1814
return -EBADF;
1815
1816
ret = __io_issue_sqe(req, issue_flags, def);
1817
1818
if (ret == IOU_COMPLETE) {
1819
if (issue_flags & IO_URING_F_COMPLETE_DEFER)
1820
io_req_complete_defer(req);
1821
else
1822
io_req_complete_post(req, issue_flags);
1823
1824
return 0;
1825
}
1826
1827
if (ret == IOU_ISSUE_SKIP_COMPLETE) {
1828
ret = 0;
1829
1830
/* If the op doesn't have a file, we're not polling for it */
1831
if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
1832
io_iopoll_req_issued(req, issue_flags);
1833
}
1834
return ret;
1835
}
1836
1837
int io_poll_issue(struct io_kiocb *req, io_tw_token_t tw)
1838
{
1839
const unsigned int issue_flags = IO_URING_F_NONBLOCK |
1840
IO_URING_F_MULTISHOT |
1841
IO_URING_F_COMPLETE_DEFER;
1842
int ret;
1843
1844
io_tw_lock(req->ctx, tw);
1845
1846
WARN_ON_ONCE(!req->file);
1847
if (WARN_ON_ONCE(req->ctx->flags & IORING_SETUP_IOPOLL))
1848
return -EFAULT;
1849
1850
ret = __io_issue_sqe(req, issue_flags, &io_issue_defs[req->opcode]);
1851
1852
WARN_ON_ONCE(ret == IOU_ISSUE_SKIP_COMPLETE);
1853
return ret;
1854
}
1855
1856
struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
1857
{
1858
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1859
struct io_kiocb *nxt = NULL;
1860
1861
if (req_ref_put_and_test_atomic(req)) {
1862
if (req->flags & IO_REQ_LINK_FLAGS)
1863
nxt = io_req_find_next(req);
1864
io_free_req(req);
1865
}
1866
return nxt ? &nxt->work : NULL;
1867
}
1868
1869
void io_wq_submit_work(struct io_wq_work *work)
1870
{
1871
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1872
const struct io_issue_def *def = &io_issue_defs[req->opcode];
1873
unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
1874
bool needs_poll = false;
1875
int ret = 0, err = -ECANCELED;
1876
1877
/* one will be dropped by io_wq_free_work() after returning to io-wq */
1878
if (!(req->flags & REQ_F_REFCOUNT))
1879
__io_req_set_refcount(req, 2);
1880
else
1881
req_ref_get(req);
1882
1883
/* either cancelled or io-wq is dying, so don't touch tctx->iowq */
1884
if (atomic_read(&work->flags) & IO_WQ_WORK_CANCEL) {
1885
fail:
1886
io_req_task_queue_fail(req, err);
1887
return;
1888
}
1889
if (!io_assign_file(req, def, issue_flags)) {
1890
err = -EBADF;
1891
atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
1892
goto fail;
1893
}
1894
1895
/*
1896
* If DEFER_TASKRUN is set, it's only allowed to post CQEs from the
1897
* submitter task context. Final request completions are handed to the
1898
* right context, however this is not the case for auxiliary CQEs,
1899
* which are the main means of operation for multishot requests.
1900
* Don't allow any multishot execution from io-wq. It's more restrictive
1901
* than necessary and also cleaner.
1902
*/
1903
if (req->flags & (REQ_F_MULTISHOT|REQ_F_APOLL_MULTISHOT)) {
1904
err = -EBADFD;
1905
if (!io_file_can_poll(req))
1906
goto fail;
1907
if (req->file->f_flags & O_NONBLOCK ||
1908
req->file->f_mode & FMODE_NOWAIT) {
1909
err = -ECANCELED;
1910
if (io_arm_poll_handler(req, issue_flags) != IO_APOLL_OK)
1911
goto fail;
1912
return;
1913
} else {
1914
req->flags &= ~(REQ_F_APOLL_MULTISHOT|REQ_F_MULTISHOT);
1915
}
1916
}
1917
1918
if (req->flags & REQ_F_FORCE_ASYNC) {
1919
bool opcode_poll = def->pollin || def->pollout;
1920
1921
if (opcode_poll && io_file_can_poll(req)) {
1922
needs_poll = true;
1923
issue_flags |= IO_URING_F_NONBLOCK;
1924
}
1925
}
1926
1927
do {
1928
ret = io_issue_sqe(req, issue_flags);
1929
if (ret != -EAGAIN)
1930
break;
1931
1932
/*
1933
* If REQ_F_NOWAIT is set, then don't wait or retry with
1934
* poll. -EAGAIN is final for that case.
1935
*/
1936
if (req->flags & REQ_F_NOWAIT)
1937
break;
1938
1939
/*
1940
* We can get EAGAIN for iopolled IO even though we're
1941
* forcing a sync submission from here, since we can't
1942
* wait for request slots on the block side.
1943
*/
1944
if (!needs_poll) {
1945
if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
1946
break;
1947
if (io_wq_worker_stopped())
1948
break;
1949
cond_resched();
1950
continue;
1951
}
1952
1953
if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
1954
return;
1955
/* aborted or ready, in either case retry blocking */
1956
needs_poll = false;
1957
issue_flags &= ~IO_URING_F_NONBLOCK;
1958
} while (1);
1959
1960
/* avoid locking problems by failing it from a clean context */
1961
if (ret)
1962
io_req_task_queue_fail(req, ret);
1963
}
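/*
 * Userspace sketch (not built as part of this file) of how a request ends
 * up on the io-wq path handled by io_wq_submit_work() above: IOSQE_ASYNC
 * maps to REQ_F_FORCE_ASYNC, so the read below is punted to an io-wq
 * worker instead of first being attempted non-blocking. Assumes liburing.
 */
#if 0
#include <liburing.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>

int forced_async_read(const char *path)
{
        struct io_uring ring;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        char buf[4096];
        int fd, ret;

        if (io_uring_queue_init(8, &ring, 0))
                return -1;
        fd = open(path, O_RDONLY);
        if (fd < 0)
                return -1;

        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);
        io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);      /* force io-wq execution */

        io_uring_submit(&ring);
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (!ret) {
                printf("read returned %d\n", cqe->res);
                io_uring_cqe_seen(&ring, cqe);
        }
        close(fd);
        io_uring_queue_exit(&ring);
        return ret;
}
#endif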
1964
1965
inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
1966
unsigned int issue_flags)
1967
{
1968
struct io_ring_ctx *ctx = req->ctx;
1969
struct io_rsrc_node *node;
1970
struct file *file = NULL;
1971
1972
io_ring_submit_lock(ctx, issue_flags);
1973
node = io_rsrc_node_lookup(&ctx->file_table.data, fd);
1974
if (node) {
1975
node->refs++;
1976
req->file_node = node;
1977
req->flags |= io_slot_flags(node);
1978
file = io_slot_file(node);
1979
}
1980
io_ring_submit_unlock(ctx, issue_flags);
1981
return file;
1982
}
1983
1984
struct file *io_file_get_normal(struct io_kiocb *req, int fd)
1985
{
1986
struct file *file = fget(fd);
1987
1988
trace_io_uring_file_get(req, fd);
1989
1990
/* we don't allow fixed io_uring files */
1991
if (file && io_is_uring_fops(file))
1992
io_req_track_inflight(req);
1993
return file;
1994
}
1995
1996
static int io_req_sqe_copy(struct io_kiocb *req, unsigned int issue_flags)
1997
{
1998
const struct io_cold_def *def = &io_cold_defs[req->opcode];
1999
2000
if (req->flags & REQ_F_SQE_COPIED)
2001
return 0;
2002
req->flags |= REQ_F_SQE_COPIED;
2003
if (!def->sqe_copy)
2004
return 0;
2005
if (WARN_ON_ONCE(!(issue_flags & IO_URING_F_INLINE)))
2006
return -EFAULT;
2007
def->sqe_copy(req);
2008
return 0;
2009
}
2010
2011
static void io_queue_async(struct io_kiocb *req, unsigned int issue_flags, int ret)
2012
__must_hold(&req->ctx->uring_lock)
2013
{
2014
if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
2015
fail:
2016
io_req_defer_failed(req, ret);
2017
return;
2018
}
2019
2020
ret = io_req_sqe_copy(req, issue_flags);
2021
if (unlikely(ret))
2022
goto fail;
2023
2024
switch (io_arm_poll_handler(req, 0)) {
2025
case IO_APOLL_READY:
2026
io_req_task_queue(req);
2027
break;
2028
case IO_APOLL_ABORTED:
2029
io_queue_iowq(req);
2030
break;
2031
case IO_APOLL_OK:
2032
break;
2033
}
2034
}
2035
2036
static inline void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags)
2037
__must_hold(&req->ctx->uring_lock)
2038
{
2039
unsigned int issue_flags = IO_URING_F_NONBLOCK |
2040
IO_URING_F_COMPLETE_DEFER | extra_flags;
2041
int ret;
2042
2043
ret = io_issue_sqe(req, issue_flags);
2044
2045
/*
2046
* We async punt it if the file wasn't marked NOWAIT, or if the file
2047
* doesn't support non-blocking read/write attempts
2048
*/
2049
if (unlikely(ret))
2050
io_queue_async(req, issue_flags, ret);
2051
}
2052
2053
static void io_queue_sqe_fallback(struct io_kiocb *req)
2054
__must_hold(&req->ctx->uring_lock)
2055
{
2056
if (unlikely(req->flags & REQ_F_FAIL)) {
2057
/*
2058
* We don't submit; fail them all. For that, replace hardlinks
2059
* with normal links. Extra REQ_F_LINK is tolerated.
2060
*/
2061
req->flags &= ~REQ_F_HARDLINK;
2062
req->flags |= REQ_F_LINK;
2063
io_req_defer_failed(req, req->cqe.res);
2064
} else {
2065
/* can't fail with IO_URING_F_INLINE */
2066
io_req_sqe_copy(req, IO_URING_F_INLINE);
2067
if (unlikely(req->ctx->drain_active))
2068
io_drain_req(req);
2069
else
2070
io_queue_iowq(req);
2071
}
2072
}
2073
2074
/*
2075
* Check SQE restrictions (opcode and flags).
2076
*
2077
* Returns 'true' if SQE is allowed, 'false' otherwise.
2078
*/
2079
static inline bool io_check_restriction(struct io_ring_ctx *ctx,
2080
struct io_kiocb *req,
2081
unsigned int sqe_flags)
2082
{
2083
if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
2084
return false;
2085
2086
if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
2087
ctx->restrictions.sqe_flags_required)
2088
return false;
2089
2090
if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
2091
ctx->restrictions.sqe_flags_required))
2092
return false;
2093
2094
return true;
2095
}
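/*
 * Userspace sketch (not built here) of what io_check_restriction() enforces:
 * the ring is created disabled, an opcode allow-list is registered, and the
 * ring is then enabled. SQEs outside the allow-list fail with -EACCES.
 * Assumes liburing's restriction helpers.
 */
#if 0
#include <liburing.h>
#include <string.h>

static int setup_restricted_ring(struct io_uring *ring)
{
        struct io_uring_params p;
        struct io_uring_restriction res[2];
        int ret;

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_R_DISABLED;
        ret = io_uring_queue_init_params(8, ring, &p);
        if (ret)
                return ret;

        memset(res, 0, sizeof(res));
        res[0].opcode = IORING_RESTRICTION_SQE_OP;
        res[0].sqe_op = IORING_OP_NOP;
        res[1].opcode = IORING_RESTRICTION_SQE_OP;
        res[1].sqe_op = IORING_OP_READ;

        ret = io_uring_register_restrictions(ring, res, 2);
        if (ret)
                return ret;
        return io_uring_enable_rings(ring);
}
#endif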
2096
2097
static void io_init_drain(struct io_ring_ctx *ctx)
2098
{
2099
struct io_kiocb *head = ctx->submit_state.link.head;
2100
2101
ctx->drain_active = true;
2102
if (head) {
2103
/*
2104
* If we need to drain a request in the middle of a link, drain
2105
* the head request and the next request/link after the current
2106
* link. Considering sequential execution of links,
2107
* REQ_F_IO_DRAIN will be maintained for every request of our
2108
* link.
2109
*/
2110
head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
2111
ctx->drain_next = true;
2112
}
2113
}
2114
2115
static __cold int io_init_fail_req(struct io_kiocb *req, int err)
2116
{
2117
/* ensure per-opcode data is cleared if we fail before prep */
2118
memset(&req->cmd.data, 0, sizeof(req->cmd.data));
2119
return err;
2120
}
2121
2122
static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
2123
const struct io_uring_sqe *sqe, unsigned int *left)
2124
__must_hold(&ctx->uring_lock)
2125
{
2126
const struct io_issue_def *def;
2127
unsigned int sqe_flags;
2128
int personality;
2129
u8 opcode;
2130
2131
req->ctx = ctx;
2132
req->opcode = opcode = READ_ONCE(sqe->opcode);
2133
/* same numerical values with corresponding REQ_F_*, safe to copy */
2134
sqe_flags = READ_ONCE(sqe->flags);
2135
req->flags = (__force io_req_flags_t) sqe_flags;
2136
req->cqe.user_data = READ_ONCE(sqe->user_data);
2137
req->file = NULL;
2138
req->tctx = current->io_uring;
2139
req->cancel_seq_set = false;
2140
req->async_data = NULL;
2141
2142
if (unlikely(opcode >= IORING_OP_LAST)) {
2143
req->opcode = 0;
2144
return io_init_fail_req(req, -EINVAL);
2145
}
2146
opcode = array_index_nospec(opcode, IORING_OP_LAST);
2147
2148
def = &io_issue_defs[opcode];
2149
if (def->is_128 && !(ctx->flags & IORING_SETUP_SQE128)) {
2150
/*
2151
* A 128b op on a non-128b SQ requires mixed SQE support as
2152
* well as 2 contiguous entries.
2153
*/
2154
if (!(ctx->flags & IORING_SETUP_SQE_MIXED) || *left < 2 ||
2155
!(ctx->cached_sq_head & (ctx->sq_entries - 1)))
2156
return io_init_fail_req(req, -EINVAL);
2157
/*
2158
* A 128b operation on a mixed SQ uses two entries, so we have
2159
* to increment the head and cached refs, and decrement what's
2160
* left.
2161
*/
2162
current->io_uring->cached_refs++;
2163
ctx->cached_sq_head++;
2164
(*left)--;
2165
}
2166
2167
if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
2168
/* enforce forwards compatibility on users */
2169
if (sqe_flags & ~SQE_VALID_FLAGS)
2170
return io_init_fail_req(req, -EINVAL);
2171
if (sqe_flags & IOSQE_BUFFER_SELECT) {
2172
if (!def->buffer_select)
2173
return io_init_fail_req(req, -EOPNOTSUPP);
2174
req->buf_index = READ_ONCE(sqe->buf_group);
2175
}
2176
if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
2177
ctx->drain_disabled = true;
2178
if (sqe_flags & IOSQE_IO_DRAIN) {
2179
if (ctx->drain_disabled)
2180
return io_init_fail_req(req, -EOPNOTSUPP);
2181
io_init_drain(ctx);
2182
}
2183
}
2184
if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
2185
if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
2186
return io_init_fail_req(req, -EACCES);
2187
/* knock it to the slow queue path, will be drained there */
2188
if (ctx->drain_active)
2189
req->flags |= REQ_F_FORCE_ASYNC;
2190
/* if there is no link, we're at "next" request and need to drain */
2191
if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
2192
ctx->drain_next = false;
2193
ctx->drain_active = true;
2194
req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
2195
}
2196
}
2197
2198
if (!def->ioprio && sqe->ioprio)
2199
return io_init_fail_req(req, -EINVAL);
2200
if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
2201
return io_init_fail_req(req, -EINVAL);
2202
2203
if (def->needs_file) {
2204
struct io_submit_state *state = &ctx->submit_state;
2205
2206
req->cqe.fd = READ_ONCE(sqe->fd);
2207
2208
/*
2209
* Plug now if we have more than 2 IOs left after this, and the
2210
* target is potentially a read/write to block-based storage.
2211
*/
2212
if (state->need_plug && def->plug) {
2213
state->plug_started = true;
2214
state->need_plug = false;
2215
blk_start_plug_nr_ios(&state->plug, state->submit_nr);
2216
}
2217
}
2218
2219
personality = READ_ONCE(sqe->personality);
2220
if (personality) {
2221
int ret;
2222
2223
req->creds = xa_load(&ctx->personalities, personality);
2224
if (!req->creds)
2225
return io_init_fail_req(req, -EINVAL);
2226
get_cred(req->creds);
2227
ret = security_uring_override_creds(req->creds);
2228
if (ret) {
2229
put_cred(req->creds);
2230
return io_init_fail_req(req, ret);
2231
}
2232
req->flags |= REQ_F_CREDS;
2233
}
2234
2235
return def->prep(req, sqe);
2236
}
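/*
 * Userspace sketch (not built here) of the personality lookup done in
 * io_init_req(): register the current credentials, then tag an SQE with the
 * returned id so the request is issued under those creds. Assumes liburing.
 */
#if 0
#include <liburing.h>

static int read_with_personality(struct io_uring *ring, int fd, void *buf,
                                 unsigned len)
{
        struct io_uring_sqe *sqe;
        int id;

        id = io_uring_register_personality(ring);
        if (id < 0)
                return id;

        sqe = io_uring_get_sqe(ring);
        io_uring_prep_read(sqe, fd, buf, len, 0);
        sqe->personality = id;          /* looked up in ctx->personalities */
        return io_uring_submit(ring);
}
#endif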
2237
2238
static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
2239
struct io_kiocb *req, int ret)
2240
{
2241
struct io_ring_ctx *ctx = req->ctx;
2242
struct io_submit_link *link = &ctx->submit_state.link;
2243
struct io_kiocb *head = link->head;
2244
2245
trace_io_uring_req_failed(sqe, req, ret);
2246
2247
/*
2248
* Avoid breaking links in the middle as it renders links with SQPOLL
2249
* unusable. Instead of failing eagerly, continue assembling the link if
2250
* applicable and mark the head with REQ_F_FAIL. The link flushing code
2251
* should find the flag and handle the rest.
2252
*/
2253
req_fail_link_node(req, ret);
2254
if (head && !(head->flags & REQ_F_FAIL))
2255
req_fail_link_node(head, -ECANCELED);
2256
2257
if (!(req->flags & IO_REQ_LINK_FLAGS)) {
2258
if (head) {
2259
link->last->link = req;
2260
link->head = NULL;
2261
req = head;
2262
}
2263
io_queue_sqe_fallback(req);
2264
return ret;
2265
}
2266
2267
if (head)
2268
link->last->link = req;
2269
else
2270
link->head = req;
2271
link->last = req;
2272
return 0;
2273
}
2274
2275
static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2276
const struct io_uring_sqe *sqe, unsigned int *left)
2277
__must_hold(&ctx->uring_lock)
2278
{
2279
struct io_submit_link *link = &ctx->submit_state.link;
2280
int ret;
2281
2282
ret = io_init_req(ctx, req, sqe, left);
2283
if (unlikely(ret))
2284
return io_submit_fail_init(sqe, req, ret);
2285
2286
trace_io_uring_submit_req(req);
2287
2288
/*
2289
* If we already have a head request, queue this one for async
2290
* submittal once the head completes. If we don't have a head but
2291
* IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
2292
* submitted sync once the chain is complete. If none of those
2293
* conditions are true (normal request), then just queue it.
2294
*/
2295
if (unlikely(link->head)) {
2296
trace_io_uring_link(req, link->last);
2297
io_req_sqe_copy(req, IO_URING_F_INLINE);
2298
link->last->link = req;
2299
link->last = req;
2300
2301
if (req->flags & IO_REQ_LINK_FLAGS)
2302
return 0;
2303
/* last request of the link, flush it */
2304
req = link->head;
2305
link->head = NULL;
2306
if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
2307
goto fallback;
2308
2309
} else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
2310
REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
2311
if (req->flags & IO_REQ_LINK_FLAGS) {
2312
link->head = req;
2313
link->last = req;
2314
} else {
2315
fallback:
2316
io_queue_sqe_fallback(req);
2317
}
2318
return 0;
2319
}
2320
2321
io_queue_sqe(req, IO_URING_F_INLINE);
2322
return 0;
2323
}
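/*
 * Userspace sketch (not built here) of the link assembly handled in
 * io_submit_sqe() above: a write chained to an fsync with IOSQE_IO_LINK,
 * so the fsync is only issued once the write completes. Assumes liburing.
 */
#if 0
#include <liburing.h>

static int linked_write_fsync(struct io_uring *ring, int fd,
                              const void *buf, unsigned len)
{
        struct io_uring_sqe *sqe;

        sqe = io_uring_get_sqe(ring);
        io_uring_prep_write(sqe, fd, buf, len, 0);
        io_uring_sqe_set_flags(sqe, IOSQE_IO_LINK);     /* starts the chain */

        sqe = io_uring_get_sqe(ring);
        io_uring_prep_fsync(sqe, fd, 0);                /* last link member */

        return io_uring_submit(ring);
}
#endif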
2324
2325
/*
2326
* Batched submission is done, ensure local IO is flushed out.
2327
*/
2328
static void io_submit_state_end(struct io_ring_ctx *ctx)
2329
{
2330
struct io_submit_state *state = &ctx->submit_state;
2331
2332
if (unlikely(state->link.head))
2333
io_queue_sqe_fallback(state->link.head);
2334
/* flush only after queuing links as they can generate completions */
2335
io_submit_flush_completions(ctx);
2336
if (state->plug_started)
2337
blk_finish_plug(&state->plug);
2338
}
2339
2340
/*
2341
* Start submission side cache.
2342
*/
2343
static void io_submit_state_start(struct io_submit_state *state,
2344
unsigned int max_ios)
2345
{
2346
state->plug_started = false;
2347
state->need_plug = max_ios > 2;
2348
state->submit_nr = max_ios;
2349
/* set only head, no need to init link_last in advance */
2350
state->link.head = NULL;
2351
}
2352
2353
static void io_commit_sqring(struct io_ring_ctx *ctx)
2354
{
2355
struct io_rings *rings = ctx->rings;
2356
2357
/*
2358
* Ensure any loads from the SQEs are done at this point,
2359
* since once we write the new head, the application could
2360
* write new data to them.
2361
*/
2362
smp_store_release(&rings->sq.head, ctx->cached_sq_head);
2363
}
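/*
 * Userspace counterpart sketch (not built here): once io_commit_sqring()
 * has published a new sq.head, the application may reuse the consumed SQE
 * slots. One way to compute free slots against the kernel-published head;
 * liburing ships a comparable helper, io_uring_sq_space_left().
 */
#if 0
#include <liburing.h>

static unsigned sq_free_slots(struct io_uring *ring)
{
        unsigned head = __atomic_load_n(ring->sq.khead, __ATOMIC_ACQUIRE);
        unsigned tail = *ring->sq.ktail;        /* only the app writes the tail */

        return *ring->sq.kring_entries - (tail - head);
}
#endif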
2364
2365
/*
2366
* Fetch an sqe, if one is available. Note this returns a pointer to memory
2367
* that is mapped by userspace. This means that care needs to be taken to
2368
* ensure that reads are stable, as we cannot rely on userspace always
2369
* being a good citizen. If members of the sqe are validated and then later
2370
* used, it's important that those reads are done through READ_ONCE() to
2371
* prevent a re-load down the line.
2372
*/
2373
static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
2374
{
2375
unsigned mask = ctx->sq_entries - 1;
2376
unsigned head = ctx->cached_sq_head++ & mask;
2377
2378
if (static_branch_unlikely(&io_key_has_sqarray) &&
2379
(!(ctx->flags & IORING_SETUP_NO_SQARRAY))) {
2380
head = READ_ONCE(ctx->sq_array[head]);
2381
if (unlikely(head >= ctx->sq_entries)) {
2382
WRITE_ONCE(ctx->rings->sq_dropped,
2383
READ_ONCE(ctx->rings->sq_dropped) + 1);
2384
return false;
2385
}
2386
head = array_index_nospec(head, ctx->sq_entries);
2387
}
2388
2389
/*
2390
* The cached sq head (or cq tail) serves two purposes:
2391
*
2392
* 1) allows us to batch the cost of updating the user visible
2393
* head.
2394
* 2) allows the kernel side to track the head on its own, even
2395
* though the application is the one updating it.
2396
*/
2397
2398
/* double index for 128-byte SQEs, twice as long */
2399
if (ctx->flags & IORING_SETUP_SQE128)
2400
head <<= 1;
2401
*sqe = &ctx->sq_sqes[head];
2402
return true;
2403
}
2404
2405
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
2406
__must_hold(&ctx->uring_lock)
2407
{
2408
unsigned int entries = io_sqring_entries(ctx);
2409
unsigned int left;
2410
int ret;
2411
2412
entries = min(nr, entries);
2413
if (unlikely(!entries))
2414
return 0;
2415
2416
ret = left = entries;
2417
io_get_task_refs(left);
2418
io_submit_state_start(&ctx->submit_state, left);
2419
2420
do {
2421
const struct io_uring_sqe *sqe;
2422
struct io_kiocb *req;
2423
2424
if (unlikely(!io_alloc_req(ctx, &req)))
2425
break;
2426
if (unlikely(!io_get_sqe(ctx, &sqe))) {
2427
io_req_add_to_cache(req, ctx);
2428
break;
2429
}
2430
2431
/*
2432
* Continue submitting even for sqe failure if the
2433
* ring was setup with IORING_SETUP_SUBMIT_ALL
2434
*/
2435
if (unlikely(io_submit_sqe(ctx, req, sqe, &left)) &&
2436
!(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
2437
left--;
2438
break;
2439
}
2440
} while (--left);
2441
2442
if (unlikely(left)) {
2443
ret -= left;
2444
/* try again if it submitted nothing and can't allocate a req */
2445
if (!ret && io_req_cache_empty(ctx))
2446
ret = -EAGAIN;
2447
current->io_uring->cached_refs += left;
2448
}
2449
2450
io_submit_state_end(ctx);
2451
/* Commit SQ ring head once we've consumed and submitted all SQEs */
2452
io_commit_sqring(ctx);
2453
return ret;
2454
}
2455
2456
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
2457
int wake_flags, void *key)
2458
{
2459
struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
2460
2461
/*
2462
* Cannot safely flush overflowed CQEs from here, ensure we wake up
2463
* the task, and the next invocation will do it.
2464
*/
2465
if (io_should_wake(iowq) || io_has_work(iowq->ctx))
2466
return autoremove_wake_function(curr, mode, wake_flags, key);
2467
return -1;
2468
}
2469
2470
int io_run_task_work_sig(struct io_ring_ctx *ctx)
2471
{
2472
if (io_local_work_pending(ctx)) {
2473
__set_current_state(TASK_RUNNING);
2474
if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
2475
return 0;
2476
}
2477
if (io_run_task_work() > 0)
2478
return 0;
2479
if (task_sigpending(current))
2480
return -EINTR;
2481
return 0;
2482
}
2483
2484
static bool current_pending_io(void)
2485
{
2486
struct io_uring_task *tctx = current->io_uring;
2487
2488
if (!tctx)
2489
return false;
2490
return percpu_counter_read_positive(&tctx->inflight);
2491
}
2492
2493
static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
2494
{
2495
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
2496
2497
WRITE_ONCE(iowq->hit_timeout, 1);
2498
iowq->min_timeout = 0;
2499
wake_up_process(iowq->wq.private);
2500
return HRTIMER_NORESTART;
2501
}
2502
2503
/*
2504
* Doing min_timeout portion. If we saw any timeouts, events, or have work,
2505
* wake up. If not, and we have a normal timeout, switch to that and keep
2506
* sleeping.
2507
*/
2508
static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
2509
{
2510
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
2511
struct io_ring_ctx *ctx = iowq->ctx;
2512
2513
/* no general timeout, or shorter (or equal), we are done */
2514
if (iowq->timeout == KTIME_MAX ||
2515
ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
2516
goto out_wake;
2517
/* work we may need to run, wake function will see if we need to wake */
2518
if (io_has_work(ctx))
2519
goto out_wake;
2520
/* got events since we started waiting, min timeout is done */
2521
if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail))
2522
goto out_wake;
2523
/* if we have any events and min timeout expired, we're done */
2524
if (io_cqring_events(ctx))
2525
goto out_wake;
2526
2527
/*
2528
* If using deferred task_work running and application is waiting on
2529
* more than one request, ensure we reset it now where we are switching
2530
* to normal sleeps. Any request completion post min_wait should wake
2531
* the task and return.
2532
*/
2533
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
2534
atomic_set(&ctx->cq_wait_nr, 1);
2535
smp_mb();
2536
if (!llist_empty(&ctx->work_llist))
2537
goto out_wake;
2538
}
2539
2540
/* any generated CQE posted past this time should wake us up */
2541
iowq->cq_tail = iowq->cq_min_tail;
2542
2543
hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
2544
hrtimer_set_expires(timer, iowq->timeout);
2545
return HRTIMER_RESTART;
2546
out_wake:
2547
return io_cqring_timer_wakeup(timer);
2548
}
2549
2550
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
2551
clockid_t clock_id, ktime_t start_time)
2552
{
2553
ktime_t timeout;
2554
2555
if (iowq->min_timeout) {
2556
timeout = ktime_add_ns(iowq->min_timeout, start_time);
2557
hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
2558
HRTIMER_MODE_ABS);
2559
} else {
2560
timeout = iowq->timeout;
2561
hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
2562
HRTIMER_MODE_ABS);
2563
}
2564
2565
hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
2566
hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
2567
2568
if (!READ_ONCE(iowq->hit_timeout))
2569
schedule();
2570
2571
hrtimer_cancel(&iowq->t);
2572
destroy_hrtimer_on_stack(&iowq->t);
2573
__set_current_state(TASK_RUNNING);
2574
2575
return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
2576
}
2577
2578
struct ext_arg {
2579
size_t argsz;
2580
struct timespec64 ts;
2581
const sigset_t __user *sig;
2582
ktime_t min_time;
2583
bool ts_set;
2584
bool iowait;
2585
};
2586
2587
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
2588
struct io_wait_queue *iowq,
2589
struct ext_arg *ext_arg,
2590
ktime_t start_time)
2591
{
2592
int ret = 0;
2593
2594
/*
2595
* Mark us as being in io_wait if we have pending requests, so cpufreq
2596
* can take into account that the task is waiting for IO; this turns out
2597
* to be important for low QD IO.
2598
*/
2599
if (ext_arg->iowait && current_pending_io())
2600
current->in_iowait = 1;
2601
if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
2602
ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
2603
else
2604
schedule();
2605
current->in_iowait = 0;
2606
return ret;
2607
}
2608
2609
/* If this returns > 0, the caller should retry */
2610
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
2611
struct io_wait_queue *iowq,
2612
struct ext_arg *ext_arg,
2613
ktime_t start_time)
2614
{
2615
if (unlikely(READ_ONCE(ctx->check_cq)))
2616
return 1;
2617
if (unlikely(io_local_work_pending(ctx)))
2618
return 1;
2619
if (unlikely(task_work_pending(current)))
2620
return 1;
2621
if (unlikely(task_sigpending(current)))
2622
return -EINTR;
2623
if (unlikely(io_should_wake(iowq)))
2624
return 0;
2625
2626
return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
2627
}
2628
2629
/*
2630
* Wait until events become available, if we don't already have some. The
2631
* application must reap them itself, as they reside on the shared cq ring.
2632
*/
2633
static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
2634
struct ext_arg *ext_arg)
2635
{
2636
struct io_wait_queue iowq;
2637
struct io_rings *rings = ctx->rings;
2638
ktime_t start_time;
2639
int ret;
2640
2641
min_events = min_t(int, min_events, ctx->cq_entries);
2642
2643
if (!io_allowed_run_tw(ctx))
2644
return -EEXIST;
2645
if (io_local_work_pending(ctx))
2646
io_run_local_work(ctx, min_events,
2647
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
2648
io_run_task_work();
2649
2650
if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
2651
io_cqring_do_overflow_flush(ctx);
2652
if (__io_cqring_events_user(ctx) >= min_events)
2653
return 0;
2654
2655
init_waitqueue_func_entry(&iowq.wq, io_wake_function);
2656
iowq.wq.private = current;
2657
INIT_LIST_HEAD(&iowq.wq.entry);
2658
iowq.ctx = ctx;
2659
iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
2660
iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail);
2661
iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
2662
iowq.hit_timeout = 0;
2663
iowq.min_timeout = ext_arg->min_time;
2664
iowq.timeout = KTIME_MAX;
2665
start_time = io_get_time(ctx);
2666
2667
if (ext_arg->ts_set) {
2668
iowq.timeout = timespec64_to_ktime(ext_arg->ts);
2669
if (!(flags & IORING_ENTER_ABS_TIMER))
2670
iowq.timeout = ktime_add(iowq.timeout, start_time);
2671
}
2672
2673
if (ext_arg->sig) {
2674
#ifdef CONFIG_COMPAT
2675
if (in_compat_syscall())
2676
ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
2677
ext_arg->argsz);
2678
else
2679
#endif
2680
ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
2681
2682
if (ret)
2683
return ret;
2684
}
2685
2686
io_napi_busy_loop(ctx, &iowq);
2687
2688
trace_io_uring_cqring_wait(ctx, min_events);
2689
do {
2690
unsigned long check_cq;
2691
int nr_wait;
2692
2693
/* if min timeout has been hit, don't reset wait count */
2694
if (!iowq.hit_timeout)
2695
nr_wait = (int) iowq.cq_tail -
2696
READ_ONCE(ctx->rings->cq.tail);
2697
else
2698
nr_wait = 1;
2699
2700
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
2701
atomic_set(&ctx->cq_wait_nr, nr_wait);
2702
set_current_state(TASK_INTERRUPTIBLE);
2703
} else {
2704
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
2705
TASK_INTERRUPTIBLE);
2706
}
2707
2708
ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
2709
__set_current_state(TASK_RUNNING);
2710
atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
2711
2712
/*
2713
* Run task_work after scheduling and before io_should_wake().
2714
* If we got woken because of task_work being processed, run it
2715
* now rather than let the caller do another wait loop.
2716
*/
2717
if (io_local_work_pending(ctx))
2718
io_run_local_work(ctx, nr_wait, nr_wait);
2719
io_run_task_work();
2720
2721
/*
2722
* Non-local task_work will be run on exit to userspace, but
2723
* if we're using DEFER_TASKRUN, then we could have waited
2724
* with a timeout for a number of requests. If the timeout
2725
* hits, we could have some requests ready to process. Ensure
2726
* this break is _after_ we have run task_work, to avoid
2727
* deferring running potentially pending requests until the
2728
* next time we wait for events.
2729
*/
2730
if (ret < 0)
2731
break;
2732
2733
check_cq = READ_ONCE(ctx->check_cq);
2734
if (unlikely(check_cq)) {
2735
/* let the caller flush overflows, retry */
2736
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
2737
io_cqring_do_overflow_flush(ctx);
2738
if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
2739
ret = -EBADR;
2740
break;
2741
}
2742
}
2743
2744
if (io_should_wake(&iowq)) {
2745
ret = 0;
2746
break;
2747
}
2748
cond_resched();
2749
} while (1);
2750
2751
if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
2752
finish_wait(&ctx->cq_wait, &iowq.wq);
2753
restore_saved_sigmask_unless(ret == -EINTR);
2754
2755
return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
2756
}
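/*
 * Userspace sketch (not built here) of the wait path above: submit what is
 * queued and block in io_cqring_wait() until at least 'min' completions are
 * posted, then reap the CQEs in one batch. Assumes liburing and a ring set
 * up without IORING_SETUP_IOPOLL.
 */
#if 0
#include <liburing.h>

static int submit_and_reap(struct io_uring *ring, unsigned min)
{
        struct io_uring_cqe *cqe;
        unsigned head, seen = 0;
        int ret;

        ret = io_uring_submit_and_wait(ring, min);
        if (ret < 0)
                return ret;

        io_uring_for_each_cqe(ring, head, cqe) {
                /* cqe->user_data and cqe->res identify and resolve each request */
                seen++;
        }
        io_uring_cq_advance(ring, seen);
        return seen;
}
#endif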
2757
2758
static void io_rings_free(struct io_ring_ctx *ctx)
2759
{
2760
io_free_region(ctx->user, &ctx->sq_region);
2761
io_free_region(ctx->user, &ctx->ring_region);
2762
ctx->rings = NULL;
2763
ctx->sq_sqes = NULL;
2764
}
2765
2766
static int rings_size(unsigned int flags, unsigned int sq_entries,
2767
unsigned int cq_entries, struct io_rings_layout *rl)
2768
{
2769
struct io_rings *rings;
2770
size_t sqe_size;
2771
size_t off;
2772
2773
if (flags & IORING_SETUP_CQE_MIXED) {
2774
if (cq_entries < 2)
2775
return -EOVERFLOW;
2776
}
2777
if (flags & IORING_SETUP_SQE_MIXED) {
2778
if (sq_entries < 2)
2779
return -EOVERFLOW;
2780
}
2781
2782
rl->sq_array_offset = SIZE_MAX;
2783
2784
sqe_size = sizeof(struct io_uring_sqe);
2785
if (flags & IORING_SETUP_SQE128)
2786
sqe_size *= 2;
2787
2788
rl->sq_size = array_size(sqe_size, sq_entries);
2789
if (rl->sq_size == SIZE_MAX)
2790
return -EOVERFLOW;
2791
2792
off = struct_size(rings, cqes, cq_entries);
2793
if (flags & IORING_SETUP_CQE32)
2794
off = size_mul(off, 2);
2795
if (off == SIZE_MAX)
2796
return -EOVERFLOW;
2797
2798
#ifdef CONFIG_SMP
2799
off = ALIGN(off, SMP_CACHE_BYTES);
2800
if (off == 0)
2801
return -EOVERFLOW;
2802
#endif
2803
2804
if (!(flags & IORING_SETUP_NO_SQARRAY)) {
2805
size_t sq_array_size;
2806
2807
rl->sq_array_offset = off;
2808
2809
sq_array_size = array_size(sizeof(u32), sq_entries);
2810
off = size_add(off, sq_array_size);
2811
if (off == SIZE_MAX)
2812
return -EOVERFLOW;
2813
}
2814
2815
rl->rings_size = off;
2816
return 0;
2817
}
2818
2819
static __cold void __io_req_caches_free(struct io_ring_ctx *ctx)
2820
{
2821
struct io_kiocb *req;
2822
int nr = 0;
2823
2824
while (!io_req_cache_empty(ctx)) {
2825
req = io_extract_req(ctx);
2826
io_poison_req(req);
2827
kmem_cache_free(req_cachep, req);
2828
nr++;
2829
}
2830
if (nr) {
2831
ctx->nr_req_allocated -= nr;
2832
percpu_ref_put_many(&ctx->refs, nr);
2833
}
2834
}
2835
2836
static __cold void io_req_caches_free(struct io_ring_ctx *ctx)
2837
{
2838
guard(mutex)(&ctx->uring_lock);
2839
__io_req_caches_free(ctx);
2840
}
2841
2842
static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
2843
{
2844
io_sq_thread_finish(ctx);
2845
2846
mutex_lock(&ctx->uring_lock);
2847
io_sqe_buffers_unregister(ctx);
2848
io_sqe_files_unregister(ctx);
2849
io_unregister_zcrx_ifqs(ctx);
2850
io_cqring_overflow_kill(ctx);
2851
io_eventfd_unregister(ctx);
2852
io_free_alloc_caches(ctx);
2853
io_destroy_buffers(ctx);
2854
io_free_region(ctx->user, &ctx->param_region);
2855
mutex_unlock(&ctx->uring_lock);
2856
if (ctx->sq_creds)
2857
put_cred(ctx->sq_creds);
2858
if (ctx->submitter_task)
2859
put_task_struct(ctx->submitter_task);
2860
2861
WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
2862
2863
if (ctx->mm_account) {
2864
mmdrop(ctx->mm_account);
2865
ctx->mm_account = NULL;
2866
}
2867
io_rings_free(ctx);
2868
2869
if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
2870
static_branch_dec(&io_key_has_sqarray);
2871
2872
percpu_ref_exit(&ctx->refs);
2873
free_uid(ctx->user);
2874
io_req_caches_free(ctx);
2875
2876
WARN_ON_ONCE(ctx->nr_req_allocated);
2877
2878
if (ctx->hash_map)
2879
io_wq_put_hash(ctx->hash_map);
2880
io_napi_free(ctx);
2881
kvfree(ctx->cancel_table.hbs);
2882
xa_destroy(&ctx->io_bl_xa);
2883
kfree(ctx);
2884
}
2885
2886
static __cold void io_activate_pollwq_cb(struct callback_head *cb)
2887
{
2888
struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
2889
poll_wq_task_work);
2890
2891
mutex_lock(&ctx->uring_lock);
2892
ctx->poll_activated = true;
2893
mutex_unlock(&ctx->uring_lock);
2894
2895
/*
2896
* Wake ups for some events between start of polling and activation
2897
* might've been lost due to loose synchronisation.
2898
*/
2899
wake_up_all(&ctx->poll_wq);
2900
percpu_ref_put(&ctx->refs);
2901
}
2902
2903
__cold void io_activate_pollwq(struct io_ring_ctx *ctx)
2904
{
2905
spin_lock(&ctx->completion_lock);
2906
/* already activated or in progress */
2907
if (ctx->poll_activated || ctx->poll_wq_task_work.func)
2908
goto out;
2909
if (WARN_ON_ONCE(!ctx->task_complete))
2910
goto out;
2911
if (!ctx->submitter_task)
2912
goto out;
2913
/*
2914
* with ->submitter_task only the submitter task completes requests, we
2915
* only need to sync with it, which is done by injecting a tw
2916
*/
2917
init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb);
2918
percpu_ref_get(&ctx->refs);
2919
if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL))
2920
percpu_ref_put(&ctx->refs);
2921
out:
2922
spin_unlock(&ctx->completion_lock);
2923
}
2924
2925
static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2926
{
2927
struct io_ring_ctx *ctx = file->private_data;
2928
__poll_t mask = 0;
2929
2930
if (unlikely(!ctx->poll_activated))
2931
io_activate_pollwq(ctx);
2932
/*
2933
* provides mb() which pairs with barrier from wq_has_sleeper
2934
* call in io_commit_cqring
2935
*/
2936
poll_wait(file, &ctx->poll_wq, wait);
2937
2938
if (!io_sqring_full(ctx))
2939
mask |= EPOLLOUT | EPOLLWRNORM;
2940
2941
/*
2942
* Don't flush cqring overflow list here, just do a simple check.
2943
* Otherwise there could possible be ABBA deadlock:
2944
* CPU0 CPU1
2945
* ---- ----
2946
* lock(&ctx->uring_lock);
2947
* lock(&ep->mtx);
2948
* lock(&ctx->uring_lock);
2949
* lock(&ep->mtx);
2950
*
2951
* Users may get EPOLLIN while seeing nothing in the cqring; this
2952
* pushes them to do the flush.
2953
*/
2954
2955
if (__io_cqring_events_user(ctx) || io_has_work(ctx))
2956
mask |= EPOLLIN | EPOLLRDNORM;
2957
2958
return mask;
2959
}
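/*
 * Userspace sketch (not built here): because io_uring_poll() reports
 * EPOLLIN when completions (or pending work) are available and EPOLLOUT
 * while the SQ ring has room, the ring fd can be multiplexed with
 * poll(2)/epoll alongside other fds. Assumes liburing for the ring fd.
 */
#if 0
#include <liburing.h>
#include <poll.h>

static int wait_for_ring(struct io_uring *ring, int timeout_ms)
{
        struct pollfd pfd = {
                .fd = ring->ring_fd,
                .events = POLLIN | POLLOUT,
        };

        return poll(&pfd, 1, timeout_ms);
}
#endif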
2960
2961
struct io_tctx_exit {
2962
struct callback_head task_work;
2963
struct completion completion;
2964
struct io_ring_ctx *ctx;
2965
};
2966
2967
static __cold void io_tctx_exit_cb(struct callback_head *cb)
2968
{
2969
struct io_uring_task *tctx = current->io_uring;
2970
struct io_tctx_exit *work;
2971
2972
work = container_of(cb, struct io_tctx_exit, task_work);
2973
/*
2974
* When @in_cancel, we're in cancellation and it's racy to remove the
2975
* node. It'll be removed by the end of cancellation, just ignore it.
2976
* tctx can be NULL if the queueing of this task_work raced with
2977
* work cancellation off the exec path.
2978
*/
2979
if (tctx && !atomic_read(&tctx->in_cancel))
2980
io_uring_del_tctx_node((unsigned long)work->ctx);
2981
complete(&work->completion);
2982
}
2983
2984
static __cold void io_ring_exit_work(struct work_struct *work)
2985
{
2986
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
2987
unsigned long timeout = jiffies + HZ * 60 * 5;
2988
unsigned long interval = HZ / 20;
2989
struct io_tctx_exit exit;
2990
struct io_tctx_node *node;
2991
int ret;
2992
2993
/*
2994
* If we're doing polled IO and end up having requests being
2995
* submitted async (out-of-line), then completions can come in while
2996
* we're waiting for refs to drop. We need to reap these manually,
2997
* as nobody else will be looking for them.
2998
*/
2999
do {
3000
if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
3001
mutex_lock(&ctx->uring_lock);
3002
io_cqring_overflow_kill(ctx);
3003
mutex_unlock(&ctx->uring_lock);
3004
}
3005
3006
/* The SQPOLL thread never reaches this path */
3007
do {
3008
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3009
io_move_task_work_from_local(ctx);
3010
cond_resched();
3011
} while (io_uring_try_cancel_requests(ctx, NULL, true, false));
3012
3013
if (ctx->sq_data) {
3014
struct io_sq_data *sqd = ctx->sq_data;
3015
struct task_struct *tsk;
3016
3017
io_sq_thread_park(sqd);
3018
tsk = sqpoll_task_locked(sqd);
3019
if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
3020
io_wq_cancel_cb(tsk->io_uring->io_wq,
3021
io_cancel_ctx_cb, ctx, true);
3022
io_sq_thread_unpark(sqd);
3023
}
3024
3025
io_req_caches_free(ctx);
3026
3027
if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
3028
/* there is little hope left, don't run it too often */
3029
interval = HZ * 60;
3030
}
3031
/*
3032
* This is really an uninterruptible wait, as it has to be
3033
* complete. But it's also run from a kworker, which doesn't
3034
* take signals, so it's fine to make it interruptible. This
3035
* avoids scenarios where we knowingly can wait much longer
3036
* on completions, for example if someone does a SIGSTOP on
3037
* a task that needs to finish task_work to make this loop
3038
* complete. That's a synthetic situation that should not
3039
* cause a stuck task backtrace, and hence a potential panic
3040
* on stuck tasks if that is enabled.
3041
*/
3042
} while (!wait_for_completion_interruptible_timeout(&ctx->ref_comp, interval));
3043
3044
init_completion(&exit.completion);
3045
init_task_work(&exit.task_work, io_tctx_exit_cb);
3046
exit.ctx = ctx;
3047
3048
mutex_lock(&ctx->uring_lock);
3049
mutex_lock(&ctx->tctx_lock);
3050
while (!list_empty(&ctx->tctx_list)) {
3051
WARN_ON_ONCE(time_after(jiffies, timeout));
3052
3053
node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
3054
ctx_node);
3055
/* don't spin on a single task if cancellation failed */
3056
list_rotate_left(&ctx->tctx_list);
3057
ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
3058
if (WARN_ON_ONCE(ret))
3059
continue;
3060
3061
mutex_unlock(&ctx->tctx_lock);
3062
mutex_unlock(&ctx->uring_lock);
3063
/*
3064
* See comment above for
3065
* wait_for_completion_interruptible_timeout() on why this
3066
* wait is marked as interruptible.
3067
*/
3068
wait_for_completion_interruptible(&exit.completion);
3069
mutex_lock(&ctx->uring_lock);
3070
mutex_lock(&ctx->tctx_lock);
3071
}
3072
mutex_unlock(&ctx->tctx_lock);
3073
mutex_unlock(&ctx->uring_lock);
3074
spin_lock(&ctx->completion_lock);
3075
spin_unlock(&ctx->completion_lock);
3076
3077
/* pairs with RCU read section in io_req_local_work_add() */
3078
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3079
synchronize_rcu();
3080
3081
io_ring_ctx_free(ctx);
3082
}
3083
3084
static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
3085
{
3086
unsigned long index;
3087
struct creds *creds;
3088
3089
mutex_lock(&ctx->uring_lock);
3090
percpu_ref_kill(&ctx->refs);
3091
xa_for_each(&ctx->personalities, index, creds)
3092
io_unregister_personality(ctx, index);
3093
mutex_unlock(&ctx->uring_lock);
3094
3095
flush_delayed_work(&ctx->fallback_work);
3096
3097
INIT_WORK(&ctx->exit_work, io_ring_exit_work);
3098
/*
3099
* Use system_dfl_wq to avoid spawning tons of event kworkers
3100
* if we're exiting a ton of rings at the same time. It just adds
3101
* noise and overhead; there's no discernible change in runtime
3102
* over using system_percpu_wq.
3103
*/
3104
queue_work(iou_wq, &ctx->exit_work);
3105
}
3106
3107
static int io_uring_release(struct inode *inode, struct file *file)
3108
{
3109
struct io_ring_ctx *ctx = file->private_data;
3110
3111
file->private_data = NULL;
3112
io_ring_ctx_wait_and_kill(ctx);
3113
return 0;
3114
}
3115
3116
static struct io_uring_reg_wait *io_get_ext_arg_reg(struct io_ring_ctx *ctx,
3117
const struct io_uring_getevents_arg __user *uarg)
3118
{
3119
unsigned long size = sizeof(struct io_uring_reg_wait);
3120
unsigned long offset = (uintptr_t)uarg;
3121
unsigned long end;
3122
3123
if (unlikely(offset % sizeof(long)))
3124
return ERR_PTR(-EFAULT);
3125
3126
/* also protects from NULL ->cq_wait_arg as the size would be 0 */
3127
if (unlikely(check_add_overflow(offset, size, &end) ||
3128
end > ctx->cq_wait_size))
3129
return ERR_PTR(-EFAULT);
3130
3131
offset = array_index_nospec(offset, ctx->cq_wait_size - size);
3132
return ctx->cq_wait_arg + offset;
3133
}
3134
3135
static int io_validate_ext_arg(struct io_ring_ctx *ctx, unsigned flags,
3136
const void __user *argp, size_t argsz)
3137
{
3138
struct io_uring_getevents_arg arg;
3139
3140
if (!(flags & IORING_ENTER_EXT_ARG))
3141
return 0;
3142
if (flags & IORING_ENTER_EXT_ARG_REG)
3143
return -EINVAL;
3144
if (argsz != sizeof(arg))
3145
return -EINVAL;
3146
if (copy_from_user(&arg, argp, sizeof(arg)))
3147
return -EFAULT;
3148
return 0;
3149
}
3150
3151
static int io_get_ext_arg(struct io_ring_ctx *ctx, unsigned flags,
3152
const void __user *argp, struct ext_arg *ext_arg)
3153
{
3154
const struct io_uring_getevents_arg __user *uarg = argp;
3155
struct io_uring_getevents_arg arg;
3156
3157
ext_arg->iowait = !(flags & IORING_ENTER_NO_IOWAIT);
3158
3159
/*
3160
* If EXT_ARG isn't set, then we have no timespec and the argp pointer
3161
* is just a pointer to the sigset_t.
3162
*/
3163
if (!(flags & IORING_ENTER_EXT_ARG)) {
3164
ext_arg->sig = (const sigset_t __user *) argp;
3165
return 0;
3166
}
3167
3168
if (flags & IORING_ENTER_EXT_ARG_REG) {
3169
struct io_uring_reg_wait *w;
3170
3171
if (ext_arg->argsz != sizeof(struct io_uring_reg_wait))
3172
return -EINVAL;
3173
w = io_get_ext_arg_reg(ctx, argp);
3174
if (IS_ERR(w))
3175
return PTR_ERR(w);
3176
3177
if (w->flags & ~IORING_REG_WAIT_TS)
3178
return -EINVAL;
3179
ext_arg->min_time = READ_ONCE(w->min_wait_usec) * NSEC_PER_USEC;
3180
ext_arg->sig = u64_to_user_ptr(READ_ONCE(w->sigmask));
3181
ext_arg->argsz = READ_ONCE(w->sigmask_sz);
3182
if (w->flags & IORING_REG_WAIT_TS) {
3183
ext_arg->ts.tv_sec = READ_ONCE(w->ts.tv_sec);
3184
ext_arg->ts.tv_nsec = READ_ONCE(w->ts.tv_nsec);
3185
ext_arg->ts_set = true;
3186
}
3187
return 0;
3188
}
3189
3190
/*
3191
* EXT_ARG is set - ensure we agree on the size of it and copy in our
3192
* timespec and sigset_t pointers if good.
3193
*/
3194
if (ext_arg->argsz != sizeof(arg))
3195
return -EINVAL;
3196
#ifdef CONFIG_64BIT
3197
if (!user_access_begin(uarg, sizeof(*uarg)))
3198
return -EFAULT;
3199
unsafe_get_user(arg.sigmask, &uarg->sigmask, uaccess_end);
3200
unsafe_get_user(arg.sigmask_sz, &uarg->sigmask_sz, uaccess_end);
3201
unsafe_get_user(arg.min_wait_usec, &uarg->min_wait_usec, uaccess_end);
3202
unsafe_get_user(arg.ts, &uarg->ts, uaccess_end);
3203
user_access_end();
3204
#else
3205
if (copy_from_user(&arg, uarg, sizeof(arg)))
3206
return -EFAULT;
3207
#endif
3208
ext_arg->min_time = arg.min_wait_usec * NSEC_PER_USEC;
3209
ext_arg->sig = u64_to_user_ptr(arg.sigmask);
3210
ext_arg->argsz = arg.sigmask_sz;
3211
if (arg.ts) {
3212
if (get_timespec64(&ext_arg->ts, u64_to_user_ptr(arg.ts)))
3213
return -EFAULT;
3214
ext_arg->ts_set = true;
3215
}
3216
return 0;
3217
#ifdef CONFIG_64BIT
3218
uaccess_end:
3219
user_access_end();
3220
return -EFAULT;
3221
#endif
3222
}
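/*
 * Userspace sketch (not built here) of the extended argument parsed by
 * io_get_ext_arg(): pass struct io_uring_getevents_arg via io_uring_enter()
 * with IORING_ENTER_EXT_ARG to wait with a timeout (and optionally a signal
 * mask or, in this uapi, a min_wait_usec). Raw syscall form for clarity.
 */
#if 0
#include <linux/io_uring.h>
#include <linux/time_types.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h>

static int enter_with_timeout(int ring_fd, unsigned min_complete)
{
        struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 10 * 1000 * 1000 };
        struct io_uring_getevents_arg arg;

        memset(&arg, 0, sizeof(arg));
        arg.ts = (unsigned long long)(uintptr_t)&ts;    /* read via get_timespec64() */
        /* arg.sigmask/arg.sigmask_sz left zero: no signal mask change */

        return syscall(__NR_io_uring_enter, ring_fd, 0, min_complete,
                       IORING_ENTER_GETEVENTS | IORING_ENTER_EXT_ARG,
                       &arg, sizeof(arg));
}
#endif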
3223
3224
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
3225
u32, min_complete, u32, flags, const void __user *, argp,
3226
size_t, argsz)
3227
{
3228
struct io_ring_ctx *ctx;
3229
struct file *file;
3230
long ret;
3231
3232
if (unlikely(flags & ~IORING_ENTER_FLAGS))
3233
return -EINVAL;
3234
3235
/*
3236
* Ring fd has been registered via IORING_REGISTER_RING_FDS; we
3237
* need only dereference our task private array to find it.
3238
*/
3239
if (flags & IORING_ENTER_REGISTERED_RING) {
3240
struct io_uring_task *tctx = current->io_uring;
3241
3242
if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
3243
return -EINVAL;
3244
fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
3245
file = tctx->registered_rings[fd];
3246
if (unlikely(!file))
3247
return -EBADF;
3248
} else {
3249
file = fget(fd);
3250
if (unlikely(!file))
3251
return -EBADF;
3252
ret = -EOPNOTSUPP;
3253
if (unlikely(!io_is_uring_fops(file)))
3254
goto out;
3255
}
3256
3257
ctx = file->private_data;
3258
ret = -EBADFD;
3259
if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
3260
goto out;
3261
3262
/*
3263
* For SQ polling, the thread will do all submissions and completions.
3264
* Just return the requested submit count, and wake the thread if
3265
* we were asked to.
3266
*/
3267
ret = 0;
3268
if (ctx->flags & IORING_SETUP_SQPOLL) {
3269
if (unlikely(ctx->sq_data->thread == NULL)) {
3270
ret = -EOWNERDEAD;
3271
goto out;
3272
}
3273
if (flags & IORING_ENTER_SQ_WAKEUP)
3274
wake_up(&ctx->sq_data->wait);
3275
if (flags & IORING_ENTER_SQ_WAIT)
3276
io_sqpoll_wait_sq(ctx);
3277
3278
ret = to_submit;
3279
} else if (to_submit) {
3280
ret = io_uring_add_tctx_node(ctx);
3281
if (unlikely(ret))
3282
goto out;
3283
3284
mutex_lock(&ctx->uring_lock);
3285
ret = io_submit_sqes(ctx, to_submit);
3286
if (ret != to_submit) {
3287
mutex_unlock(&ctx->uring_lock);
3288
goto out;
3289
}
3290
if (flags & IORING_ENTER_GETEVENTS) {
3291
if (ctx->syscall_iopoll)
3292
goto iopoll_locked;
3293
/*
3294
* Ignore errors, we'll soon call io_cqring_wait() and
3295
* it should handle ownership problems if any.
3296
*/
3297
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
3298
(void)io_run_local_work_locked(ctx, min_complete);
3299
}
3300
mutex_unlock(&ctx->uring_lock);
3301
}
3302
3303
if (flags & IORING_ENTER_GETEVENTS) {
3304
int ret2;
3305
3306
if (ctx->syscall_iopoll) {
3307
/*
3308
* We disallow the app entering submit/complete with
3309
* polling, but we still need to lock the ring to
3310
* prevent racing with polled issue that got punted to
3311
* a workqueue.
3312
*/
3313
mutex_lock(&ctx->uring_lock);
3314
iopoll_locked:
3315
ret2 = io_validate_ext_arg(ctx, flags, argp, argsz);
3316
if (likely(!ret2))
3317
ret2 = io_iopoll_check(ctx, min_complete);
3318
mutex_unlock(&ctx->uring_lock);
3319
} else {
3320
struct ext_arg ext_arg = { .argsz = argsz };
3321
3322
ret2 = io_get_ext_arg(ctx, flags, argp, &ext_arg);
3323
if (likely(!ret2))
3324
ret2 = io_cqring_wait(ctx, min_complete, flags,
3325
&ext_arg);
3326
}
3327
3328
if (!ret) {
3329
ret = ret2;
3330
3331
/*
3332
* EBADR indicates that one or more CQEs were dropped.
3333
* Once the user has been informed we can clear the bit
3334
* as they are obviously ok with those drops.
3335
*/
3336
if (unlikely(ret2 == -EBADR))
3337
clear_bit(IO_CHECK_CQ_DROPPED_BIT,
3338
&ctx->check_cq);
3339
}
3340
}
3341
out:
3342
if (!(flags & IORING_ENTER_REGISTERED_RING))
3343
fput(file);
3344
return ret;
3345
}
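/*
 * Userspace sketch (not built here) of the SQPOLL branch above: with
 * IORING_SETUP_SQPOLL the kernel thread consumes the SQ ring on its own,
 * and io_uring_enter() is only needed to wake it once it has gone idle and
 * set IORING_SQ_NEED_WAKEUP. liburing's io_uring_submit() performs this
 * check internally; the raw form is shown for clarity.
 */
#if 0
#include <liburing.h>
#include <sys/syscall.h>
#include <unistd.h>

static int sqpoll_kick(struct io_uring *ring, unsigned to_submit)
{
        unsigned flags = __atomic_load_n(ring->sq.kflags, __ATOMIC_RELAXED);

        if (flags & IORING_SQ_NEED_WAKEUP)
                return syscall(__NR_io_uring_enter, ring->ring_fd, to_submit,
                               0, IORING_ENTER_SQ_WAKEUP, NULL, 0);
        return to_submit;
}
#endif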
3346
3347
static const struct file_operations io_uring_fops = {
3348
.release = io_uring_release,
3349
.mmap = io_uring_mmap,
3350
.get_unmapped_area = io_uring_get_unmapped_area,
3351
#ifndef CONFIG_MMU
3352
.mmap_capabilities = io_uring_nommu_mmap_capabilities,
3353
#endif
3354
.poll = io_uring_poll,
3355
#ifdef CONFIG_PROC_FS
3356
.show_fdinfo = io_uring_show_fdinfo,
3357
#endif
3358
};
3359
3360
bool io_is_uring_fops(struct file *file)
3361
{
3362
return file->f_op == &io_uring_fops;
3363
}
3364
3365
static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
3366
struct io_ctx_config *config)
3367
{
3368
struct io_uring_params *p = &config->p;
3369
struct io_rings_layout *rl = &config->layout;
3370
struct io_uring_region_desc rd;
3371
struct io_rings *rings;
3372
int ret;
3373
3374
/* make sure these are sane, as we already accounted them */
3375
ctx->sq_entries = p->sq_entries;
3376
ctx->cq_entries = p->cq_entries;
3377
3378
memset(&rd, 0, sizeof(rd));
3379
rd.size = PAGE_ALIGN(rl->rings_size);
3380
if (ctx->flags & IORING_SETUP_NO_MMAP) {
3381
rd.user_addr = p->cq_off.user_addr;
3382
rd.flags |= IORING_MEM_REGION_TYPE_USER;
3383
}
3384
ret = io_create_region(ctx, &ctx->ring_region, &rd, IORING_OFF_CQ_RING);
3385
if (ret)
3386
return ret;
3387
ctx->rings = rings = io_region_get_ptr(&ctx->ring_region);
3388
if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
3389
ctx->sq_array = (u32 *)((char *)rings + rl->sq_array_offset);
3390
3391
memset(&rd, 0, sizeof(rd));
3392
rd.size = PAGE_ALIGN(rl->sq_size);
3393
if (ctx->flags & IORING_SETUP_NO_MMAP) {
3394
rd.user_addr = p->sq_off.user_addr;
3395
rd.flags |= IORING_MEM_REGION_TYPE_USER;
3396
}
3397
ret = io_create_region(ctx, &ctx->sq_region, &rd, IORING_OFF_SQES);
3398
if (ret) {
3399
io_rings_free(ctx);
3400
return ret;
3401
}
3402
ctx->sq_sqes = io_region_get_ptr(&ctx->sq_region);
3403
3404
memset(rings, 0, sizeof(*rings));
3405
WRITE_ONCE(rings->sq_ring_mask, ctx->sq_entries - 1);
3406
WRITE_ONCE(rings->cq_ring_mask, ctx->cq_entries - 1);
3407
WRITE_ONCE(rings->sq_ring_entries, ctx->sq_entries);
3408
WRITE_ONCE(rings->cq_ring_entries, ctx->cq_entries);
3409
return 0;
3410
}
3411
3412
static int io_uring_install_fd(struct file *file)
3413
{
3414
int fd;
3415
3416
fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3417
if (fd < 0)
3418
return fd;
3419
fd_install(fd, file);
3420
return fd;
3421
}
3422
3423
/*
3424
* Allocate an anonymous fd, this is what constitutes the application
3425
* visible backing of an io_uring instance. The application mmaps this
3426
* fd to gain access to the SQ/CQ ring details.
3427
*/
3428
static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
3429
{
3430
/* Create a new inode so that the LSM can block the creation. */
3431
return anon_inode_create_getfile("[io_uring]", &io_uring_fops, ctx,
3432
O_RDWR | O_CLOEXEC, NULL);
3433
}
3434
3435
static int io_uring_sanitise_params(struct io_uring_params *p)
3436
{
3437
unsigned flags = p->flags;
3438
3439
if (flags & ~IORING_SETUP_FLAGS)
3440
return -EINVAL;
3441
3442
/* There is no way to mmap rings without a real fd */
3443
if ((flags & IORING_SETUP_REGISTERED_FD_ONLY) &&
3444
!(flags & IORING_SETUP_NO_MMAP))
3445
return -EINVAL;
3446
3447
if (flags & IORING_SETUP_SQPOLL) {
3448
/* IPI related flags don't make sense with SQPOLL */
3449
if (flags & (IORING_SETUP_COOP_TASKRUN |
3450
IORING_SETUP_TASKRUN_FLAG |
3451
IORING_SETUP_DEFER_TASKRUN))
3452
return -EINVAL;
3453
}
3454
3455
if (flags & IORING_SETUP_TASKRUN_FLAG) {
3456
if (!(flags & (IORING_SETUP_COOP_TASKRUN |
3457
IORING_SETUP_DEFER_TASKRUN)))
3458
return -EINVAL;
3459
}
3460
3461
/* HYBRID_IOPOLL only valid with IOPOLL */
3462
if ((flags & IORING_SETUP_HYBRID_IOPOLL) && !(flags & IORING_SETUP_IOPOLL))
3463
return -EINVAL;
3464
3465
/*
3466
* For DEFER_TASKRUN we require the completion task to be the same as
3467
* the submission task. This implies that there is only one submitter.
3468
*/
3469
if ((flags & IORING_SETUP_DEFER_TASKRUN) &&
3470
!(flags & IORING_SETUP_SINGLE_ISSUER))
3471
return -EINVAL;
3472
3473
/*
3474
* Nonsensical to ask for CQE32 and mixed CQE support; it's not
3475
* supported to post 16b CQEs on a ring setup with CQE32.
3476
*/
3477
if ((flags & (IORING_SETUP_CQE32|IORING_SETUP_CQE_MIXED)) ==
3478
(IORING_SETUP_CQE32|IORING_SETUP_CQE_MIXED))
3479
return -EINVAL;
3480
/*
3481
* Nonsensical to ask for SQE128 and mixed SQE support; it's not
3482
* supported to post 64b SQEs on a ring setup with SQE128.
3483
*/
3484
if ((flags & (IORING_SETUP_SQE128|IORING_SETUP_SQE_MIXED)) ==
3485
(IORING_SETUP_SQE128|IORING_SETUP_SQE_MIXED))
3486
return -EINVAL;
3487
3488
return 0;
3489
}
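/*
 * Userspace sketch (not built here) of a flag combination that passes
 * io_uring_sanitise_params(): deferred task_work requires a single issuer
 * and excludes SQPOLL. Assumes liburing.
 */
#if 0
#include <liburing.h>
#include <string.h>

static int init_defer_taskrun(struct io_uring *ring)
{
        struct io_uring_params p;

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN;
        return io_uring_queue_init_params(64, ring, &p);
}
#endif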
3490
3491
static int io_uring_fill_params(struct io_uring_params *p)
3492
{
3493
unsigned entries = p->sq_entries;
3494
3495
if (!entries)
3496
return -EINVAL;
3497
if (entries > IORING_MAX_ENTRIES) {
3498
if (!(p->flags & IORING_SETUP_CLAMP))
3499
return -EINVAL;
3500
entries = IORING_MAX_ENTRIES;
3501
}
3502
3503
/*
3504
* Use twice as many entries for the CQ ring. It's possible for the
3505
* application to drive a higher depth than the size of the SQ ring,
3506
* since the sqes are only used at submission time. This allows for
3507
* some flexibility in overcommitting a bit. If the application has
3508
* set IORING_SETUP_CQSIZE, it will have passed in the desired number
3509
* of CQ ring entries manually.
3510
*/
3511
p->sq_entries = roundup_pow_of_two(entries);
3512
if (p->flags & IORING_SETUP_CQSIZE) {
3513
/*
3514
* If IORING_SETUP_CQSIZE is set, we do the same roundup
3515
* to a power-of-two, if it isn't already. We do NOT impose
3516
* any cq vs sq ring sizing.
3517
*/
3518
if (!p->cq_entries)
3519
return -EINVAL;
3520
if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
3521
if (!(p->flags & IORING_SETUP_CLAMP))
3522
return -EINVAL;
3523
p->cq_entries = IORING_MAX_CQ_ENTRIES;
3524
}
3525
p->cq_entries = roundup_pow_of_two(p->cq_entries);
3526
if (p->cq_entries < p->sq_entries)
3527
return -EINVAL;
3528
} else {
3529
p->cq_entries = 2 * p->sq_entries;
3530
}
3531
3532
return 0;
3533
}
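/*
 * Userspace sketch (not built here) of the sizing rules applied by
 * io_uring_fill_params(): request an explicit CQ ring depth with
 * IORING_SETUP_CQSIZE (it must not be smaller than sq_entries) and let
 * IORING_SETUP_CLAMP cap out-of-range values instead of failing.
 * Assumes liburing.
 */
#if 0
#include <liburing.h>
#include <string.h>
#include <stdio.h>

static int init_big_cq(struct io_uring *ring)
{
        struct io_uring_params p;
        int ret;

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_CQSIZE | IORING_SETUP_CLAMP;
        p.cq_entries = 4096;    /* rounded up to a power of two by the kernel */

        ret = io_uring_queue_init_params(128, ring, &p);
        if (!ret)
                printf("sq %u cq %u\n", p.sq_entries, p.cq_entries);
        return ret;
}
#endif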
3534
3535
int io_prepare_config(struct io_ctx_config *config)
3536
{
3537
struct io_uring_params *p = &config->p;
3538
int ret;
3539
3540
ret = io_uring_sanitise_params(p);
3541
if (ret)
3542
return ret;
3543
3544
ret = io_uring_fill_params(p);
3545
if (ret)
3546
return ret;
3547
3548
ret = rings_size(p->flags, p->sq_entries, p->cq_entries,
3549
&config->layout);
3550
if (ret)
3551
return ret;
3552
3553
p->sq_off.head = offsetof(struct io_rings, sq.head);
3554
p->sq_off.tail = offsetof(struct io_rings, sq.tail);
3555
p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
3556
p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
3557
p->sq_off.flags = offsetof(struct io_rings, sq_flags);
3558
p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
3559
p->sq_off.resv1 = 0;
3560
if (!(p->flags & IORING_SETUP_NO_MMAP))
3561
p->sq_off.user_addr = 0;
3562
3563
p->cq_off.head = offsetof(struct io_rings, cq.head);
3564
p->cq_off.tail = offsetof(struct io_rings, cq.tail);
3565
p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
3566
p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
3567
p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
3568
p->cq_off.cqes = offsetof(struct io_rings, cqes);
3569
p->cq_off.flags = offsetof(struct io_rings, cq_flags);
3570
p->cq_off.resv1 = 0;
3571
if (!(p->flags & IORING_SETUP_NO_MMAP))
3572
p->cq_off.user_addr = 0;
3573
if (!(p->flags & IORING_SETUP_NO_SQARRAY))
3574
p->sq_off.array = config->layout.sq_array_offset;
3575
3576
return 0;
3577
}
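/*
 * Userspace sketch (not built here) of consuming the offsets filled in by
 * io_prepare_config(): raw setup without liburing, mapping the SQ/CQ ring
 * and the SQE array. Assumes IORING_FEAT_SINGLE_MMAP (one mapping covers
 * both rings) and the default 16-byte CQEs; the size formulas mirror what
 * liburing computes from sq_off/cq_off.
 */
#if 0
#include <linux/io_uring.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <string.h>

static int map_ring(unsigned entries, struct io_uring_params *p,
                    void **rings, struct io_uring_sqe **sqes)
{
        size_t ring_sz, sqes_sz;
        int fd;

        memset(p, 0, sizeof(*p));
        fd = syscall(__NR_io_uring_setup, entries, p);
        if (fd < 0)
                return fd;

        ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
        if (!(p->flags & IORING_SETUP_NO_SQARRAY)) {
                size_t sq_end = p->sq_off.array + p->sq_entries * sizeof(__u32);

                if (sq_end > ring_sz)
                        ring_sz = sq_end;
        }
        sqes_sz = p->sq_entries * sizeof(struct io_uring_sqe);

        *rings = mmap(NULL, ring_sz, PROT_READ | PROT_WRITE,
                      MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
        *sqes = mmap(NULL, sqes_sz, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
        if (*rings == MAP_FAILED || *sqes == MAP_FAILED) {
                close(fd);
                return -1;
        }
        return fd;
}
#endif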
3578
3579
static __cold int io_uring_create(struct io_ctx_config *config)
3580
{
3581
struct io_uring_params *p = &config->p;
3582
struct io_ring_ctx *ctx;
3583
struct io_uring_task *tctx;
3584
struct file *file;
3585
int ret;
3586
3587
ret = io_prepare_config(config);
3588
if (ret)
3589
return ret;
3590
3591
ctx = io_ring_ctx_alloc(p);
3592
if (!ctx)
3593
return -ENOMEM;
3594
3595
ctx->clockid = CLOCK_MONOTONIC;
3596
ctx->clock_offset = 0;
3597
3598
if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
3599
static_branch_inc(&io_key_has_sqarray);
3600
3601
if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
3602
!(ctx->flags & IORING_SETUP_IOPOLL) &&
3603
!(ctx->flags & IORING_SETUP_SQPOLL))
3604
ctx->task_complete = true;
3605
3606
if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL))
3607
ctx->lockless_cq = true;
3608
3609
/*
3610
* lazy poll_wq activation relies on ->task_complete for synchronisation
3611
* purposes, see io_activate_pollwq()
3612
*/
3613
if (!ctx->task_complete)
3614
ctx->poll_activated = true;
3615
3616
/*
3617
* When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
3618
* space applications don't need to do io completion events
3619
* polling again; they can rely on io_sq_thread to do polling
3620
* work, which can reduce cpu usage and uring_lock contention.
3621
*/
3622
if (ctx->flags & IORING_SETUP_IOPOLL &&
3623
!(ctx->flags & IORING_SETUP_SQPOLL))
3624
ctx->syscall_iopoll = 1;
3625
3626
ctx->compat = in_compat_syscall();
3627
if (!ns_capable_noaudit(&init_user_ns, CAP_IPC_LOCK))
3628
ctx->user = get_uid(current_user());
3629
3630
/*
3631
* For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
3632
* COOP_TASKRUN is set, then IPIs are never needed by the app.
3633
*/
3634
if (ctx->flags & (IORING_SETUP_SQPOLL|IORING_SETUP_COOP_TASKRUN))
3635
ctx->notify_method = TWA_SIGNAL_NO_IPI;
3636
else
3637
ctx->notify_method = TWA_SIGNAL;
3638
3639
/*
3640
* This is just grabbed for accounting purposes. When a process exits,
3641
* the mm is exited and dropped before the files, hence we need to hang
3642
* on to this mm purely for the purposes of being able to unaccount
3643
* memory (locked/pinned vm). It's not used for anything else.
3644
*/
3645
mmgrab(current->mm);
3646
ctx->mm_account = current->mm;
3647
3648
ret = io_allocate_scq_urings(ctx, config);
3649
if (ret)
3650
goto err;
3651
3652
ret = io_sq_offload_create(ctx, p);
3653
if (ret)
3654
goto err;
3655
3656
p->features = IORING_FEAT_FLAGS;
3657
3658
if (copy_to_user(config->uptr, p, sizeof(*p))) {
3659
ret = -EFAULT;
3660
goto err;
3661
}
3662
3663
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
3664
&& !(ctx->flags & IORING_SETUP_R_DISABLED)) {
3665
/*
3666
* Unlike io_register_enable_rings(), don't need WRITE_ONCE()
3667
* since ctx isn't yet accessible from other tasks
3668
*/
3669
ctx->submitter_task = get_task_struct(current);
3670
}
3671
3672
file = io_uring_get_file(ctx);
3673
if (IS_ERR(file)) {
3674
ret = PTR_ERR(file);
3675
goto err;
3676
}
3677
3678
ret = __io_uring_add_tctx_node(ctx);
3679
if (ret)
3680
goto err_fput;
3681
tctx = current->io_uring;
3682
3683
/*
3684
* Install ring fd as the very last thing, so we don't risk someone
3685
* having closed it before we finish setup
3686
*/
3687
if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
3688
ret = io_ring_add_registered_file(tctx, file, 0, IO_RINGFD_REG_MAX);
3689
else
3690
ret = io_uring_install_fd(file);
3691
if (ret < 0)
3692
goto err_fput;
3693
3694
trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
3695
return ret;
3696
err:
3697
io_ring_ctx_wait_and_kill(ctx);
3698
return ret;
3699
err_fput:
3700
fput(file);
3701
return ret;
3702
}
3703
3704
/*
3705
* Sets up an io_uring context and returns the fd. The application asks for a
3706
* ring size; we return the actual sq/cq ring sizes (among other things) in the
3707
* params structure passed in.
3708
*/
3709
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3710
{
3711
struct io_ctx_config config;
3712
3713
memset(&config, 0, sizeof(config));
3714
3715
if (copy_from_user(&config.p, params, sizeof(config.p)))
3716
return -EFAULT;
3717
3718
if (!mem_is_zero(&config.p.resv, sizeof(config.p.resv)))
3719
return -EINVAL;
3720
3721
config.p.sq_entries = entries;
3722
config.uptr = params;
3723
return io_uring_create(&config);
3724
}
3725
3726
static inline int io_uring_allowed(void)
{
	int disabled = READ_ONCE(sysctl_io_uring_disabled);
	kgid_t io_uring_group;

	if (disabled == 2)
		return -EPERM;

	if (disabled == 0 || capable(CAP_SYS_ADMIN))
		goto allowed_lsm;

	io_uring_group = make_kgid(&init_user_ns, sysctl_io_uring_group);
	if (!gid_valid(io_uring_group))
		return -EPERM;

	if (!in_group_p(io_uring_group))
		return -EPERM;

allowed_lsm:
	return security_uring_allowed();
}

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
		struct io_uring_params __user *, params)
{
	int ret;

	ret = io_uring_allowed();
	if (ret)
		return ret;

	return io_uring_setup(entries, params);
}

static int __init io_uring_init(void)
{
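	/*
	 * Slab setup for the io_kiocb cache: useroffset/usersize whitelist
	 * the per-command data area for user copies (see the comment above
	 * the kmem_cache_create() call further down), and freeptr_offset
	 * places the freelist pointer in ->work, on the assumption that
	 * ->work is never looked at once a request has been freed.
	 */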
	struct kmem_cache_args kmem_args = {
		.useroffset = offsetof(struct io_kiocb, cmd.data),
		.usersize = sizeof_field(struct io_kiocb, cmd.data),
		.freeptr_offset = offsetof(struct io_kiocb, work),
		.use_freeptr_offset = true,
	};

#define __BUILD_BUG_VERIFY_OFFSET_SIZE(stype, eoffset, esize, ename) do { \
	BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
	BUILD_BUG_ON(sizeof_field(stype, ename) != esize); \
} while (0)

#define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, sizeof(etype), ename)
#define BUILD_BUG_SQE_ELEM_SIZE(eoffset, esize, ename) \
	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, esize, ename)
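	/*
	 * struct io_uring_sqe is UAPI; the compile-time checks below pin its
	 * total size and the offset and size of the individual fields, so
	 * that any layout change is caught at build time.
	 */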
	BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
	BUILD_BUG_SQE_ELEM(0, __u8, opcode);
	BUILD_BUG_SQE_ELEM(1, __u8, flags);
	BUILD_BUG_SQE_ELEM(2, __u16, ioprio);
	BUILD_BUG_SQE_ELEM(4, __s32, fd);
	BUILD_BUG_SQE_ELEM(8, __u64, off);
	BUILD_BUG_SQE_ELEM(8, __u64, addr2);
	BUILD_BUG_SQE_ELEM(8, __u32, cmd_op);
	BUILD_BUG_SQE_ELEM(12, __u32, __pad1);
	BUILD_BUG_SQE_ELEM(16, __u64, addr);
	BUILD_BUG_SQE_ELEM(16, __u64, splice_off_in);
	BUILD_BUG_SQE_ELEM(24, __u32, len);
	BUILD_BUG_SQE_ELEM(28, __kernel_rwf_t, rw_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */ int, rw_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, fsync_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */ __u16, poll_events);
	BUILD_BUG_SQE_ELEM(28, __u32, poll32_events);
	BUILD_BUG_SQE_ELEM(28, __u32, sync_range_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, msg_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, timeout_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, accept_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, cancel_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, open_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, statx_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, fadvise_advice);
	BUILD_BUG_SQE_ELEM(28, __u32, splice_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, rename_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, unlink_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, hardlink_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, xattr_flags);
	BUILD_BUG_SQE_ELEM(28, __u32, msg_ring_flags);
	BUILD_BUG_SQE_ELEM(32, __u64, user_data);
	BUILD_BUG_SQE_ELEM(40, __u16, buf_index);
	BUILD_BUG_SQE_ELEM(40, __u16, buf_group);
	BUILD_BUG_SQE_ELEM(42, __u16, personality);
	BUILD_BUG_SQE_ELEM(44, __s32, splice_fd_in);
	BUILD_BUG_SQE_ELEM(44, __u32, file_index);
	BUILD_BUG_SQE_ELEM(44, __u16, addr_len);
	BUILD_BUG_SQE_ELEM(44, __u8, write_stream);
	BUILD_BUG_SQE_ELEM(45, __u8, __pad4[0]);
	BUILD_BUG_SQE_ELEM(46, __u16, __pad3[0]);
	BUILD_BUG_SQE_ELEM(48, __u64, addr3);
	BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
	BUILD_BUG_SQE_ELEM(48, __u64, attr_ptr);
	BUILD_BUG_SQE_ELEM(56, __u64, attr_type_mask);
	BUILD_BUG_SQE_ELEM(56, __u64, __pad2);

	BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
		     sizeof(struct io_uring_rsrc_update));
	BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
		     sizeof(struct io_uring_rsrc_update2));

	/* ->buf_index is u16 */
	BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
	BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
		     offsetof(struct io_uring_buf_ring, tail));

	/* should fit into one byte */
	BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
	BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
	BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);

	BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof_field(struct io_kiocb, flags));

	BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));

	/* top 8 bits are for internal use */
	BUILD_BUG_ON((IORING_URING_CMD_MASK & 0xff000000) != 0);

	io_uring_optable_init();

	/* imu->dir is u8 */
	BUILD_BUG_ON((IO_IMU_DEST | IO_IMU_SOURCE) > U8_MAX);

	/*
	 * Allow user copy in the per-command field, which starts after the
	 * file in io_kiocb and until the opcode field. The openat2 handling
	 * requires copying user memory into the io_kiocb object in that
	 * range, and HARDENED_USERCOPY will complain if we haven't
	 * correctly annotated this range.
	 */
	req_cachep = kmem_cache_create("io_kiocb", sizeof(struct io_kiocb), &kmem_args,
				       SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT |
				       SLAB_TYPESAFE_BY_RCU);

	iou_wq = alloc_workqueue("iou_exit", WQ_UNBOUND, 64);
	BUG_ON(!iou_wq);

#ifdef CONFIG_SYSCTL
	register_sysctl_init("kernel", kernel_io_uring_disabled_table);
#endif

	return 0;
}
__initcall(io_uring_init);