GitHub Repository: awilliam/linux-vfio
Path: blob/master/fs/btrfs/async-thread.c
/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time. It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations. This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker (see the illustrative sketch after start_new_worker()
 * below).
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	btrfs_start_workers(start->queue, 1);
	kfree(start);
}

static int start_new_worker(struct btrfs_workers *queue)
{
	struct worker_start *start;
	int ret;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return -ENOMEM;

	start->work.func = start_new_worker_func;
	start->queue = queue;
	ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
	if (ret)
		kfree(start);
	return ret;
}
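
/*
 * Illustrative sketch (not part of this file): how a caller might wire up
 * the single-threaded helper queue described above. The pool names here
 * are hypothetical; in btrfs the real setup happens at mount time.
 *
 *	struct btrfs_workers helper_pool;
 *	struct btrfs_workers io_pool;
 *
 *	// The helper pool has no helper of its own and only one thread,
 *	// so starting threads for other pools never recurses.
 *	btrfs_init_workers(&helper_pool, "helper", 1, NULL);
 *	btrfs_start_workers(&helper_pool, 1);
 *
 *	// Other pools pass the helper pool in as async_helper; when they
 *	// need more threads, start_new_worker() queues a worker_start
 *	// item on helper_pool instead of calling kthread_run directly.
 *	btrfs_init_workers(&io_pool, "io", 8, &helper_pool);
 *	btrfs_start_workers(&io_pool, 1);
 */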

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	start_new_worker(workers);
	return;

out:
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
	return 0;
}
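
/*
 * Illustrative sketch (not part of this file): what an ordered work item
 * looks like from the submitter's side. The struct and function names are
 * hypothetical; the callbacks (func, ordered_func, ordered_free) are the
 * ones run_ordered_completions() above relies on.
 *
 *	struct my_ordered_job {
 *		struct btrfs_work work;
 *		// ... job payload ...
 *	};
 *
 *	static void my_job_func(struct btrfs_work *work)
 *	{
 *		// heavy lifting; may run on any worker, in any order
 *	}
 *
 *	static void my_job_ordered_func(struct btrfs_work *work)
 *	{
 *		// called strictly in submission order, one item at a time
 *	}
 *
 *	static void my_job_ordered_free(struct btrfs_work *work)
 *	{
 *		kfree(container_of(work, struct my_ordered_job, work));
 *	}
 *
 *	// submission, on a pool whose ->ordered flag was set by its owner:
 *	struct my_ordered_job *job = kzalloc(sizeof(*job), GFP_NOFS);
 *	job->work.func = my_job_func;
 *	job->work.ordered_func = my_job_ordered_func;
 *	job->work.ordered_free = my_job_ordered_free;
 *	btrfs_queue_worker(ordered_pool, &job->work);
 */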

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
}
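
/*
 * Illustrative sketch (not part of this file): btrfs_init_workers() only
 * fills in defaults (idle_thresh = 32, ordered = 0); the pool's owner can
 * adjust those fields before starting the pool. The pool name and limits
 * below are hypothetical.
 *
 *	struct btrfs_workers submit_pool;
 *
 *	btrfs_init_workers(&submit_pool, "submit", max_threads, helper);
 *	submit_pool.idle_thresh = 64;	// tolerate deeper per-thread queues
 *	submit_pool.ordered = 1;	// completion callbacks in submit order
 *	btrfs_start_workers(&submit_pool, 1);
 */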

/*
 * starts new worker threads. This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers,
				 int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);

		atomic_set(&worker->num_pending, 0);
		atomic_set(&worker->refs, 1);
		worker->workers = workers;
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}
		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		workers->num_workers_starting--;
		WARN_ON(workers->num_workers_starting < 0);
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting += num_workers;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers, num_workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now. This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list. This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number. This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job. This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			__btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from. It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:

	return 0;
}
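
/*
 * Illustrative sketch (not part of this file): a long-running work function
 * that does a bounded amount of work and then uses btrfs_requeue_work() to
 * put itself back on its worker's list instead of hogging the thread. The
 * struct and helper names are hypothetical.
 *
 *	static void my_scan_func(struct btrfs_work *work)
 *	{
 *		struct my_scan_job *job =
 *			container_of(work, struct my_scan_job, work);
 *
 *		if (my_scan_one_chunk(job)) {
 *			// more chunks left: go to the back of the queue so
 *			// other pending items get a turn on this worker
 *			btrfs_requeue_work(work);
 *			return;
 *		}
 *		my_scan_finish(job);	// done, release the job
 *	}
 */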

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);

out:
	return 0;
}
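
/*
 * Illustrative sketch (not part of this file): submitting a work item to a
 * pool set up with btrfs_init_workers()/btrfs_start_workers(). The job
 * struct and pool name are hypothetical; note the embedded btrfs_work must
 * start out zeroed so WORK_QUEUED_BIT is clear.
 *
 *	struct my_job {
 *		struct btrfs_work work;
 *		// ... job payload ...
 *	};
 *
 *	struct my_job *job = kzalloc(sizeof(*job), GFP_NOFS);
 *	if (!job)
 *		return -ENOMEM;
 *	job->work.func = my_job_func;
 *
 *	// optional: service this item ahead of normal-priority work
 *	btrfs_set_work_high_prio(&job->work);
 *
 *	btrfs_queue_worker(&pool, &job->work);	// my_job_func() frees job
 */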