Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/io_uring/io_uring.h
26131 views
1
#ifndef IOU_CORE_H
2
#define IOU_CORE_H
3
4
#include <linux/errno.h>
5
#include <linux/lockdep.h>
6
#include <linux/resume_user_mode.h>
7
#include <linux/kasan.h>
8
#include <linux/poll.h>
9
#include <linux/io_uring_types.h>
10
#include <uapi/linux/eventpoll.h>
11
#include "alloc_cache.h"
12
#include "io-wq.h"
13
#include "slist.h"
14
#include "filetable.h"
15
#include "opdef.h"
16
17
#ifndef CREATE_TRACE_POINTS
18
#include <trace/events/io_uring.h>
19
#endif
20
21
enum {
22
IOU_COMPLETE = 0,
23
24
IOU_ISSUE_SKIP_COMPLETE = -EIOCBQUEUED,
25
26
/*
27
* The request has more work to do and should be retried. io_uring will
28
* attempt to wait on the file for eligible opcodes, but otherwise
29
* it'll be handed to iowq for blocking execution. It works for normal
30
* requests as well as for the multi shot mode.
31
*/
32
IOU_RETRY = -EAGAIN,
33
34
/*
35
* Requeue the task_work to restart operations on this request. The
36
* actual value isn't important, should just be not an otherwise
37
* valid error code, yet less than -MAX_ERRNO and valid internally.
38
*/
39
IOU_REQUEUE = -3072,
40
};
41
42
struct io_wait_queue {
43
struct wait_queue_entry wq;
44
struct io_ring_ctx *ctx;
45
unsigned cq_tail;
46
unsigned cq_min_tail;
47
unsigned nr_timeouts;
48
int hit_timeout;
49
ktime_t min_timeout;
50
ktime_t timeout;
51
struct hrtimer t;
52
53
#ifdef CONFIG_NET_RX_BUSY_POLL
54
ktime_t napi_busy_poll_dt;
55
bool napi_prefer_busy_poll;
56
#endif
57
};
58
59
static inline bool io_should_wake(struct io_wait_queue *iowq)
60
{
61
struct io_ring_ctx *ctx = iowq->ctx;
62
int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
63
64
/*
65
* Wake up if we have enough events, or if a timeout occurred since we
66
* started waiting. For timeouts, we always want to return to userspace,
67
* regardless of event count.
68
*/
69
return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
70
}
71
72
#define IORING_MAX_ENTRIES 32768
73
#define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
74
75
unsigned long rings_size(unsigned int flags, unsigned int sq_entries,
76
unsigned int cq_entries, size_t *sq_offset);
77
int io_uring_fill_params(unsigned entries, struct io_uring_params *p);
78
bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow);
79
int io_run_task_work_sig(struct io_ring_ctx *ctx);
80
void io_req_defer_failed(struct io_kiocb *req, s32 res);
81
bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
82
void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
83
bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags);
84
bool io_req_post_cqe32(struct io_kiocb *req, struct io_uring_cqe src_cqe[2]);
85
void __io_commit_cqring_flush(struct io_ring_ctx *ctx);
86
87
void io_req_track_inflight(struct io_kiocb *req);
88
struct file *io_file_get_normal(struct io_kiocb *req, int fd);
89
struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
90
unsigned issue_flags);
91
92
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags);
93
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags);
94
void io_req_task_queue(struct io_kiocb *req);
95
void io_req_task_complete(struct io_kiocb *req, io_tw_token_t tw);
96
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
97
void io_req_task_submit(struct io_kiocb *req, io_tw_token_t tw);
98
struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
99
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
100
void tctx_task_work(struct callback_head *cb);
101
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
102
103
int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file,
104
int start, int end);
105
void io_req_queue_iowq(struct io_kiocb *req);
106
107
int io_poll_issue(struct io_kiocb *req, io_tw_token_t tw);
108
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
109
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
110
void __io_submit_flush_completions(struct io_ring_ctx *ctx);
111
112
struct io_wq_work *io_wq_free_work(struct io_wq_work *work);
113
void io_wq_submit_work(struct io_wq_work *work);
114
115
void io_free_req(struct io_kiocb *req);
116
void io_queue_next(struct io_kiocb *req);
117
void io_task_refs_refill(struct io_uring_task *tctx);
118
bool __io_alloc_req_refill(struct io_ring_ctx *ctx);
119
120
bool io_match_task_safe(struct io_kiocb *head, struct io_uring_task *tctx,
121
bool cancel_all);
122
123
void io_activate_pollwq(struct io_ring_ctx *ctx);
124
125
static inline void io_lockdep_assert_cq_locked(struct io_ring_ctx *ctx)
126
{
127
#if defined(CONFIG_PROVE_LOCKING)
128
lockdep_assert(in_task());
129
130
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
131
lockdep_assert_held(&ctx->uring_lock);
132
133
if (ctx->flags & IORING_SETUP_IOPOLL) {
134
lockdep_assert_held(&ctx->uring_lock);
135
} else if (!ctx->task_complete) {
136
lockdep_assert_held(&ctx->completion_lock);
137
} else if (ctx->submitter_task) {
138
/*
139
* ->submitter_task may be NULL and we can still post a CQE,
140
* if the ring has been setup with IORING_SETUP_R_DISABLED.
141
* Not from an SQE, as those cannot be submitted, but via
142
* updating tagged resources.
143
*/
144
if (!percpu_ref_is_dying(&ctx->refs))
145
lockdep_assert(current == ctx->submitter_task);
146
}
147
#endif
148
}
149
150
static inline bool io_is_compat(struct io_ring_ctx *ctx)
151
{
152
return IS_ENABLED(CONFIG_COMPAT) && unlikely(ctx->compat);
153
}
154
155
static inline void io_req_task_work_add(struct io_kiocb *req)
156
{
157
__io_req_task_work_add(req, 0);
158
}
159
160
static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
161
{
162
if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
163
ctx->submit_state.cq_flush)
164
__io_submit_flush_completions(ctx);
165
}
166
167
#define io_for_each_link(pos, head) \
168
for (pos = (head); pos; pos = pos->link)
169
170
static inline bool io_get_cqe_overflow(struct io_ring_ctx *ctx,
171
struct io_uring_cqe **ret,
172
bool overflow)
173
{
174
io_lockdep_assert_cq_locked(ctx);
175
176
if (unlikely(ctx->cqe_cached >= ctx->cqe_sentinel)) {
177
if (unlikely(!io_cqe_cache_refill(ctx, overflow)))
178
return false;
179
}
180
*ret = ctx->cqe_cached;
181
ctx->cached_cq_tail++;
182
ctx->cqe_cached++;
183
if (ctx->flags & IORING_SETUP_CQE32)
184
ctx->cqe_cached++;
185
return true;
186
}
187
188
static inline bool io_get_cqe(struct io_ring_ctx *ctx, struct io_uring_cqe **ret)
189
{
190
return io_get_cqe_overflow(ctx, ret, false);
191
}
192
193
static inline bool io_defer_get_uncommited_cqe(struct io_ring_ctx *ctx,
194
struct io_uring_cqe **cqe_ret)
195
{
196
io_lockdep_assert_cq_locked(ctx);
197
198
ctx->submit_state.cq_flush = true;
199
return io_get_cqe(ctx, cqe_ret);
200
}
201
202
static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
203
struct io_kiocb *req)
204
{
205
struct io_uring_cqe *cqe;
206
207
/*
208
* If we can't get a cq entry, userspace overflowed the
209
* submission (by quite a lot). Increment the overflow count in
210
* the ring.
211
*/
212
if (unlikely(!io_get_cqe(ctx, &cqe)))
213
return false;
214
215
216
memcpy(cqe, &req->cqe, sizeof(*cqe));
217
if (ctx->flags & IORING_SETUP_CQE32) {
218
memcpy(cqe->big_cqe, &req->big_cqe, sizeof(*cqe));
219
memset(&req->big_cqe, 0, sizeof(req->big_cqe));
220
}
221
222
if (trace_io_uring_complete_enabled())
223
trace_io_uring_complete(req->ctx, req, cqe);
224
return true;
225
}
226
227
static inline void req_set_fail(struct io_kiocb *req)
228
{
229
req->flags |= REQ_F_FAIL;
230
if (req->flags & REQ_F_CQE_SKIP) {
231
req->flags &= ~REQ_F_CQE_SKIP;
232
req->flags |= REQ_F_SKIP_LINK_CQES;
233
}
234
}
235
236
static inline void io_req_set_res(struct io_kiocb *req, s32 res, u32 cflags)
237
{
238
req->cqe.res = res;
239
req->cqe.flags = cflags;
240
}
241
242
static inline void *io_uring_alloc_async_data(struct io_alloc_cache *cache,
243
struct io_kiocb *req)
244
{
245
if (cache) {
246
req->async_data = io_cache_alloc(cache, GFP_KERNEL);
247
} else {
248
const struct io_issue_def *def = &io_issue_defs[req->opcode];
249
250
WARN_ON_ONCE(!def->async_size);
251
req->async_data = kmalloc(def->async_size, GFP_KERNEL);
252
}
253
if (req->async_data)
254
req->flags |= REQ_F_ASYNC_DATA;
255
return req->async_data;
256
}
257
258
static inline bool req_has_async_data(struct io_kiocb *req)
259
{
260
return req->flags & REQ_F_ASYNC_DATA;
261
}
262
263
static inline void io_put_file(struct io_kiocb *req)
264
{
265
if (!(req->flags & REQ_F_FIXED_FILE) && req->file)
266
fput(req->file);
267
}
268
269
static inline void io_ring_submit_unlock(struct io_ring_ctx *ctx,
270
unsigned issue_flags)
271
{
272
lockdep_assert_held(&ctx->uring_lock);
273
if (unlikely(issue_flags & IO_URING_F_UNLOCKED))
274
mutex_unlock(&ctx->uring_lock);
275
}
276
277
static inline void io_ring_submit_lock(struct io_ring_ctx *ctx,
278
unsigned issue_flags)
279
{
280
/*
281
* "Normal" inline submissions always hold the uring_lock, since we
282
* grab it from the system call. Same is true for the SQPOLL offload.
283
* The only exception is when we've detached the request and issue it
284
* from an async worker thread, grab the lock for that case.
285
*/
286
if (unlikely(issue_flags & IO_URING_F_UNLOCKED))
287
mutex_lock(&ctx->uring_lock);
288
lockdep_assert_held(&ctx->uring_lock);
289
}
290
291
static inline void io_commit_cqring(struct io_ring_ctx *ctx)
292
{
293
/* order cqe stores with ring update */
294
smp_store_release(&ctx->rings->cq.tail, ctx->cached_cq_tail);
295
}
296
297
static inline void __io_wq_wake(struct wait_queue_head *wq)
298
{
299
/*
300
*
301
* Pass in EPOLLIN|EPOLL_URING_WAKE as the poll wakeup key. The latter
302
* set in the mask so that if we recurse back into our own poll
303
* waitqueue handlers, we know we have a dependency between eventfd or
304
* epoll and should terminate multishot poll at that point.
305
*/
306
if (wq_has_sleeper(wq))
307
__wake_up(wq, TASK_NORMAL, 0, poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
308
}
309
310
static inline void io_poll_wq_wake(struct io_ring_ctx *ctx)
311
{
312
__io_wq_wake(&ctx->poll_wq);
313
}
314
315
static inline void io_cqring_wake(struct io_ring_ctx *ctx)
316
{
317
/*
318
* Trigger waitqueue handler on all waiters on our waitqueue. This
319
* won't necessarily wake up all the tasks, io_should_wake() will make
320
* that decision.
321
*/
322
323
__io_wq_wake(&ctx->cq_wait);
324
}
325
326
static inline bool io_sqring_full(struct io_ring_ctx *ctx)
327
{
328
struct io_rings *r = ctx->rings;
329
330
/*
331
* SQPOLL must use the actual sqring head, as using the cached_sq_head
332
* is race prone if the SQPOLL thread has grabbed entries but not yet
333
* committed them to the ring. For !SQPOLL, this doesn't matter, but
334
* since this helper is just used for SQPOLL sqring waits (or POLLOUT),
335
* just read the actual sqring head unconditionally.
336
*/
337
return READ_ONCE(r->sq.tail) - READ_ONCE(r->sq.head) == ctx->sq_entries;
338
}
339
340
static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
341
{
342
struct io_rings *rings = ctx->rings;
343
unsigned int entries;
344
345
/* make sure SQ entry isn't read before tail */
346
entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
347
return min(entries, ctx->sq_entries);
348
}
349
350
static inline int io_run_task_work(void)
351
{
352
bool ret = false;
353
354
/*
355
* Always check-and-clear the task_work notification signal. With how
356
* signaling works for task_work, we can find it set with nothing to
357
* run. We need to clear it for that case, like get_signal() does.
358
*/
359
if (test_thread_flag(TIF_NOTIFY_SIGNAL))
360
clear_notify_signal();
361
/*
362
* PF_IO_WORKER never returns to userspace, so check here if we have
363
* notify work that needs processing.
364
*/
365
if (current->flags & PF_IO_WORKER) {
366
if (test_thread_flag(TIF_NOTIFY_RESUME)) {
367
__set_current_state(TASK_RUNNING);
368
resume_user_mode_work(NULL);
369
}
370
if (current->io_uring) {
371
unsigned int count = 0;
372
373
__set_current_state(TASK_RUNNING);
374
tctx_task_work_run(current->io_uring, UINT_MAX, &count);
375
if (count)
376
ret = true;
377
}
378
}
379
if (task_work_pending(current)) {
380
__set_current_state(TASK_RUNNING);
381
task_work_run();
382
ret = true;
383
}
384
385
return ret;
386
}
387
388
static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
389
{
390
return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
391
}
392
393
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
394
{
395
return task_work_pending(current) || io_local_work_pending(ctx);
396
}
397
398
static inline void io_tw_lock(struct io_ring_ctx *ctx, io_tw_token_t tw)
399
{
400
lockdep_assert_held(&ctx->uring_lock);
401
}
402
403
/*
404
* Don't complete immediately but use deferred completion infrastructure.
405
* Protected by ->uring_lock and can only be used either with
406
* IO_URING_F_COMPLETE_DEFER or inside a tw handler holding the mutex.
407
*/
408
static inline void io_req_complete_defer(struct io_kiocb *req)
409
__must_hold(&req->ctx->uring_lock)
410
{
411
struct io_submit_state *state = &req->ctx->submit_state;
412
413
lockdep_assert_held(&req->ctx->uring_lock);
414
415
wq_list_add_tail(&req->comp_list, &state->compl_reqs);
416
}
417
418
static inline void io_commit_cqring_flush(struct io_ring_ctx *ctx)
419
{
420
if (unlikely(ctx->off_timeout_used ||
421
ctx->has_evfd || ctx->poll_activated))
422
__io_commit_cqring_flush(ctx);
423
}
424
425
static inline void io_get_task_refs(int nr)
426
{
427
struct io_uring_task *tctx = current->io_uring;
428
429
tctx->cached_refs -= nr;
430
if (unlikely(tctx->cached_refs < 0))
431
io_task_refs_refill(tctx);
432
}
433
434
static inline bool io_req_cache_empty(struct io_ring_ctx *ctx)
435
{
436
return !ctx->submit_state.free_list.next;
437
}
438
439
extern struct kmem_cache *req_cachep;
440
441
static inline struct io_kiocb *io_extract_req(struct io_ring_ctx *ctx)
442
{
443
struct io_kiocb *req;
444
445
req = container_of(ctx->submit_state.free_list.next, struct io_kiocb, comp_list);
446
wq_stack_extract(&ctx->submit_state.free_list);
447
return req;
448
}
449
450
static inline bool io_alloc_req(struct io_ring_ctx *ctx, struct io_kiocb **req)
451
{
452
if (unlikely(io_req_cache_empty(ctx))) {
453
if (!__io_alloc_req_refill(ctx))
454
return false;
455
}
456
*req = io_extract_req(ctx);
457
return true;
458
}
459
460
static inline bool io_allowed_defer_tw_run(struct io_ring_ctx *ctx)
461
{
462
return likely(ctx->submitter_task == current);
463
}
464
465
static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx)
466
{
467
return likely(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN) ||
468
ctx->submitter_task == current);
469
}
470
471
/*
472
* Terminate the request if either of these conditions are true:
473
*
474
* 1) It's being executed by the original task, but that task is marked
475
* with PF_EXITING as it's exiting.
476
* 2) PF_KTHREAD is set, in which case the invoker of the task_work is
477
* our fallback task_work.
478
*/
479
static inline bool io_should_terminate_tw(void)
480
{
481
return current->flags & (PF_KTHREAD | PF_EXITING);
482
}
483
484
static inline void io_req_queue_tw_complete(struct io_kiocb *req, s32 res)
485
{
486
io_req_set_res(req, res, 0);
487
req->io_task_work.func = io_req_task_complete;
488
io_req_task_work_add(req);
489
}
490
491
/*
492
* IORING_SETUP_SQE128 contexts allocate twice the normal SQE size for each
493
* slot.
494
*/
495
static inline size_t uring_sqe_size(struct io_ring_ctx *ctx)
496
{
497
if (ctx->flags & IORING_SETUP_SQE128)
498
return 2 * sizeof(struct io_uring_sqe);
499
return sizeof(struct io_uring_sqe);
500
}
501
502
static inline bool io_file_can_poll(struct io_kiocb *req)
503
{
504
if (req->flags & REQ_F_CAN_POLL)
505
return true;
506
if (req->file && file_can_poll(req->file)) {
507
req->flags |= REQ_F_CAN_POLL;
508
return true;
509
}
510
return false;
511
}
512
513
static inline ktime_t io_get_time(struct io_ring_ctx *ctx)
514
{
515
if (ctx->clockid == CLOCK_MONOTONIC)
516
return ktime_get();
517
518
return ktime_get_with_offset(ctx->clock_offset);
519
}
520
521
enum {
522
IO_CHECK_CQ_OVERFLOW_BIT,
523
IO_CHECK_CQ_DROPPED_BIT,
524
};
525
526
static inline bool io_has_work(struct io_ring_ctx *ctx)
527
{
528
return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
529
io_local_work_pending(ctx);
530
}
531
#endif
532
533