1
// SPDX-License-Identifier: GPL-2.0-only
2
/*
3
drbd_worker.c
4
5
This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7
Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8
Copyright (C) 1999-2008, Philipp Reisner <[email protected]>.
9
Copyright (C) 2002-2008, Lars Ellenberg <[email protected]>.
10
11
12
*/
13
14
#include <linux/module.h>
15
#include <linux/drbd.h>
16
#include <linux/sched/signal.h>
17
#include <linux/wait.h>
18
#include <linux/mm.h>
19
#include <linux/memcontrol.h>
20
#include <linux/mm_inline.h>
21
#include <linux/slab.h>
22
#include <linux/random.h>
23
#include <linux/string.h>
24
#include <linux/scatterlist.h>
25
#include <linux/part_stat.h>
26
27
#include "drbd_int.h"
28
#include "drbd_protocol.h"
29
#include "drbd_req.h"
30
31
static int make_ov_request(struct drbd_peer_device *, int);
32
static int make_resync_request(struct drbd_peer_device *, int);
33
34
/* endio handlers:
35
* drbd_md_endio (defined here)
36
* drbd_request_endio (defined here)
37
* drbd_peer_request_endio (defined here)
38
* drbd_bm_endio (defined in drbd_bitmap.c)
39
*
40
* For all these callbacks, note the following:
41
* The callbacks will be called in irq context by the IDE drivers,
42
* and in Softirqs/Tasklets/BH context by the SCSI drivers.
43
* Try to get the locking right :)
44
*
45
*/
46
47
/* used for synchronous meta data and bitmap IO
48
* submitted by drbd_md_sync_page_io()
49
*/
50
void drbd_md_endio(struct bio *bio)
51
{
52
struct drbd_device *device;
53
54
device = bio->bi_private;
55
device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57
/* special case: drbd_md_read() during drbd_adm_attach() */
58
if (device->ldev)
59
put_ldev(device);
60
bio_put(bio);
61
62
/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63
* to timeout on the lower level device, and eventually detach from it.
64
* If this io completion runs after that timeout expired, this
65
* drbd_md_put_buffer() may allow us to finally try and re-attach.
66
* During normal operation, this only puts that extra reference
67
* down to 1 again.
68
* Make sure we first drop the reference, and only then signal
69
* completion, or we may (in drbd_al_read_log()) cycle so fast into the
70
* next drbd_md_sync_page_io(), that we trigger the
71
* ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72
*/
73
drbd_md_put_buffer(device);
74
device->md_io.done = 1;
75
wake_up(&device->misc_wait);
76
}
77
78
/* reads on behalf of the partner,
79
* "submitted" by the receiver
80
*/
81
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82
{
83
unsigned long flags = 0;
84
struct drbd_peer_device *peer_device = peer_req->peer_device;
85
struct drbd_device *device = peer_device->device;
86
87
spin_lock_irqsave(&device->resource->req_lock, flags);
88
device->read_cnt += peer_req->i.size >> 9;
89
list_del(&peer_req->w.list);
90
if (list_empty(&device->read_ee))
91
wake_up(&device->ee_wait);
92
if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93
__drbd_chk_io_error(device, DRBD_READ_ERROR);
94
spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96
drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97
put_ldev(device);
98
}
99
100
/* writes on behalf of the partner, or resync writes,
101
* "submitted" by the receiver, final stage. */
102
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103
{
104
unsigned long flags = 0;
105
struct drbd_peer_device *peer_device = peer_req->peer_device;
106
struct drbd_device *device = peer_device->device;
107
struct drbd_connection *connection = peer_device->connection;
108
struct drbd_interval i;
109
int do_wake;
110
u64 block_id;
111
int do_al_complete_io;
112
113
/* after we moved peer_req to done_ee,
114
* we may no longer access it,
115
* it may be freed/reused already!
116
* (as soon as we release the req_lock) */
117
i = peer_req->i;
118
do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119
block_id = peer_req->block_id;
120
peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121
122
if (peer_req->flags & EE_WAS_ERROR) {
123
/* In protocol != C, we usually do not send write acks.
124
* In case of a write error, send the neg ack anyway. */
125
if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126
inc_unacked(device);
127
drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
128
}
129
130
spin_lock_irqsave(&device->resource->req_lock, flags);
131
device->writ_cnt += peer_req->i.size >> 9;
132
list_move_tail(&peer_req->w.list, &device->done_ee);
133
134
/*
135
* Do not remove from the write_requests tree here: we did not send the
136
* Ack yet and did not wake possibly waiting conflicting requests.
137
* Removed from the tree in "drbd_process_done_ee" within the
138
* appropriate dw.cb (e_end_block/e_end_resync_block) or from
139
* _drbd_clear_done_ee.
140
*/
141
142
do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143
144
/* FIXME do we want to detach for failed REQ_OP_DISCARD?
145
* ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146
if (peer_req->flags & EE_WAS_ERROR)
147
__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148
149
if (connection->cstate >= C_WF_REPORT_PARAMS) {
150
kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151
if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152
kref_put(&device->kref, drbd_destroy_device);
153
}
154
spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156
if (block_id == ID_SYNCER)
157
drbd_rs_complete_io(device, i.sector);
158
159
if (do_wake)
160
wake_up(&device->ee_wait);
161
162
if (do_al_complete_io)
163
drbd_al_complete_io(device, &i);
164
165
put_ldev(device);
166
}
167
168
/* writes on behalf of the partner, or resync writes,
169
* "submitted" by the receiver.
170
*/
171
void drbd_peer_request_endio(struct bio *bio)
172
{
173
struct drbd_peer_request *peer_req = bio->bi_private;
174
struct drbd_device *device = peer_req->peer_device->device;
175
bool is_write = bio_data_dir(bio) == WRITE;
176
bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177
bio_op(bio) == REQ_OP_DISCARD;
178
179
if (bio->bi_status && drbd_ratelimit())
180
drbd_warn(device, "%s: error=%d s=%llus\n",
181
is_write ? (is_discard ? "discard" : "write")
182
: "read", bio->bi_status,
183
(unsigned long long)peer_req->i.sector);
184
185
if (bio->bi_status)
186
set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188
bio_put(bio); /* no need for the bio anymore */
189
if (atomic_dec_and_test(&peer_req->pending_bios)) {
190
if (is_write)
191
drbd_endio_write_sec_final(peer_req);
192
else
193
drbd_endio_read_sec_final(peer_req);
194
}
195
}
196
197
static void
198
drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199
{
200
panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201
device->minor, device->resource->name, device->vnr);
202
}
203
204
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205
*/
206
void drbd_request_endio(struct bio *bio)
207
{
208
unsigned long flags;
209
struct drbd_request *req = bio->bi_private;
210
struct drbd_device *device = req->device;
211
struct bio_and_error m;
212
enum drbd_req_event what;
213
214
/* If this request was aborted locally before,
215
* but now was completed "successfully",
216
* chances are that this caused arbitrary data corruption.
217
*
218
* "aborting" requests, or force-detaching the disk, is intended for
219
* completely blocked/hung local backing devices which no longer
220
* complete requests at all, not even do error completions. In this
221
* situation, usually a hard-reset and failover is the only way out.
222
*
223
* By "aborting", basically faking a local error-completion,
224
* we allow for a more graceful switchover by cleanly migrating services.
225
* Still the affected node has to be rebooted "soon".
226
*
227
* By completing these requests, we allow the upper layers to re-use
228
* the associated data pages.
229
*
230
* If later the local backing device "recovers", and now DMAs some data
231
* from disk into the original request pages, in the best case it will
232
* just put random data into unused pages; but typically it will corrupt
233
* meanwhile completely unrelated data, causing all sorts of damage.
234
*
235
* Which means delayed successful completion,
236
* especially for READ requests,
237
* is a reason to panic().
238
*
239
* We assume that a delayed *error* completion is OK,
240
* though we still will complain noisily about it.
241
*/
242
if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243
if (drbd_ratelimit())
244
drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246
if (!bio->bi_status)
247
drbd_panic_after_delayed_completion_of_aborted_request(device);
248
}
249
250
/* to avoid recursion in __req_mod */
251
if (unlikely(bio->bi_status)) {
252
switch (bio_op(bio)) {
253
case REQ_OP_WRITE_ZEROES:
254
case REQ_OP_DISCARD:
255
if (bio->bi_status == BLK_STS_NOTSUPP)
256
what = DISCARD_COMPLETED_NOTSUPP;
257
else
258
what = DISCARD_COMPLETED_WITH_ERROR;
259
break;
260
case REQ_OP_READ:
261
if (bio->bi_opf & REQ_RAHEAD)
262
what = READ_AHEAD_COMPLETED_WITH_ERROR;
263
else
264
what = READ_COMPLETED_WITH_ERROR;
265
break;
266
default:
267
what = WRITE_COMPLETED_WITH_ERROR;
268
break;
269
}
270
} else {
271
what = COMPLETED_OK;
272
}
273
274
req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275
bio_put(bio);
276
277
/* not req_mod(), we need irqsave here! */
278
spin_lock_irqsave(&device->resource->req_lock, flags);
279
__req_mod(req, what, NULL, &m);
280
spin_unlock_irqrestore(&device->resource->req_lock, flags);
281
put_ldev(device);
282
283
if (m.bio)
284
complete_master_bio(device, &m);
285
}
286
287
void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288
{
289
SHASH_DESC_ON_STACK(desc, tfm);
290
struct page *page = peer_req->pages;
291
struct page *tmp;
292
unsigned len;
293
void *src;
294
295
desc->tfm = tfm;
296
297
crypto_shash_init(desc);
298
299
src = kmap_atomic(page);
300
while ((tmp = page_chain_next(page))) {
301
/* all but the last page will be fully used */
302
crypto_shash_update(desc, src, PAGE_SIZE);
303
kunmap_atomic(src);
304
page = tmp;
305
src = kmap_atomic(page);
306
}
307
/* and now the last, possibly only partially used page */
308
len = peer_req->i.size & (PAGE_SIZE - 1);
309
crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310
kunmap_atomic(src);
311
312
crypto_shash_final(desc, digest);
313
shash_desc_zero(desc);
314
}
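/*
 * Worked example for the last-page handling above (sizes assumed for
 * illustration): with peer_req->i.size == 9216 and PAGE_SIZE == 4096,
 * the first two pages are hashed in full, and len = 9216 & 4095 leaves
 * 1024 bytes for the final crypto_shash_update().  If i.size is an
 * exact multiple of PAGE_SIZE, len is 0 and the "len ?: PAGE_SIZE"
 * idiom hashes the whole last page rather than zero bytes.
 */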
315
316
void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317
{
318
SHASH_DESC_ON_STACK(desc, tfm);
319
struct bio_vec bvec;
320
struct bvec_iter iter;
321
322
desc->tfm = tfm;
323
324
crypto_shash_init(desc);
325
326
bio_for_each_segment(bvec, bio, iter) {
327
u8 *src;
328
329
src = bvec_kmap_local(&bvec);
330
crypto_shash_update(desc, src, bvec.bv_len);
331
kunmap_local(src);
332
}
333
crypto_shash_final(desc, digest);
334
shash_desc_zero(desc);
335
}
336
337
/* MAYBE merge common code with w_e_end_ov_req */
338
static int w_e_send_csum(struct drbd_work *w, int cancel)
339
{
340
struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
341
struct drbd_peer_device *peer_device = peer_req->peer_device;
342
struct drbd_device *device = peer_device->device;
343
int digest_size;
344
void *digest;
345
int err = 0;
346
347
if (unlikely(cancel))
348
goto out;
349
350
if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
351
goto out;
352
353
digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
354
digest = kmalloc(digest_size, GFP_NOIO);
355
if (digest) {
356
sector_t sector = peer_req->i.sector;
357
unsigned int size = peer_req->i.size;
358
drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
359
/* Free peer_req and pages before send.
360
* In case we block on congestion, we could otherwise run into
361
* some distributed deadlock, if the other side blocks on
362
* congestion as well, because our receiver blocks in
363
* drbd_alloc_pages due to pp_in_use > max_buffers. */
364
drbd_free_peer_req(device, peer_req);
365
peer_req = NULL;
366
inc_rs_pending(peer_device);
367
err = drbd_send_drequest_csum(peer_device, sector, size,
368
digest, digest_size,
369
P_CSUM_RS_REQUEST);
370
kfree(digest);
371
} else {
372
drbd_err(device, "kmalloc() of digest failed.\n");
373
err = -ENOMEM;
374
}
375
376
out:
377
if (peer_req)
378
drbd_free_peer_req(device, peer_req);
379
380
if (unlikely(err))
381
drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
382
return err;
383
}
384
385
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
386
387
static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
388
{
389
struct drbd_device *device = peer_device->device;
390
struct drbd_peer_request *peer_req;
391
392
if (!get_ldev(device))
393
return -EIO;
394
395
/* GFP_TRY, because if there is no memory available right now, this may
396
* be rescheduled for later. It is "only" background resync, after all. */
397
peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398
size, size, GFP_TRY);
399
if (!peer_req)
400
goto defer;
401
402
peer_req->w.cb = w_e_send_csum;
403
peer_req->opf = REQ_OP_READ;
404
spin_lock_irq(&device->resource->req_lock);
405
list_add_tail(&peer_req->w.list, &device->read_ee);
406
spin_unlock_irq(&device->resource->req_lock);
407
408
atomic_add(size >> 9, &device->rs_sect_ev);
409
if (drbd_submit_peer_request(peer_req) == 0)
410
return 0;
411
412
/* If it failed because of ENOMEM, retry should help. If it failed
413
* because bio_add_page failed (probably broken lower level driver),
414
* retry may or may not help.
415
* If it does not, you may need to force disconnect. */
416
spin_lock_irq(&device->resource->req_lock);
417
list_del(&peer_req->w.list);
418
spin_unlock_irq(&device->resource->req_lock);
419
420
drbd_free_peer_req(device, peer_req);
421
defer:
422
put_ldev(device);
423
return -EAGAIN;
424
}
425
426
int w_resync_timer(struct drbd_work *w, int cancel)
427
{
428
struct drbd_device *device =
429
container_of(w, struct drbd_device, resync_work);
430
431
switch (device->state.conn) {
432
case C_VERIFY_S:
433
make_ov_request(first_peer_device(device), cancel);
434
break;
435
case C_SYNC_TARGET:
436
make_resync_request(first_peer_device(device), cancel);
437
break;
438
}
439
440
return 0;
441
}
442
443
void resync_timer_fn(struct timer_list *t)
444
{
445
struct drbd_device *device = timer_container_of(device, t,
446
resync_timer);
447
448
drbd_queue_work_if_unqueued(
449
&first_peer_device(device)->connection->sender_work,
450
&device->resync_work);
451
}
452
453
static void fifo_set(struct fifo_buffer *fb, int value)
454
{
455
int i;
456
457
for (i = 0; i < fb->size; i++)
458
fb->values[i] = value;
459
}
460
461
static int fifo_push(struct fifo_buffer *fb, int value)
462
{
463
int ov;
464
465
ov = fb->values[fb->head_index];
466
fb->values[fb->head_index++] = value;
467
468
if (fb->head_index >= fb->size)
469
fb->head_index = 0;
470
471
return ov;
472
}
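/*
 * Sketch of how the resync controller uses this ring (values assumed,
 * see drbd_rs_controller() below):
 *
 *	cps = correction / plan->size;
 *	fifo_add_val(plan, cps);	spreads the correction over all slots
 *	curr_corr = fifo_push(plan, 0);	oldest slot falls out, 0 is queued
 *
 * Each slot thus accumulates the per-step corrections of the last
 * plan->size invocations, so a correction computed now trickles into
 * the request stream over the following plan->size intervals instead
 * of being applied all at once.
 */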
473
474
static void fifo_add_val(struct fifo_buffer *fb, int value)
475
{
476
int i;
477
478
for (i = 0; i < fb->size; i++)
479
fb->values[i] += value;
480
}
481
482
struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
483
{
484
struct fifo_buffer *fb;
485
486
fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
487
if (!fb)
488
return NULL;
489
490
fb->head_index = 0;
491
fb->size = fifo_size;
492
fb->total = 0;
493
494
return fb;
495
}
496
497
static int drbd_rs_controller(struct drbd_peer_device *peer_device, unsigned int sect_in)
498
{
499
struct drbd_device *device = peer_device->device;
500
struct disk_conf *dc;
501
unsigned int want; /* The number of sectors we want in-flight */
502
int req_sect; /* Number of sectors to request in this turn */
503
int correction; /* Number of sectors more we need in-flight */
504
int cps; /* correction per invocation of drbd_rs_controller() */
505
int steps; /* Number of time steps to plan ahead */
506
int curr_corr;
507
int max_sect;
508
struct fifo_buffer *plan;
509
510
dc = rcu_dereference(device->ldev->disk_conf);
511
plan = rcu_dereference(device->rs_plan_s);
512
513
steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
514
515
if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
516
want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
517
} else { /* normal path */
518
want = dc->c_fill_target ? dc->c_fill_target :
519
sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
520
}
521
522
correction = want - device->rs_in_flight - plan->total;
523
524
/* Plan ahead */
525
cps = correction / steps;
526
fifo_add_val(plan, cps);
527
plan->total += cps * steps;
528
529
/* What we do in this step */
530
curr_corr = fifo_push(plan, 0);
531
plan->total -= curr_corr;
532
533
req_sect = sect_in + curr_corr;
534
if (req_sect < 0)
535
req_sect = 0;
536
537
max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
538
if (req_sect > max_sect)
539
req_sect = max_sect;
540
541
/*
542
drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
543
sect_in, device->rs_in_flight, want, correction,
544
steps, cps, device->rs_planed, curr_corr, req_sect);
545
*/
546
547
return req_sect;
548
}
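/*
 * Illustrative controller step (all numbers assumed for illustration):
 * with steps == 10, no c_fill_target, c_delay_target == 10 and
 * SLEEP_TIME == HZ/10, "want" reduces to sect_in * 10, i.e. keep ten
 * controller intervals worth of resync data in flight.  If 2048
 * sectors came in (sect_in), rs_in_flight == 16384 and plan->total == 0,
 * then correction == 20480 - 16384 - 0 == 4096 and cps == 409 is
 * spread over the plan; this step then requests sect_in plus whatever
 * correction falls out of the FIFO, capped by the per-interval
 * equivalent of c_max_rate.
 */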
549
550
static int drbd_rs_number_requests(struct drbd_peer_device *peer_device)
551
{
552
struct drbd_device *device = peer_device->device;
553
unsigned int sect_in; /* Number of sectors that came in since the last turn */
554
int number, mxb;
555
556
sect_in = atomic_xchg(&device->rs_sect_in, 0);
557
device->rs_in_flight -= sect_in;
558
559
rcu_read_lock();
560
mxb = drbd_get_max_buffers(device) / 2;
561
if (rcu_dereference(device->rs_plan_s)->size) {
562
number = drbd_rs_controller(peer_device, sect_in) >> (BM_BLOCK_SHIFT - 9);
563
device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
564
} else {
565
device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
566
number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
567
}
568
rcu_read_unlock();
569
570
/* Don't have more than "max-buffers"/2 in-flight.
571
* Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
572
* potentially causing a distributed deadlock on congestion during
573
* online-verify or (checksum-based) resync, if max-buffers,
574
* socket buffer sizes and resync rate settings are mis-configured. */
575
576
/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
577
* mxb (as used here, and in drbd_alloc_pages on the peer) is
578
* "number of pages" (typically also 4k),
579
* but "rs_in_flight" is in "sectors" (512 Byte). */
580
if (mxb - device->rs_in_flight/8 < number)
581
number = mxb - device->rs_in_flight/8;
582
583
return number;
584
}
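/*
 * Example of the fixed-rate branch above (rate assumed, and assuming
 * SLEEP_TIME is HZ/10): with c_sync_rate == 10240 KiB/s,
 * number == (HZ/10) * 10240 / ((4096/1024) * HZ) == 256, i.e. 256
 * BM_BLOCK_SIZE requests per 100 ms, matching the requested 10 MiB/s.
 * The clamp below then compares against mxb with rs_in_flight
 * converted from 512-byte sectors to 4 KiB units (the "/8").
 */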
585
586
static int make_resync_request(struct drbd_peer_device *const peer_device, int cancel)
587
{
588
struct drbd_device *const device = peer_device->device;
589
struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
590
unsigned long bit;
591
sector_t sector;
592
const sector_t capacity = get_capacity(device->vdisk);
593
int max_bio_size;
594
int number, rollback_i, size;
595
int align, requeue = 0;
596
int i = 0;
597
int discard_granularity = 0;
598
599
if (unlikely(cancel))
600
return 0;
601
602
if (device->rs_total == 0) {
603
/* empty resync? */
604
drbd_resync_finished(peer_device);
605
return 0;
606
}
607
608
if (!get_ldev(device)) {
609
/* Since we only need to access device->rsync, a
610
get_ldev_if_state(device,D_FAILED) would be sufficient, but
611
continuing the resync with a broken disk makes no sense at
612
all */
613
drbd_err(device, "Disk broke down during resync!\n");
614
return 0;
615
}
616
617
if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
618
rcu_read_lock();
619
discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
620
rcu_read_unlock();
621
}
622
623
max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
624
number = drbd_rs_number_requests(peer_device);
625
if (number <= 0)
626
goto requeue;
627
628
for (i = 0; i < number; i++) {
629
/* Stop generating RS requests when half of the send buffer is filled,
630
* but notify TCP that we'd like to have more space. */
631
mutex_lock(&connection->data.mutex);
632
if (connection->data.socket) {
633
struct sock *sk = connection->data.socket->sk;
634
int queued = sk->sk_wmem_queued;
635
int sndbuf = sk->sk_sndbuf;
636
if (queued > sndbuf / 2) {
637
requeue = 1;
638
if (sk->sk_socket)
639
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
640
}
641
} else
642
requeue = 1;
643
mutex_unlock(&connection->data.mutex);
644
if (requeue)
645
goto requeue;
646
647
next_sector:
648
size = BM_BLOCK_SIZE;
649
bit = drbd_bm_find_next(device, device->bm_resync_fo);
650
651
if (bit == DRBD_END_OF_BITMAP) {
652
device->bm_resync_fo = drbd_bm_bits(device);
653
put_ldev(device);
654
return 0;
655
}
656
657
sector = BM_BIT_TO_SECT(bit);
658
659
if (drbd_try_rs_begin_io(peer_device, sector)) {
660
device->bm_resync_fo = bit;
661
goto requeue;
662
}
663
device->bm_resync_fo = bit + 1;
664
665
if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
666
drbd_rs_complete_io(device, sector);
667
goto next_sector;
668
}
669
670
#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
671
/* try to find some adjacent bits.
672
* we stop if we have already the maximum req size.
673
*
674
* Additionally always align bigger requests, in order to
675
* be prepared for all stripe sizes of software RAIDs.
676
*/
677
align = 1;
678
rollback_i = i;
679
while (i < number) {
680
if (size + BM_BLOCK_SIZE > max_bio_size)
681
break;
682
683
/* Be always aligned */
684
if (sector & ((1<<(align+3))-1))
685
break;
686
687
if (discard_granularity && size == discard_granularity)
688
break;
689
690
/* do not cross extent boundaries */
691
if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
692
break;
693
/* now, is it actually dirty, after all?
694
* caution, drbd_bm_test_bit is tri-state for some
695
* obscure reason; ( b == 0 ) would get the out-of-band
696
* only accidentally right because of the "oddly sized"
697
* adjustment below */
698
if (drbd_bm_test_bit(device, bit+1) != 1)
699
break;
700
bit++;
701
size += BM_BLOCK_SIZE;
702
if ((BM_BLOCK_SIZE << align) <= size)
703
align++;
704
i++;
705
}
706
/* if we merged some,
707
* reset the offset to start the next drbd_bm_find_next from */
708
if (size > BM_BLOCK_SIZE)
709
device->bm_resync_fo = bit + 1;
710
#endif
711
712
/* adjust very last sectors, in case we are oddly sized */
713
if (sector + (size>>9) > capacity)
714
size = (capacity-sector)<<9;
715
716
if (device->use_csums) {
717
switch (read_for_csum(peer_device, sector, size)) {
718
case -EIO: /* Disk failure */
719
put_ldev(device);
720
return -EIO;
721
case -EAGAIN: /* allocation failed, or ldev busy */
722
drbd_rs_complete_io(device, sector);
723
device->bm_resync_fo = BM_SECT_TO_BIT(sector);
724
i = rollback_i;
725
goto requeue;
726
case 0:
727
/* everything ok */
728
break;
729
default:
730
BUG();
731
}
732
} else {
733
int err;
734
735
inc_rs_pending(peer_device);
736
err = drbd_send_drequest(peer_device,
737
size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
738
sector, size, ID_SYNCER);
739
if (err) {
740
drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
741
dec_rs_pending(peer_device);
742
put_ldev(device);
743
return err;
744
}
745
}
746
}
747
748
if (device->bm_resync_fo >= drbd_bm_bits(device)) {
749
/* last syncer _request_ was sent,
750
* but the P_RS_DATA_REPLY not yet received. sync will end (and
751
* next sync group will resume), as soon as we receive the last
752
* resync data block, and the last bit is cleared.
753
* until then resync "work" is "inactive" ...
754
*/
755
put_ldev(device);
756
return 0;
757
}
758
759
requeue:
760
device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
761
mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
762
put_ldev(device);
763
return 0;
764
}
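/*
 * Note on the merge loop above: (1 << (align + 3)) counts 512-byte
 * sectors, i.e. BM_BLOCK_SIZE << align in bytes, so a request is only
 * allowed to grow to the next power-of-two size if its start sector is
 * already aligned to that size.  Assumed example: a request starting
 * at sector 16 (an 8 KiB boundary) may merge one extra 4 KiB block,
 * but then fails the check against ((1 << (2 + 3)) - 1) == 31 and
 * stays at 8 KiB, even if further bits are dirty.
 */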
765
766
static int make_ov_request(struct drbd_peer_device *peer_device, int cancel)
767
{
768
struct drbd_device *device = peer_device->device;
769
int number, i, size;
770
sector_t sector;
771
const sector_t capacity = get_capacity(device->vdisk);
772
bool stop_sector_reached = false;
773
774
if (unlikely(cancel))
775
return 1;
776
777
number = drbd_rs_number_requests(peer_device);
778
779
sector = device->ov_position;
780
for (i = 0; i < number; i++) {
781
if (sector >= capacity)
782
return 1;
783
784
/* We check for "finished" only in the reply path:
785
* w_e_end_ov_reply().
786
* We need to send at least one request out. */
787
stop_sector_reached = i > 0
788
&& verify_can_do_stop_sector(device)
789
&& sector >= device->ov_stop_sector;
790
if (stop_sector_reached)
791
break;
792
793
size = BM_BLOCK_SIZE;
794
795
if (drbd_try_rs_begin_io(peer_device, sector)) {
796
device->ov_position = sector;
797
goto requeue;
798
}
799
800
if (sector + (size>>9) > capacity)
801
size = (capacity-sector)<<9;
802
803
inc_rs_pending(peer_device);
804
if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
805
dec_rs_pending(peer_device);
806
return 0;
807
}
808
sector += BM_SECT_PER_BIT;
809
}
810
device->ov_position = sector;
811
812
requeue:
813
device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
814
if (i == 0 || !stop_sector_reached)
815
mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
816
return 1;
817
}
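/*
 * Unlike make_resync_request(), online-verify requests are never
 * merged: each P_OV_REQUEST covers exactly one BM_BLOCK_SIZE chunk and
 * the position advances by BM_SECT_PER_BIT sectors per request, with
 * the optional stop sector only honoured after at least one request
 * has been sent in this turn.
 */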
818
819
int w_ov_finished(struct drbd_work *w, int cancel)
820
{
821
struct drbd_device_work *dw =
822
container_of(w, struct drbd_device_work, w);
823
struct drbd_device *device = dw->device;
824
kfree(dw);
825
ov_out_of_sync_print(first_peer_device(device));
826
drbd_resync_finished(first_peer_device(device));
827
828
return 0;
829
}
830
831
static int w_resync_finished(struct drbd_work *w, int cancel)
832
{
833
struct drbd_device_work *dw =
834
container_of(w, struct drbd_device_work, w);
835
struct drbd_device *device = dw->device;
836
kfree(dw);
837
838
drbd_resync_finished(first_peer_device(device));
839
840
return 0;
841
}
842
843
static void ping_peer(struct drbd_device *device)
844
{
845
struct drbd_connection *connection = first_peer_device(device)->connection;
846
847
clear_bit(GOT_PING_ACK, &connection->flags);
848
request_ping(connection);
849
wait_event(connection->ping_wait,
850
test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
851
}
852
853
int drbd_resync_finished(struct drbd_peer_device *peer_device)
854
{
855
struct drbd_device *device = peer_device->device;
856
struct drbd_connection *connection = peer_device->connection;
857
unsigned long db, dt, dbdt;
858
unsigned long n_oos;
859
union drbd_state os, ns;
860
struct drbd_device_work *dw;
861
char *khelper_cmd = NULL;
862
int verify_done = 0;
863
864
/* Remove all elements from the resync LRU. Since future actions
865
* might set bits in the (main) bitmap, the entries in the
866
* resync LRU would be wrong. */
867
if (drbd_rs_del_all(device)) {
868
/* In case this is not possible now, most probably because
869
* there are P_RS_DATA_REPLY Packets lingering on the worker's
870
* queue (or even the read operations for those packets
871
* are not finished yet). Retry in 100ms. */
872
873
schedule_timeout_interruptible(HZ / 10);
874
dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
875
if (dw) {
876
dw->w.cb = w_resync_finished;
877
dw->device = device;
878
drbd_queue_work(&connection->sender_work, &dw->w);
879
return 1;
880
}
881
drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
882
}
883
884
dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
885
if (dt <= 0)
886
dt = 1;
887
888
db = device->rs_total;
889
/* adjust for verify start and stop sectors, respective reached position */
890
if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
891
db -= device->ov_left;
892
893
dbdt = Bit2KB(db/dt);
894
device->rs_paused /= HZ;
895
896
if (!get_ldev(device))
897
goto out;
898
899
ping_peer(device);
900
901
spin_lock_irq(&device->resource->req_lock);
902
os = drbd_read_state(device);
903
904
verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
905
906
/* This protects us against multiple calls (that can happen in the presence
907
of application IO), and against connectivity loss just before we arrive here. */
908
if (os.conn <= C_CONNECTED)
909
goto out_unlock;
910
911
ns = os;
912
ns.conn = C_CONNECTED;
913
914
drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
915
verify_done ? "Online verify" : "Resync",
916
dt + device->rs_paused, device->rs_paused, dbdt);
917
918
n_oos = drbd_bm_total_weight(device);
919
920
if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
921
if (n_oos) {
922
drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
923
n_oos, Bit2KB(1));
924
khelper_cmd = "out-of-sync";
925
}
926
} else {
927
D_ASSERT(device, (n_oos - device->rs_failed) == 0);
928
929
if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
930
khelper_cmd = "after-resync-target";
931
932
if (device->use_csums && device->rs_total) {
933
const unsigned long s = device->rs_same_csum;
934
const unsigned long t = device->rs_total;
935
const int ratio =
936
(t == 0) ? 0 :
937
(t < 100000) ? ((s*100)/t) : (s/(t/100));
938
drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
939
"transferred %luK total %luK\n",
940
ratio,
941
Bit2KB(device->rs_same_csum),
942
Bit2KB(device->rs_total - device->rs_same_csum),
943
Bit2KB(device->rs_total));
944
}
945
}
946
947
if (device->rs_failed) {
948
drbd_info(device, " %lu failed blocks\n", device->rs_failed);
949
950
if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
951
ns.disk = D_INCONSISTENT;
952
ns.pdsk = D_UP_TO_DATE;
953
} else {
954
ns.disk = D_UP_TO_DATE;
955
ns.pdsk = D_INCONSISTENT;
956
}
957
} else {
958
ns.disk = D_UP_TO_DATE;
959
ns.pdsk = D_UP_TO_DATE;
960
961
if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962
if (device->p_uuid) {
963
int i;
964
for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
965
_drbd_uuid_set(device, i, device->p_uuid[i]);
966
drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
967
_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
968
} else {
969
drbd_err(device, "device->p_uuid is NULL! BUG\n");
970
}
971
}
972
973
if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
974
/* for verify runs, we don't update uuids here,
975
* so there would be nothing to report. */
976
drbd_uuid_set_bm(device, 0UL);
977
drbd_print_uuids(device, "updated UUIDs");
978
if (device->p_uuid) {
979
/* Now the two UUID sets are equal, update what we
980
* know of the peer. */
981
int i;
982
for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
983
device->p_uuid[i] = device->ldev->md.uuid[i];
984
}
985
}
986
}
987
988
_drbd_set_state(device, ns, CS_VERBOSE, NULL);
989
out_unlock:
990
spin_unlock_irq(&device->resource->req_lock);
991
992
/* If we have been sync source, and have an effective fencing-policy,
993
* once *all* volumes are back in sync, call "unfence". */
994
if (os.conn == C_SYNC_SOURCE) {
995
enum drbd_disk_state disk_state = D_MASK;
996
enum drbd_disk_state pdsk_state = D_MASK;
997
enum drbd_fencing_p fp = FP_DONT_CARE;
998
999
rcu_read_lock();
1000
fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001
if (fp != FP_DONT_CARE) {
1002
struct drbd_peer_device *peer_device;
1003
int vnr;
1004
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005
struct drbd_device *device = peer_device->device;
1006
disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007
pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008
}
1009
}
1010
rcu_read_unlock();
1011
if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012
conn_khelper(connection, "unfence-peer");
1013
}
1014
1015
put_ldev(device);
1016
out:
1017
device->rs_total = 0;
1018
device->rs_failed = 0;
1019
device->rs_paused = 0;
1020
1021
/* reset start sector, if we reached end of device */
1022
if (verify_done && device->ov_left == 0)
1023
device->ov_start_sector = 0;
1024
1025
drbd_md_sync(device);
1026
1027
if (khelper_cmd)
1028
drbd_khelper(device, khelper_cmd);
1029
1030
return 1;
1031
}
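/*
 * The two-branch percentage above is an integer arithmetic compromise,
 * presumably so that s * 100 cannot overflow unsigned long on 32-bit
 * builds: for fewer than 100000 total blocks (s * 100) / t is used,
 * for larger resyncs s / (t / 100) gives the same ratio with only a
 * small truncation error.  Assumed example: s == 3000000 equal
 * checksums out of t == 4000000 blocks yields 3000000 / 40000 == 75%.
 */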
1032
1033
/**
1034
* w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1035
* @w: work object.
1036
* @cancel: The connection will be closed anyways
1037
*/
1038
int w_e_end_data_req(struct drbd_work *w, int cancel)
1039
{
1040
struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1041
struct drbd_peer_device *peer_device = peer_req->peer_device;
1042
struct drbd_device *device = peer_device->device;
1043
int err;
1044
1045
if (unlikely(cancel)) {
1046
err = 0;
1047
goto out;
1048
}
1049
1050
if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1051
err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1052
} else {
1053
if (drbd_ratelimit())
1054
drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1055
(unsigned long long)peer_req->i.sector);
1056
1057
err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1058
}
1059
1060
if (unlikely(err))
1061
drbd_err(device, "drbd_send_block() failed\n");
1062
out:
1063
dec_unacked(device);
1064
drbd_free_peer_req(device, peer_req);
1065
1066
return err;
1067
}
1068
1069
static bool all_zero(struct drbd_peer_request *peer_req)
1070
{
1071
struct page *page = peer_req->pages;
1072
unsigned int len = peer_req->i.size;
1073
1074
page_chain_for_each(page) {
1075
unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1076
unsigned int i, words = l / sizeof(long);
1077
unsigned long *d;
1078
1079
d = kmap_atomic(page);
1080
for (i = 0; i < words; i++) {
1081
if (d[i]) {
1082
kunmap_atomic(d);
1083
return false;
1084
}
1085
}
1086
kunmap_atomic(d);
1087
len -= l;
1088
}
1089
1090
return true;
1091
}
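/*
 * all_zero() lets a thin-provisioning aware resync answer a
 * P_RS_THIN_REQ with P_RS_DEALLOCATED instead of shipping a block of
 * zeroes (see w_e_end_rsdata_req() below).  Scanning long-sized words
 * keeps the common "not all zero" case cheap, since the loop bails out
 * at the first non-zero word.
 */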
1092
1093
/**
1094
* w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1095
* @w: work object.
1096
* @cancel: The connection will be closed anyways
1097
*/
1098
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1099
{
1100
struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1101
struct drbd_peer_device *peer_device = peer_req->peer_device;
1102
struct drbd_device *device = peer_device->device;
1103
int err;
1104
1105
if (unlikely(cancel)) {
1106
err = 0;
1107
goto out;
1108
}
1109
1110
if (get_ldev_if_state(device, D_FAILED)) {
1111
drbd_rs_complete_io(device, peer_req->i.sector);
1112
put_ldev(device);
1113
}
1114
1115
if (device->state.conn == C_AHEAD) {
1116
err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1117
} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1118
if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1119
inc_rs_pending(peer_device);
1120
if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1121
err = drbd_send_rs_deallocated(peer_device, peer_req);
1122
else
1123
err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1124
} else {
1125
if (drbd_ratelimit())
1126
drbd_err(device, "Not sending RSDataReply, "
1127
"partner DISKLESS!\n");
1128
err = 0;
1129
}
1130
} else {
1131
if (drbd_ratelimit())
1132
drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1133
(unsigned long long)peer_req->i.sector);
1134
1135
err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1136
1137
/* update resync data with failure */
1138
drbd_rs_failed_io(peer_device, peer_req->i.sector, peer_req->i.size);
1139
}
1140
if (unlikely(err))
1141
drbd_err(device, "drbd_send_block() failed\n");
1142
out:
1143
dec_unacked(device);
1144
drbd_free_peer_req(device, peer_req);
1145
1146
return err;
1147
}
1148
1149
int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1150
{
1151
struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1152
struct drbd_peer_device *peer_device = peer_req->peer_device;
1153
struct drbd_device *device = peer_device->device;
1154
struct digest_info *di;
1155
int digest_size;
1156
void *digest = NULL;
1157
int err, eq = 0;
1158
1159
if (unlikely(cancel)) {
1160
err = 0;
1161
goto out;
1162
}
1163
1164
if (get_ldev(device)) {
1165
drbd_rs_complete_io(device, peer_req->i.sector);
1166
put_ldev(device);
1167
}
1168
1169
di = peer_req->digest;
1170
1171
if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1172
/* quick hack to try to avoid a race against reconfiguration.
1173
* a real fix would be much more involved,
1174
* introducing more locking mechanisms */
1175
if (peer_device->connection->csums_tfm) {
1176
digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1177
D_ASSERT(device, digest_size == di->digest_size);
1178
digest = kmalloc(digest_size, GFP_NOIO);
1179
}
1180
if (digest) {
1181
drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1182
eq = !memcmp(digest, di->digest, digest_size);
1183
kfree(digest);
1184
}
1185
1186
if (eq) {
1187
drbd_set_in_sync(peer_device, peer_req->i.sector, peer_req->i.size);
1188
/* rs_same_csums unit is BM_BLOCK_SIZE */
1189
device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1190
err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1191
} else {
1192
inc_rs_pending(peer_device);
1193
peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1194
peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1195
kfree(di);
1196
err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1197
}
1198
} else {
1199
err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1200
if (drbd_ratelimit())
1201
drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1202
}
1203
if (unlikely(err))
1204
drbd_err(device, "drbd_send_block/ack() failed\n");
1205
out:
1206
dec_unacked(device);
1207
drbd_free_peer_req(device, peer_req);
1208
1209
return err;
1210
}
1211
1212
int w_e_end_ov_req(struct drbd_work *w, int cancel)
1213
{
1214
struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1215
struct drbd_peer_device *peer_device = peer_req->peer_device;
1216
struct drbd_device *device = peer_device->device;
1217
sector_t sector = peer_req->i.sector;
1218
unsigned int size = peer_req->i.size;
1219
int digest_size;
1220
void *digest;
1221
int err = 0;
1222
1223
if (unlikely(cancel))
1224
goto out;
1225
1226
digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1227
digest = kmalloc(digest_size, GFP_NOIO);
1228
if (!digest) {
1229
err = 1; /* terminate the connection in case the allocation failed */
1230
goto out;
1231
}
1232
1233
if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1234
drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1235
else
1236
memset(digest, 0, digest_size);
1237
1238
/* Free e and pages before send.
1239
* In case we block on congestion, we could otherwise run into
1240
* some distributed deadlock, if the other side blocks on
1241
* congestion as well, because our receiver blocks in
1242
* drbd_alloc_pages due to pp_in_use > max_buffers. */
1243
drbd_free_peer_req(device, peer_req);
1244
peer_req = NULL;
1245
inc_rs_pending(peer_device);
1246
err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1247
if (err)
1248
dec_rs_pending(peer_device);
1249
kfree(digest);
1250
1251
out:
1252
if (peer_req)
1253
drbd_free_peer_req(device, peer_req);
1254
dec_unacked(device);
1255
return err;
1256
}
1257
1258
void drbd_ov_out_of_sync_found(struct drbd_peer_device *peer_device, sector_t sector, int size)
1259
{
1260
struct drbd_device *device = peer_device->device;
1261
if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1262
device->ov_last_oos_size += size>>9;
1263
} else {
1264
device->ov_last_oos_start = sector;
1265
device->ov_last_oos_size = size>>9;
1266
}
1267
drbd_set_out_of_sync(peer_device, sector, size);
1268
}
1269
1270
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1271
{
1272
struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1273
struct drbd_peer_device *peer_device = peer_req->peer_device;
1274
struct drbd_device *device = peer_device->device;
1275
struct digest_info *di;
1276
void *digest;
1277
sector_t sector = peer_req->i.sector;
1278
unsigned int size = peer_req->i.size;
1279
int digest_size;
1280
int err, eq = 0;
1281
bool stop_sector_reached = false;
1282
1283
if (unlikely(cancel)) {
1284
drbd_free_peer_req(device, peer_req);
1285
dec_unacked(device);
1286
return 0;
1287
}
1288
1289
/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1290
* the resync lru has been cleaned up already */
1291
if (get_ldev(device)) {
1292
drbd_rs_complete_io(device, peer_req->i.sector);
1293
put_ldev(device);
1294
}
1295
1296
di = peer_req->digest;
1297
1298
if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1299
digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1300
digest = kmalloc(digest_size, GFP_NOIO);
1301
if (digest) {
1302
drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1303
1304
D_ASSERT(device, digest_size == di->digest_size);
1305
eq = !memcmp(digest, di->digest, digest_size);
1306
kfree(digest);
1307
}
1308
}
1309
1310
/* Free peer_req and pages before send.
1311
* In case we block on congestion, we could otherwise run into
1312
* some distributed deadlock, if the other side blocks on
1313
* congestion as well, because our receiver blocks in
1314
* drbd_alloc_pages due to pp_in_use > max_buffers. */
1315
drbd_free_peer_req(device, peer_req);
1316
if (!eq)
1317
drbd_ov_out_of_sync_found(peer_device, sector, size);
1318
else
1319
ov_out_of_sync_print(peer_device);
1320
1321
err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1322
eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1323
1324
dec_unacked(device);
1325
1326
--device->ov_left;
1327
1328
/* let's advance progress step marks only for every other megabyte */
1329
if ((device->ov_left & 0x200) == 0x200)
1330
drbd_advance_rs_marks(peer_device, device->ov_left);
1331
1332
stop_sector_reached = verify_can_do_stop_sector(device) &&
1333
(sector + (size>>9)) >= device->ov_stop_sector;
1334
1335
if (device->ov_left == 0 || stop_sector_reached) {
1336
ov_out_of_sync_print(peer_device);
1337
drbd_resync_finished(peer_device);
1338
}
1339
1340
return err;
1341
}
1342
1343
/* FIXME
1344
* We need to track the number of pending barrier acks,
1345
* and to be able to wait for them.
1346
* See also comment in drbd_adm_attach before drbd_suspend_io.
1347
*/
1348
static int drbd_send_barrier(struct drbd_connection *connection)
1349
{
1350
struct p_barrier *p;
1351
struct drbd_socket *sock;
1352
1353
sock = &connection->data;
1354
p = conn_prepare_command(connection, sock);
1355
if (!p)
1356
return -EIO;
1357
p->barrier = connection->send.current_epoch_nr;
1358
p->pad = 0;
1359
connection->send.current_epoch_writes = 0;
1360
connection->send.last_sent_barrier_jif = jiffies;
1361
1362
return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1363
}
1364
1365
static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1366
{
1367
struct drbd_socket *sock = &pd->connection->data;
1368
if (!drbd_prepare_command(pd, sock))
1369
return -EIO;
1370
return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1371
}
1372
1373
int w_send_write_hint(struct drbd_work *w, int cancel)
1374
{
1375
struct drbd_device *device =
1376
container_of(w, struct drbd_device, unplug_work);
1377
1378
if (cancel)
1379
return 0;
1380
return pd_send_unplug_remote(first_peer_device(device));
1381
}
1382
1383
static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1384
{
1385
if (!connection->send.seen_any_write_yet) {
1386
connection->send.seen_any_write_yet = true;
1387
connection->send.current_epoch_nr = epoch;
1388
connection->send.current_epoch_writes = 0;
1389
connection->send.last_sent_barrier_jif = jiffies;
1390
}
1391
}
1392
1393
static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1394
{
1395
/* re-init if first write on this connection */
1396
if (!connection->send.seen_any_write_yet)
1397
return;
1398
if (connection->send.current_epoch_nr != epoch) {
1399
if (connection->send.current_epoch_writes)
1400
drbd_send_barrier(connection);
1401
connection->send.current_epoch_nr = epoch;
1402
}
1403
}
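/*
 * Epoch bookkeeping sketch (epoch numbers assumed): if writes were
 * sent in epoch 7 and the next request to go out belongs to epoch 8,
 * maybe_send_barrier() first emits one P_BARRIER carrying the old
 * epoch number, separating the two write epochs on the wire.  Epochs
 * that saw no writes (e.g. only reads or out-of-sync notifications)
 * do not generate a barrier.
 */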
1404
1405
int w_send_out_of_sync(struct drbd_work *w, int cancel)
1406
{
1407
struct drbd_request *req = container_of(w, struct drbd_request, w);
1408
struct drbd_device *device = req->device;
1409
struct drbd_peer_device *const peer_device = first_peer_device(device);
1410
struct drbd_connection *const connection = peer_device->connection;
1411
int err;
1412
1413
if (unlikely(cancel)) {
1414
req_mod(req, SEND_CANCELED, peer_device);
1415
return 0;
1416
}
1417
req->pre_send_jif = jiffies;
1418
1419
/* this time, no connection->send.current_epoch_writes++;
1420
* If it was sent, it was the closing barrier for the last
1421
* replicated epoch, before we went into AHEAD mode.
1422
* No more barriers will be sent, until we leave AHEAD mode again. */
1423
maybe_send_barrier(connection, req->epoch);
1424
1425
err = drbd_send_out_of_sync(peer_device, req);
1426
req_mod(req, OOS_HANDED_TO_NETWORK, peer_device);
1427
1428
return err;
1429
}
1430
1431
/**
1432
* w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1433
* @w: work object.
1434
* @cancel: The connection will be closed anyways
1435
*/
1436
int w_send_dblock(struct drbd_work *w, int cancel)
1437
{
1438
struct drbd_request *req = container_of(w, struct drbd_request, w);
1439
struct drbd_device *device = req->device;
1440
struct drbd_peer_device *const peer_device = first_peer_device(device);
1441
struct drbd_connection *connection = peer_device->connection;
1442
bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1443
int err;
1444
1445
if (unlikely(cancel)) {
1446
req_mod(req, SEND_CANCELED, peer_device);
1447
return 0;
1448
}
1449
req->pre_send_jif = jiffies;
1450
1451
re_init_if_first_write(connection, req->epoch);
1452
maybe_send_barrier(connection, req->epoch);
1453
connection->send.current_epoch_writes++;
1454
1455
err = drbd_send_dblock(peer_device, req);
1456
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1457
1458
if (do_send_unplug && !err)
1459
pd_send_unplug_remote(peer_device);
1460
1461
return err;
1462
}
1463
1464
/**
1465
* w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1466
* @w: work object.
1467
* @cancel: The connection will be closed anyways
1468
*/
1469
int w_send_read_req(struct drbd_work *w, int cancel)
1470
{
1471
struct drbd_request *req = container_of(w, struct drbd_request, w);
1472
struct drbd_device *device = req->device;
1473
struct drbd_peer_device *const peer_device = first_peer_device(device);
1474
struct drbd_connection *connection = peer_device->connection;
1475
bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1476
int err;
1477
1478
if (unlikely(cancel)) {
1479
req_mod(req, SEND_CANCELED, peer_device);
1480
return 0;
1481
}
1482
req->pre_send_jif = jiffies;
1483
1484
/* Even read requests may close a write epoch,
1485
* if there has been one yet. */
1486
maybe_send_barrier(connection, req->epoch);
1487
1488
err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1489
(unsigned long)req);
1490
1491
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1492
1493
if (do_send_unplug && !err)
1494
pd_send_unplug_remote(peer_device);
1495
1496
return err;
1497
}
1498
1499
int w_restart_disk_io(struct drbd_work *w, int cancel)
1500
{
1501
struct drbd_request *req = container_of(w, struct drbd_request, w);
1502
struct drbd_device *device = req->device;
1503
1504
if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1505
drbd_al_begin_io(device, &req->i);
1506
1507
req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1508
req->master_bio, GFP_NOIO,
1509
&drbd_io_bio_set);
1510
req->private_bio->bi_private = req;
1511
req->private_bio->bi_end_io = drbd_request_endio;
1512
submit_bio_noacct(req->private_bio);
1513
1514
return 0;
1515
}
1516
1517
static int _drbd_may_sync_now(struct drbd_device *device)
1518
{
1519
struct drbd_device *odev = device;
1520
int resync_after;
1521
1522
while (1) {
1523
if (!odev->ldev || odev->state.disk == D_DISKLESS)
1524
return 1;
1525
rcu_read_lock();
1526
resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1527
rcu_read_unlock();
1528
if (resync_after == -1)
1529
return 1;
1530
odev = minor_to_device(resync_after);
1531
if (!odev)
1532
return 1;
1533
if ((odev->state.conn >= C_SYNC_SOURCE &&
1534
odev->state.conn <= C_PAUSED_SYNC_T) ||
1535
odev->state.aftr_isp || odev->state.peer_isp ||
1536
odev->state.user_isp)
1537
return 0;
1538
}
1539
}
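/*
 * Example of a resync-after chain (minor numbers assumed): if minor 2
 * has resync-after = 1 and minor 1 has resync-after = 0, minor 2 may
 * only resync while neither minor 1 nor minor 0 is currently syncing
 * or paused; the loop above walks that chain until it reaches -1, a
 * missing or diskless minor, or a busy device.
 */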
1540
1541
/**
1542
* drbd_pause_after() - Pause resync on all devices that may not resync now
1543
* @device: DRBD device.
1544
*
1545
* Called from process context only (admin command and after_state_ch).
1546
*/
1547
static bool drbd_pause_after(struct drbd_device *device)
1548
{
1549
bool changed = false;
1550
struct drbd_device *odev;
1551
int i;
1552
1553
rcu_read_lock();
1554
idr_for_each_entry(&drbd_devices, odev, i) {
1555
if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1556
continue;
1557
if (!_drbd_may_sync_now(odev) &&
1558
_drbd_set_state(_NS(odev, aftr_isp, 1),
1559
CS_HARD, NULL) != SS_NOTHING_TO_DO)
1560
changed = true;
1561
}
1562
rcu_read_unlock();
1563
1564
return changed;
1565
}
1566
1567
/**
1568
* drbd_resume_next() - Resume resync on all devices that may resync now
1569
* @device: DRBD device.
1570
*
1571
* Called from process context only (admin command and worker).
1572
*/
1573
static bool drbd_resume_next(struct drbd_device *device)
1574
{
1575
bool changed = false;
1576
struct drbd_device *odev;
1577
int i;
1578
1579
rcu_read_lock();
1580
idr_for_each_entry(&drbd_devices, odev, i) {
1581
if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1582
continue;
1583
if (odev->state.aftr_isp) {
1584
if (_drbd_may_sync_now(odev) &&
1585
_drbd_set_state(_NS(odev, aftr_isp, 0),
1586
CS_HARD, NULL) != SS_NOTHING_TO_DO)
1587
changed = true;
1588
}
1589
}
1590
rcu_read_unlock();
1591
return changed;
1592
}
1593
1594
void resume_next_sg(struct drbd_device *device)
1595
{
1596
lock_all_resources();
1597
drbd_resume_next(device);
1598
unlock_all_resources();
1599
}
1600
1601
void suspend_other_sg(struct drbd_device *device)
1602
{
1603
lock_all_resources();
1604
drbd_pause_after(device);
1605
unlock_all_resources();
1606
}
1607
1608
/* caller must lock_all_resources() */
1609
enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1610
{
1611
struct drbd_device *odev;
1612
int resync_after;
1613
1614
if (o_minor == -1)
1615
return NO_ERROR;
1616
if (o_minor < -1 || o_minor > MINORMASK)
1617
return ERR_RESYNC_AFTER;
1618
1619
/* check for loops */
1620
odev = minor_to_device(o_minor);
1621
while (1) {
1622
if (odev == device)
1623
return ERR_RESYNC_AFTER_CYCLE;
1624
1625
/* You are free to depend on diskless, non-existing,
1626
* or not yet/no longer existing minors.
1627
* We only reject dependency loops.
1628
* We cannot follow the dependency chain beyond a detached or
1629
* missing minor.
1630
*/
1631
if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1632
return NO_ERROR;
1633
1634
rcu_read_lock();
1635
resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1636
rcu_read_unlock();
1637
/* dependency chain ends here, no cycles. */
1638
if (resync_after == -1)
1639
return NO_ERROR;
1640
1641
/* follow the dependency chain */
1642
odev = minor_to_device(resync_after);
1643
}
1644
}
1645
1646
/* caller must lock_all_resources() */
1647
void drbd_resync_after_changed(struct drbd_device *device)
1648
{
1649
int changed;
1650
1651
do {
1652
changed = drbd_pause_after(device);
1653
changed |= drbd_resume_next(device);
1654
} while (changed);
1655
}
1656
1657
void drbd_rs_controller_reset(struct drbd_peer_device *peer_device)
1658
{
1659
struct drbd_device *device = peer_device->device;
1660
struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1661
struct fifo_buffer *plan;
1662
1663
atomic_set(&device->rs_sect_in, 0);
1664
atomic_set(&device->rs_sect_ev, 0);
1665
device->rs_in_flight = 0;
1666
device->rs_last_events =
1667
(int)part_stat_read_accum(disk->part0, sectors);
1668
1669
/* Updating the RCU protected object in place is necessary since
1670
this function gets called from atomic context.
1671
It is valid since all other updates also lead to a completely
1672
empty fifo */
1673
rcu_read_lock();
1674
plan = rcu_dereference(device->rs_plan_s);
1675
plan->total = 0;
1676
fifo_set(plan, 0);
1677
rcu_read_unlock();
1678
}
1679
1680
void start_resync_timer_fn(struct timer_list *t)
1681
{
1682
struct drbd_device *device = timer_container_of(device, t,
1683
start_resync_timer);
1684
drbd_device_post_work(device, RS_START);
1685
}
1686
1687
static void do_start_resync(struct drbd_device *device)
1688
{
1689
if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1690
drbd_warn(device, "postponing start_resync ...\n");
1691
device->start_resync_timer.expires = jiffies + HZ/10;
1692
add_timer(&device->start_resync_timer);
1693
return;
1694
}
1695
1696
drbd_start_resync(device, C_SYNC_SOURCE);
1697
clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1698
}
1699
1700
static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1701
{
1702
bool csums_after_crash_only;
1703
rcu_read_lock();
1704
csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1705
rcu_read_unlock();
1706
return connection->agreed_pro_version >= 89 && /* supported? */
1707
connection->csums_tfm && /* configured? */
1708
(csums_after_crash_only == false /* use for each resync? */
1709
|| test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1710
}
1711
1712
/**
1713
* drbd_start_resync() - Start the resync process
1714
* @device: DRBD device.
1715
* @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1716
*
1717
* This function might bring you directly into one of the
1718
* C_PAUSED_SYNC_* states.
1719
*/
1720
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1721
{
1722
struct drbd_peer_device *peer_device = first_peer_device(device);
1723
struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1724
union drbd_state ns;
1725
int r;
1726
1727
if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1728
drbd_err(device, "Resync already running!\n");
1729
return;
1730
}
1731
1732
if (!connection) {
1733
drbd_err(device, "No connection to peer, aborting!\n");
1734
return;
1735
}
1736
1737
if (!test_bit(B_RS_H_DONE, &device->flags)) {
1738
if (side == C_SYNC_TARGET) {
1739
/* Since application IO was locked out during C_WF_BITMAP_T and
1740
C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1741
we check whether we are allowed to make the data inconsistent. */
1742
r = drbd_khelper(device, "before-resync-target");
1743
r = (r >> 8) & 0xff;
1744
if (r > 0) {
1745
drbd_info(device, "before-resync-target handler returned %d, "
1746
"dropping connection.\n", r);
1747
conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1748
return;
1749
}
1750
} else /* C_SYNC_SOURCE */ {
1751
r = drbd_khelper(device, "before-resync-source");
1752
r = (r >> 8) & 0xff;
1753
if (r > 0) {
1754
if (r == 3) {
1755
drbd_info(device, "before-resync-source handler returned %d, "
1756
"ignoring. Old userland tools?", r);
1757
} else {
1758
drbd_info(device, "before-resync-source handler returned %d, "
1759
"dropping connection.\n", r);
1760
conn_request_state(connection,
1761
NS(conn, C_DISCONNECTING), CS_HARD);
1762
return;
1763
}
1764
}
1765
}
1766
}
1767
	if (current == connection->worker.task) {
		/* The worker should not sleep waiting for state_mutex,
		   that can take long */
		if (!mutex_trylock(device->state_mutex)) {
			set_bit(B_RS_H_DONE, &device->flags);
			device->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&device->start_resync_timer);
			return;
		}
	} else {
		mutex_lock(device->state_mutex);
	}

	lock_all_resources();
	clear_bit(B_RS_H_DONE, &device->flags);
	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED
	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
		unlock_all_resources();
		goto out;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed    = 0;
		device->rs_paused    = 0;
		device->rs_same_csum = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total     = tw;
		device->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		drbd_pause_after(device);
		/* Forget potentially stale cached per resync extent bit-counts.
		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
		 * disabled, and know the disk state is ok. */
		spin_lock(&device->al_lock);
		lc_reset(device->resync);
		device->resync_locked = 0;
		device->resync_wenr = LC_FREE;
		spin_unlock(&device->al_lock);
	}
	unlock_all_resources();

	if (r == SS_SUCCESS) {
		wake_up(&device->al_wait); /* for lc_reset() above */
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET) {
			device->bm_resync_fo = 0;
			device->use_csums = use_checksum_based_resync(connection, device);
		} else {
			device->use_csums = false;
		}

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync. We cannot always do that, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(peer_device);

		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

				rcu_read_lock();
				nc = rcu_dereference(connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(peer_device);
		}

		drbd_rs_controller_reset(peer_device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
out:
	mutex_unlock(device->state_mutex);
}
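
/*
 * The rs_mark_left[]/rs_mark_time[] ring initialized above is what later
 * progress reporting derives its throughput estimate from.  The helper below
 * is an illustrative sketch only (hypothetical name, not used by DRBD) of
 * that arithmetic for a single mark slot: bits that have disappeared from the
 * bitmap since the mark, converted to KiB, divided by the elapsed seconds.
 */
static unsigned long __maybe_unused example_resync_kb_per_sec(struct drbd_device *device, int i)
{
	unsigned long dt = (jiffies - device->rs_mark_time[i]) / HZ ?: 1;	/* seconds, at least 1 */
	unsigned long db = device->rs_mark_left[i] - drbd_bm_total_weight(device); /* bits cleaned */

	return (db << (BM_BLOCK_SHIFT - 10)) / dt;	/* bits -> KiB, per second */
}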

static void update_on_disk_bitmap(struct drbd_peer_device *peer_device, bool resync_done)
{
	struct drbd_device *device = peer_device->device;
	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
	device->rs_last_bcast = jiffies;

	if (!get_ldev(device))
		return;

	drbd_bm_write_lazy(device, 0);
	if (resync_done && is_sync_state(device->state.conn))
		drbd_resync_finished(peer_device);

	drbd_bcast_event(device, &sib);
	/* update timestamp, in case it took a while to write out stuff */
	device->rs_last_bcast = jiffies;
	put_ldev(device);
}

static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	__acquire(local);
	drbd_backing_dev_free(device, device->ldev);
	device->ldev = NULL;
	__release(local);

	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}

static void go_diskless(struct drbd_device *device)
{
	struct drbd_peer_device *peer_device = first_peer_device(device);
	D_ASSERT(device, device->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */

	/* Try to write changed bitmap pages, read errors may have just
	 * set some bits outside the area covered by the activity log.
	 *
	 * If we have an IO error during the bitmap writeout,
	 * we will want a full sync next time, just in case.
	 * (Do we want a specific meta data flag for this?)
	 *
	 * If that does not make it to stable storage either,
	 * we cannot do anything about that anymore.
	 *
	 * We still need to check if both bitmap and ldev are present, we may
	 * end up here after a failed attach, before ldev was even assigned.
	 */
	if (device->bitmap && device->ldev) {
		/* An interrupted resync or similar is allowed to recount bits
		 * while we detach.
		 * Any modifications would not be expected anymore, though.
		 */
		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
					"detach", BM_LOCKED_TEST_ALLOWED, peer_device)) {
			if (test_bit(WAS_READ_ERROR, &device->flags)) {
				drbd_md_set_flag(device, MDF_FULL_SYNC);
				drbd_md_sync(device);
			}
		}
	}

	drbd_force_state(device, NS(disk, D_DISKLESS));
}
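
/*
 * Detach ordering: go_diskless() above runs while the disk is still D_FAILED,
 * tries to flush the bitmap one last time, and then forces D_DISKLESS.
 * Freeing the activity log, the resync extent cache and the backing device is
 * left to the separate DESTROY_DISK work item (drbd_ldev_destroy()), which,
 * as the local_cnt comment above notes, is only safe once the last local
 * reference has been put.
 */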

static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}

/* only called from drbd_worker thread, no locking */
void __update_timing_details(
		struct drbd_thread_timing_details *tdp,
		unsigned int *cb_nr,
		void *cb,
		const char *fn, const unsigned int line)
{
	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
	struct drbd_thread_timing_details *td = tdp + i;

	td->start_jif = jiffies;
	td->cb_addr = cb;
	td->caller_fn = fn;
	td->line = line;
	td->cb_nr = *cb_nr;

	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
	td = tdp + i;
	memset(td, 0, sizeof(*td));

	++(*cb_nr);
}
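
/*
 * The function above keeps a small ring of the last DRBD_THREAD_DETAILS_HIST
 * callbacks the worker ran: it fills the current slot and zeroes the next
 * one, so a stalled worker leaves a readable trail of what it did last.
 * The helper below is an illustrative sketch only (hypothetical name, not
 * used by DRBD) of how such a ring could be walked from oldest to newest.
 */
static void __maybe_unused example_dump_timing_ring(struct drbd_thread_timing_details *tdp,
						    unsigned int cb_nr)
{
	unsigned int start = cb_nr % DRBD_THREAD_DETAILS_HIST;	/* the just-zeroed slot */
	unsigned int i;

	for (i = 1; i <= DRBD_THREAD_DETAILS_HIST; i++) {
		struct drbd_thread_timing_details *td =
			tdp + (start + i) % DRBD_THREAD_DETAILS_HIST;

		if (!td->cb_addr)	/* unused or just-cleared slot */
			continue;
		pr_info("#%u %s:%u cb %ps started at %lu\n",
			td->cb_nr, td->caller_fn, td->line,
			td->cb_addr, td->start_jif);
	}
}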

static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
	if (test_bit(MD_SYNC, &todo))
		do_md_sync(device);
	if (test_bit(RS_DONE, &todo) ||
	    test_bit(RS_PROGRESS, &todo))
		update_on_disk_bitmap(first_peer_device(device), test_bit(RS_DONE, &todo));
	if (test_bit(GO_DISKLESS, &todo))
		go_diskless(device);
	if (test_bit(DESTROY_DISK, &todo))
		drbd_ldev_destroy(device);
	if (test_bit(RS_START, &todo))
		do_start_resync(device);
}

#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << MD_SYNC)	\
	|(1UL << RS_START)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << RS_DONE)	\
	)

static unsigned long get_work_bits(unsigned long *flags)
{
	unsigned long old, new;
	do {
		old = *flags;
		new = old & ~DRBD_DEVICE_WORK_MASK;
	} while (cmpxchg(flags, old, new) != old);
	return old & DRBD_DEVICE_WORK_MASK;
}
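
/*
 * The cmpxchg() loop above claims every queued device-work bit in one atomic
 * step: a concurrent setter either gets its bit into 'old' (and this
 * invocation handles it) or sets it after the exchange (and it stays set in
 * device->flags for a later round), so no posted bit is lost or seen twice.
 */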

static void do_unqueued_work(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		unsigned long todo = get_work_bits(&device->flags);
		if (!todo)
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		do_device_work(device, todo);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	list_splice_tail_init(&queue->q, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_batch(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			tcp_sock_set_cork(connection->data.socket->sk, false);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  The next incoming request epoch will be the
		 * connection's current transfer log epoch number.  If that is
		 * different from the epoch of the last request we
		 * communicated, it is safe to send the epoch separating
		 * barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* may be woken up for things other than new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			tcp_sock_set_cork(connection->data.socket->sk, true);
		else if (!uncork)
			tcp_sock_set_cork(connection->data.socket->sk, false);
	}
	mutex_unlock(&connection->data.mutex);
}
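
/*
 * When tcp_cork is configured, wait_for_work() releases the cork while the
 * sender is idle and re-establishes it before going back to work, so the many
 * small sender packets get batched into full TCP segments.  The helper below
 * is an illustrative sketch only (hypothetical name, not used by DRBD) of the
 * same corking bracket in its usual, straight-line form.
 */
static void __maybe_unused example_corked_burst(struct socket *sock)
{
	tcp_sock_set_cork(sock->sk, true);	/* start batching small writes */
	/* ... a burst of small kernel_sendmsg() calls would go here ... */
	tcp_sock_set_cork(sock->sk, false);	/* push out the batched data */
}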

int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		} else
			dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}