GitHub Repository: torvalds/linux
Path: blob/master/io_uring/io-wq.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

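/*
 * Per-worker reference counting: io_worker_get() takes a reference unless
 * the worker is already going away, and io_worker_release() completes
 * ->ref_done when the last reference is dropped, which lets
 * io_worker_exit() finish tearing the worker down.
 */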
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

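/*
 * Drop a wq-wide worker reference. The last put completes ->worker_done,
 * which io_wq_exit_workers() waits on before tearing down the io_wq.
 */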
static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

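/*
 * Undo the accounting done when a worker creation was queued: drop the
 * running and worker counts, put the wq reference, clear create_state so
 * another creation can be queued, and release the worker reference taken
 * when the creation request was queued.
 */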
static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

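/*
 * Final exit path for a worker thread: cancel any still-queued creation
 * task_work, wait for all references to drop, unhook the worker from the
 * free and all-worker lists, and free it via RCU before the task exits.
 */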
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't set up with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

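/*
 * task_work callback, run in the context of the task that owns the io_wq.
 * Prefer waking an idle worker; otherwise create a new one if we are still
 * below ->max_workers, else drop the accounting taken when this callback
 * was queued.
 */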
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();
	if (!do_create)
		goto no_need_create;

	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
no_need_create:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

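/*
 * Queue worker creation via task_work on the task that owns the io_wq, so
 * the new thread is forked from that task's context. Returns false if the
 * wq is exiting or a creation request is already pending for this worker,
 * with the nr_running accounting rolled back.
 */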
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

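/*
 * Called when a worker stops being runnable (see io_wq_worker_sleeping()).
 * If it was the last running worker for this acct and work is still
 * pending that is not deferred behind the same hash, queue creation of a
 * replacement worker so the pending work does not sit idle.
 */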
static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

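/*
 * Pick the next runnable work item for this acct. Unhashed work is taken
 * in order; hashed work only runs if no other worker currently holds the
 * same hash bit, in which case the whole [work, tail] chain is spliced
 * out. If everything is stalled behind busy hashes, mark the acct stalled
 * and register on the hash wait queue so it is re-activated once the hash
 * clears. Returns NULL if nothing can run right now.
 */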
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to the next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

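/*
 * Main loop for a worker thread. Runs queued work while any is available,
 * then moves to the free list and sleeps for WORKER_IDLE_TIMEOUT. A worker
 * whose idle sleep times out exits if it is not the last worker for its
 * acct, or if its CPU affinity no longer matches the wq mask. On wq exit,
 * any remaining work is drained before the thread exits.
 */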
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
					wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when a worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

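/*
 * Finish setting up a freshly created worker task: link the task and the
 * worker, apply the wq CPU mask, add the worker to the free and all-worker
 * lists, then let the new task run.
 */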
static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

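/*
 * task_work continuation used when a previous worker creation attempt
 * failed with a transient error. Retry create_io_thread(); on a permanent
 * failure, roll back the accounting and, if this was the last worker for
 * the acct, cancel any work that is still pending so it does not hang.
 */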
static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

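/*
 * Allocate and start a new worker for @acct. On fork failure the attempt
 * is either retried later via delayed work (transient errors) or the
 * accounting taken in io_wq_create_worker() is rolled back.
 */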
static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (!io_acct_for_each_worker(&wq->acct[i], func, data))
			return false;
	}

	return true;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

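/*
 * Run a work item, and any work linked from it, with IO_WQ_WORK_CANCEL
 * set, letting the submission handler observe the cancellation instead of
 * executing the work normally.
 */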
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

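/*
 * Queue a work item for execution by the io-wq. The item is cancelled
 * immediately if the wq is exiting or the work is already marked
 * IO_WQ_WORK_CANCEL. Otherwise it is inserted into the acct's list and an
 * idle worker is woken, or a new one created if none is available and the
 * work either allows concurrency or nothing is currently running.
 */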
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

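/*
 * Wait queue callback invoked when a hash bit is cleared: re-activate any
 * acct that stalled waiting for that hash so its workers look for work
 * again.
 */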
static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

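/*
 * Allocate and set up an io_wq for the given task: share the supplied hash
 * map, size the bounded/unbounded accounts, inherit the task's allowed CPU
 * set and register for CPU hotplug notifications. Workers themselves are
 * created lazily when work is queued.
 */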
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

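/*
 * Shut down all workers: cancel pending creation task_work, wake every
 * worker so it notices IO_WQ_BIT_EXIT, drop the initial worker reference
 * and wait for the rest to go away before unhooking from the hash wait
 * queue and dropping the task reference.
 */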
static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

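/*
 * Set (or, with a NULL mask, reset) the CPU affinity of a task's io-wq.
 * The requested mask must be a subset of the cpuset-allowed CPUs of the
 * task that owns the wq.
 */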
int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);