GitHub Repository: awilliam/linux-vfio
Path: blob/master/drivers/block/drbd/drbd_worker.c
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel);



/* endio handlers:
 *   drbd_md_io_complete (defined here)
 *   drbd_endio_pri (defined here)
 *   drbd_endio_sec (defined here)
 *   bm_async_io_complete (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */

/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error && __ratelimit(&drbd_ratelimit_state))
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
					is_write ? "write" : "read",
					(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&mdev->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (m.bio)
		complete_master_bio(mdev, &m);
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}

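/* Hash all pages of an epoch entry with the given transform (e.g.
 * csums_tfm or verify_tfm) into *digest; only the last page in the
 * chain may be partially used, all earlier ones are hashed in full. */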
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

/* TODO merge common code with w_e_end_ov_req */
int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel))
		goto out;

	if (likely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = e->sector;
		unsigned int size = e->size;
		drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
		/* Free e and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
		drbd_free_ee(mdev, e);
		e = NULL;
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, sector, size,
					     digest, digest_size,
					     P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		dev_err(DEV, "kmalloc() of digest failed.\n");
		ok = 0;
	}

out:
	if (e)
		drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev, sector))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e)
		goto defer;

	e->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	spin_unlock_irq(&mdev->req_lock);

	drbd_free_ee(mdev, e);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}

int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	switch (mdev->state.conn) {
	case C_VERIFY_S:
		w_make_ov_request(mdev, w, cancel);
		break;
	case C_SYNC_TARGET:
		w_make_resync_request(mdev, w, cancel);
		break;
	}

	return 1;
}

void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	if (list_empty(&mdev->resync_work.list))
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}

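/* Ring-buffer helpers for the resync plan FIFO (mdev->rs_plan_s),
 * consumed by the resync rate controller below: fifo_set() clears the
 * plan, fifo_push() rotates one slot, fifo_add_val() spreads a
 * correction over all planned steps. */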
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

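/* Dynamic resync rate controller (active when a plan is configured):
 * based on the number of sectors that came in since the last call,
 * compute how many sectors to request during the next SLEEP_TIME
 * interval, planning "steps" intervals ahead via the rs_plan_s FIFO
 * and capping the result at c_max_rate. */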
static int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy*/
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

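/* Translate the controller output (or the static sync rate, if no plan
 * is configured) into the number of BM_BLOCK_SIZE requests to issue
 * during the next SLEEP_TIME interval. */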
static int drbd_rs_number_requests(struct drbd_conf *mdev)
{
	int number;
	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* ignore the amount of pending requests, the resync controller should
	 * throttle down to incoming reply rate soon enough anyways. */
	return number;
}

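/* Scan the bitmap for out-of-sync blocks and issue up to
 * drbd_rs_number_requests() resync requests (checksum based requests
 * if csums_tfm is configured), merging adjacent dirty bits into larger,
 * aligned requests, then re-arm the resync timer. */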
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (mdev->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(mdev);
		return 1;
	}

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		return 1;
	}

	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
	number = drbd_rs_number_requests(mdev);
	if (number == 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(mdev);
		return 1;
	}

 requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	number = drbd_rs_number_requests(mdev);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

 requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}


void start_resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
}

int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
		dev_warn(DEV, "w_start_resync later...\n");
		mdev->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&mdev->start_resync_timer);
		return 1;
	}

	drbd_start_resync(mdev, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
	return 1;
}

int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}

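/* Send a ping to the peer and wait until its ack arrives or the
 * connection drops below C_CONNECTED. */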
static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}

int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	ping_peer(mdev);

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(mdev, 0UL);
			drbd_print_uuids(mdev, "updated UUIDs");
			if (mdev->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	if (verify_done)
		mdev->ov_start_sector = 0;

	drbd_md_sync(mdev);

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
		atomic_add(i, &mdev->pp_in_use_by_net);
		atomic_sub(i, &mdev->pp_in_use);
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_ee(mdev, e);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (mdev->state.conn == C_AHEAD) {
		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

/* TODO merge common code with w_e_send_csum */
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	sector_t sector = e->sector;
	unsigned int size = e->size;
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		ok = 0;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(e->flags & EE_WAS_ERROR)))
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
	else
		memset(digest, 0, digest_size);

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, e);
	e = NULL;
	inc_rs_pending(mdev);
	ok = drbd_send_drequest_csum(mdev, sector, size,
				     digest, digest_size,
				     P_OV_REPLY);
	if (!ok)
		dec_rs_pending(mdev);
	kfree(digest);

out:
	if (e)
		drbd_free_ee(mdev, e);
	dec_unacked(mdev);
	return ok;
}

void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	void *digest;
	sector_t sector = e->sector;
	unsigned int size = e->size;
	int digest_size;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, e);
	if (!eq)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch.  */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header80 *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_oos(mdev, req);
	req_mod(req, oos_handed_to_network);

	return ok;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. Practically it can not deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}

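/* Follow the sync-after dependency chain; return 1 if this device may
 * resync now, 0 if a device it depends on is busy syncing or has one
 * of its isp flags set. */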
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

void drbd_rs_controller_reset(struct drbd_conf *mdev)
{
	atomic_set(&mdev->rs_sect_in, 0);
	atomic_set(&mdev->rs_sect_ev, 0);
	mdev->rs_in_flight = 0;
	mdev->rs_planed = 0;
	spin_lock(&mdev->peer_seq_lock);
	fifo_set(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	if (mdev->state.conn < C_AHEAD) {
		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
		drbd_rs_cancel_all(mdev);
		/* This should be done when we abort the resync. We definitely do not
		   want to have this for connections going back and forth between
		   Ahead/Behind and SyncSource/SyncTarget */
	}

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
		   we check that we might make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
			     "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	} else /* C_SYNC_SOURCE */ {
		r = drbd_khelper(mdev, "before-resync-source");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			if (r == 3) {
				dev_info(DEV, "before-resync-source handler returned %d, "
					 "ignoring. Old userland tools?", r);
			} else {
				dev_info(DEV, "before-resync-source handler returned %d, "
					 "dropping connection.\n", r);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return;
			}
		}
	}

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total     = tw;
		mdev->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);
		if (side == C_SYNC_TARGET)
			mdev->bm_resync_fo = 0;

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how much bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(mdev);

		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE)
				schedule_timeout_interruptible(
					mdev->net_conf->ping_int * HZ +
					mdev->net_conf->ping_timeo*HZ/9);
			drbd_resync_finished(mdev);
		}

		drbd_rs_controller_reset(mdev);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	put_ldev(mdev);
	drbd_state_unlock(mdev);
}

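/* Main loop of the per-device worker thread: wait on the work
 * semaphore (uncorking/corking the data socket around idle periods),
 * run queued work callbacks until asked to stop, then cancel all
 * remaining work, stop the receiver and clean up the device. */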
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}
