GitHub Repository: awilliam/linux-vfio
Path: blob/master/drivers/md/dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
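
/*
 * Illustrative sketch (not taken from a real caller): a minimal,
 * hypothetical example of how a client might drive this interface.
 * example_copy(), copy_done(), bdev_src and bdev_dst are made-up names,
 * and a real caller would also need <linux/completion.h>:
 *
 *	static void copy_done(int read_err, unsigned long write_err, void *ctx)
 *	{
 *		// read_err is non-zero on a read failure; write_err has one
 *		// bit set per failed destination.
 *		complete((struct completion *)ctx);
 *	}
 *
 *	static int example_copy(struct block_device *bdev_src,
 *				struct block_device *bdev_dst)
 *	{
 *		struct dm_kcopyd_client *kc;
 *		struct dm_io_region from, to;
 *		DECLARE_COMPLETION_ONSTACK(done);
 *
 *		kc = dm_kcopyd_client_create();
 *		if (IS_ERR(kc))
 *			return PTR_ERR(kc);
 *
 *		from.bdev = bdev_src;
 *		from.sector = 0;
 *		from.count = 1024;	// 512 KiB, counted in 512-byte sectors
 *		to = from;
 *		to.bdev = bdev_dst;
 *
 *		dm_kcopyd_copy(kc, &from, 1, &to, 0, copy_done, &done);
 *		wait_for_completion(&done);
 *		dm_kcopyd_client_destroy(kc);
 *		return 0;
 *	}
 */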

#include <linux/types.h>
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
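
/*
 * SUB_JOB_SIZE is measured in 512-byte sectors, so each sub job covers
 * 128 << 9 = 64 KiB.  On a system with 4 KiB pages (the common case),
 * RESERVE_PAGES works out to DIV_ROUND_UP(65536, 4096) = 16 preallocated
 * pages per client.
 */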

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	/*
	 * We maintain three lists of jobs:
	 *
	 * i) jobs waiting for pages
	 * ii) jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that have completed.
	 *
	 * All three of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
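/*
 * Note: mempool_alloc() with GFP_NOIO hands out one of the MIN_JOBS
 * preallocated jobs, or sleeps until one is returned to the pool, rather
 * than recursing into the block layer for reclaim; that is what makes the
 * job allocation in dm_kcopyd_copy() safe on the io path.
 */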
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job)
		mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = job->offset,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;
	job->master_job = job;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(void)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd",
					WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);