GitHub Repository: torvalds/linux
Path: io_uring/io-wq.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <linux/sched/sysctl.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_EXIT_ON_IDLE	= 1,	/* allow all workers to exit on idle */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

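/*
 * Worker reference helpers: io_worker_get() only succeeds if the worker
 * isn't already on its way out, and io_worker_release() drops the
 * reference, completing ->ref_done once the last one is gone.
 */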
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

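/*
 * Drop a wq-wide worker reference; the final put completes ->worker_done,
 * which io_wq_exit_workers() waits on.
 */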
static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

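/*
 * Called from a worker's own context to check whether its io_wq has been
 * marked as exiting.
 */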
bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

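/*
 * Undo the accounting done when this worker's creation was queued as
 * task_work: drop the running count, the worker count and the references
 * taken for the request.
 */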
static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

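/*
 * Final teardown of a worker: cancel any pending creation task_work, wait
 * for outstanding references, unlink the worker from the accounting lists
 * and free it via RCU before the task exits.
 */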
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't set up with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

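/*
 * task_work callback that either activates a free worker or, if we are
 * still below the max, accounts for and creates a new one.
 */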
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool activated_free_worker, do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;

	rcu_read_lock();
	activated_free_worker = io_acct_activate_free_worker(acct);
	rcu_read_unlock();
	if (activated_free_worker)
		goto no_need_create;

	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
no_need_create:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

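/*
 * Queue a worker-creation request as task_work on the wq's owning task.
 * On any failure the caller's nr_running and worker ref accounting is
 * rolled back and false is returned.
 */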
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

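/*
 * A worker stopped running (e.g. it is about to sleep). If it was the last
 * running worker for this acct and pending work remains that isn't deferred
 * behind the same hash, queue creation of a new worker.
 */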
static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

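/*
 * Pick the next runnable work item off the acct list, skipping hashed work
 * whose hash bucket is already being executed. If only stalled hashed work
 * remains, mark the acct stalled and wait on the hash.
 */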
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;

	do {
		bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
					? __io_get_work_hash(work_flags)
					: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

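/*
 * Main loop of an io-wq worker thread: run queued work, go idle on the
 * free list, and exit when the idle timeout hits and it isn't the last
 * worker (or its CPU affinity no longer matches), on a fatal signal, or
 * when the wq is shut down.
 */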
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity. If wq is marked
		 * idle-exit, drop the worker as well. This is used to avoid
		 * keeping io-wq workers around for tasks that no longer have
		 * any active io_uring instances.
		 */
		if ((last_timeout && (exit_mask || acct->nr_workers > 1)) ||
		    test_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	worker->init_retries++;
	switch (err) {
	case -EAGAIN:
		return worker->init_retries <= WORKER_INIT_LIMIT;
	/* Analogous to a fork() syscall, always retry on a restartable error */
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

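/*
 * task_work continuation for a retried worker creation: try to fork the io
 * thread again and, if that fails permanently, unwind the accounting and
 * cancel pending work if this would have been the last worker.
 */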
static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

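/*
 * Allocate a new worker and fork its io thread. Transient failures are
 * retried from a delayed work item; permanent ones undo the accounting
 * taken by the caller.
 */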
static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static void io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		if (io_acct_for_each_worker(&wq->acct[i], func, data))
			break;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

void io_wq_set_exit_on_idle(struct io_wq *wq, bool enable)
{
	if (!wq->task)
		return;

	if (!enable) {
		clear_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state);
		return;
	}

	if (test_and_set_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state))
		return;

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

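/*
 * Queue work for async execution: insert it on the matching acct list and
 * wake a free worker, creating a new one if none is available and the
 * accounting allows it.
 */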
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

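/*
 * Wait queue callback invoked when a hash bucket is released: wake a free
 * worker for every acct that was stalled on hashed work.
 */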
static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

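/*
 * Create an io_wq instance for a task. A rough sketch of how this
 * interface fits together (simplified, error handling omitted):
 *
 *	wq = io_wq_create(bounded, &data);
 *	io_wq_enqueue(wq, work);
 *	...
 *	io_wq_exit_start(wq);
 *	io_wq_put_and_exit(wq);
 */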
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

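/*
 * Tear down all workers: cancel queued creation task_work, wake every
 * worker so it notices IO_WQ_BIT_EXIT, then wait for the last worker
 * reference to be dropped.
 */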
static void io_wq_exit_workers(struct io_wq *wq)
{
	unsigned long timeout, warn_timeout;

	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);

	/*
	 * Shut up hung task complaint, see for example
	 *
	 * https://lore.kernel.org/all/[email protected]/
	 *
	 * where completely overloading the system with tons of long running
	 * io-wq items can easily trigger the hung task timeout. Only sleep
	 * uninterruptibly for half that time, and warn if we end up waiting
	 * more than IO_URING_EXIT_WAIT_MAX.
	 */
	timeout = sysctl_hung_task_timeout_secs * HZ / 2;
	if (!timeout)
		timeout = MAX_SCHEDULE_TIMEOUT;
	warn_timeout = jiffies + IO_URING_EXIT_WAIT_MAX;
	do {
		if (wait_for_completion_timeout(&wq->worker_done, timeout))
			break;
		WARN_ON_ONCE(time_after(jiffies, warn_timeout));
	} while (1);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

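/*
 * Set (or, with a NULL mask, reset) the allowed CPU mask for a task's
 * io-wq. A non-NULL mask must be a subset of the cpuset of the wq's
 * owning task.
 */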
int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);