Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/drivers/block/drbd/drbd_req.c
26282 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/*
3
drbd_req.c
4
5
This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7
Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8
Copyright (C) 1999-2008, Philipp Reisner <[email protected]>.
9
Copyright (C) 2002-2008, Lars Ellenberg <[email protected]>.
10
11
12
*/
13
14
#include <linux/module.h>
15
16
#include <linux/slab.h>
17
#include <linux/drbd.h>
18
#include "drbd_int.h"
19
#include "drbd_req.h"
20
21
22
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);
23
24
static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
25
{
26
struct drbd_request *req;
27
28
req = mempool_alloc(&drbd_request_mempool, GFP_NOIO);
29
if (!req)
30
return NULL;
31
memset(req, 0, sizeof(*req));
32
33
req->rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
34
| (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_ZEROES : 0)
35
| (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
36
req->device = device;
37
req->master_bio = bio_src;
38
req->epoch = 0;
39
40
drbd_clear_interval(&req->i);
41
req->i.sector = bio_src->bi_iter.bi_sector;
42
req->i.size = bio_src->bi_iter.bi_size;
43
req->i.local = true;
44
req->i.waiting = false;
45
46
INIT_LIST_HEAD(&req->tl_requests);
47
INIT_LIST_HEAD(&req->w.list);
48
INIT_LIST_HEAD(&req->req_pending_master_completion);
49
INIT_LIST_HEAD(&req->req_pending_local);
50
51
/* one reference to be put by __drbd_make_request */
52
atomic_set(&req->completion_ref, 1);
53
/* one kref as long as completion_ref > 0 */
54
kref_init(&req->kref);
55
return req;
56
}
57
58
static void drbd_remove_request_interval(struct rb_root *root,
59
struct drbd_request *req)
60
{
61
struct drbd_device *device = req->device;
62
struct drbd_interval *i = &req->i;
63
64
drbd_remove_interval(root, i);
65
66
/* Wake up any processes waiting for this request to complete. */
67
if (i->waiting)
68
wake_up(&device->misc_wait);
69
}
70
71
void drbd_req_destroy(struct kref *kref)
72
{
73
struct drbd_request *req = container_of(kref, struct drbd_request, kref);
74
struct drbd_device *device = req->device;
75
const unsigned s = req->rq_state;
76
77
if ((req->master_bio && !(s & RQ_POSTPONED)) ||
78
atomic_read(&req->completion_ref) ||
79
(s & RQ_LOCAL_PENDING) ||
80
((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
81
drbd_err(device, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
82
s, atomic_read(&req->completion_ref));
83
return;
84
}
85
86
/* If called from mod_rq_state (expected normal case) or
87
* drbd_send_and_submit (the less likely normal path), this holds the
88
* req_lock, and req->tl_requests will typicaly be on ->transfer_log,
89
* though it may be still empty (never added to the transfer log).
90
*
91
* If called from do_retry(), we do NOT hold the req_lock, but we are
92
* still allowed to unconditionally list_del(&req->tl_requests),
93
* because it will be on a local on-stack list only. */
94
list_del_init(&req->tl_requests);
95
96
/* finally remove the request from the conflict detection
97
* respective block_id verification interval tree. */
98
if (!drbd_interval_empty(&req->i)) {
99
struct rb_root *root;
100
101
if (s & RQ_WRITE)
102
root = &device->write_requests;
103
else
104
root = &device->read_requests;
105
drbd_remove_request_interval(root, req);
106
} else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
107
drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
108
s, (unsigned long long)req->i.sector, req->i.size);
109
110
/* if it was a write, we may have to set the corresponding
111
* bit(s) out-of-sync first. If it had a local part, we need to
112
* release the reference to the activity log. */
113
if (s & RQ_WRITE) {
114
/* Set out-of-sync unless both OK flags are set
115
* (local only or remote failed).
116
* Other places where we set out-of-sync:
117
* READ with local io-error */
118
119
/* There is a special case:
120
* we may notice late that IO was suspended,
121
* and postpone, or schedule for retry, a write,
122
* before it even was submitted or sent.
123
* In that case we do not want to touch the bitmap at all.
124
*/
125
struct drbd_peer_device *peer_device = first_peer_device(device);
126
if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
127
if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
128
drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);
129
130
if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
131
drbd_set_in_sync(peer_device, req->i.sector, req->i.size);
132
}
133
134
/* one might be tempted to move the drbd_al_complete_io
135
* to the local io completion callback drbd_request_endio.
136
* but, if this was a mirror write, we may only
137
* drbd_al_complete_io after this is RQ_NET_DONE,
138
* otherwise the extent could be dropped from the al
139
* before it has actually been written on the peer.
140
* if we crash before our peer knows about the request,
141
* but after the extent has been dropped from the al,
142
* we would forget to resync the corresponding extent.
143
*/
144
if (s & RQ_IN_ACT_LOG) {
145
if (get_ldev_if_state(device, D_FAILED)) {
146
drbd_al_complete_io(device, &req->i);
147
put_ldev(device);
148
} else if (drbd_ratelimit()) {
149
drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
150
"but my Disk seems to have failed :(\n",
151
(unsigned long long) req->i.sector, req->i.size);
152
}
153
}
154
}
155
156
mempool_free(req, &drbd_request_mempool);
157
}
158
159
static void wake_all_senders(struct drbd_connection *connection)
160
{
161
wake_up(&connection->sender_work.q_wait);
162
}
163
164
/* must hold resource->req_lock */
165
void start_new_tl_epoch(struct drbd_connection *connection)
166
{
167
/* no point closing an epoch, if it is empty, anyways. */
168
if (connection->current_tle_writes == 0)
169
return;
170
171
connection->current_tle_writes = 0;
172
atomic_inc(&connection->current_tle_nr);
173
wake_all_senders(connection);
174
}
175
176
void complete_master_bio(struct drbd_device *device,
177
struct bio_and_error *m)
178
{
179
if (unlikely(m->error))
180
m->bio->bi_status = errno_to_blk_status(m->error);
181
bio_endio(m->bio);
182
dec_ap_bio(device);
183
}
184
185
186
/* Helper for __req_mod().
187
* Set m->bio to the master bio, if it is fit to be completed,
188
* or leave it alone (it is initialized to NULL in __req_mod),
189
* if it has already been completed, or cannot be completed yet.
190
* If m->bio is set, the error status to be returned is placed in m->error.
191
*/
192
static
193
void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
194
{
195
const unsigned s = req->rq_state;
196
struct drbd_device *device = req->device;
197
int error, ok;
198
199
/* we must not complete the master bio, while it is
200
* still being processed by _drbd_send_zc_bio (drbd_send_dblock)
201
* not yet acknowledged by the peer
202
* not yet completed by the local io subsystem
203
* these flags may get cleared in any order by
204
* the worker,
205
* the receiver,
206
* the bio_endio completion callbacks.
207
*/
208
if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
209
(s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
210
(s & RQ_COMPLETION_SUSP)) {
211
drbd_err(device, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
212
return;
213
}
214
215
if (!req->master_bio) {
216
drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
217
return;
218
}
219
220
/*
221
* figure out whether to report success or failure.
222
*
223
* report success when at least one of the operations succeeded.
224
* or, to put the other way,
225
* only report failure, when both operations failed.
226
*
227
* what to do about the failures is handled elsewhere.
228
* what we need to do here is just: complete the master_bio.
229
*
230
* local completion error, if any, has been stored as ERR_PTR
231
* in private_bio within drbd_request_endio.
232
*/
233
ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
234
error = PTR_ERR(req->private_bio);
235
236
/* Before we can signal completion to the upper layers,
237
* we may need to close the current transfer log epoch.
238
* We are within the request lock, so we can simply compare
239
* the request epoch number with the current transfer log
240
* epoch number. If they match, increase the current_tle_nr,
241
* and reset the transfer log epoch write_cnt.
242
*/
243
if (op_is_write(bio_op(req->master_bio)) &&
244
req->epoch == atomic_read(&first_peer_device(device)->connection->current_tle_nr))
245
start_new_tl_epoch(first_peer_device(device)->connection);
246
247
/* Update disk stats */
248
bio_end_io_acct(req->master_bio, req->start_jif);
249
250
/* If READ failed,
251
* have it be pushed back to the retry work queue,
252
* so it will re-enter __drbd_make_request(),
253
* and be re-assigned to a suitable local or remote path,
254
* or failed if we do not have access to good data anymore.
255
*
256
* Unless it was failed early by __drbd_make_request(),
257
* because no path was available, in which case
258
* it was not even added to the transfer_log.
259
*
260
* read-ahead may fail, and will not be retried.
261
*
262
* WRITE should have used all available paths already.
263
*/
264
if (!ok &&
265
bio_op(req->master_bio) == REQ_OP_READ &&
266
!(req->master_bio->bi_opf & REQ_RAHEAD) &&
267
!list_empty(&req->tl_requests))
268
req->rq_state |= RQ_POSTPONED;
269
270
if (!(req->rq_state & RQ_POSTPONED)) {
271
m->error = ok ? 0 : (error ?: -EIO);
272
m->bio = req->master_bio;
273
req->master_bio = NULL;
274
/* We leave it in the tree, to be able to verify later
275
* write-acks in protocol != C during resync.
276
* But we mark it as "complete", so it won't be counted as
277
* conflict in a multi-primary setup. */
278
req->i.completed = true;
279
}
280
281
if (req->i.waiting)
282
wake_up(&device->misc_wait);
283
284
/* Either we are about to complete to upper layers,
285
* or we will restart this request.
286
* In either case, the request object will be destroyed soon,
287
* so better remove it from all lists. */
288
list_del_init(&req->req_pending_master_completion);
289
}
290
291
/* still holds resource->req_lock */
292
static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
293
{
294
struct drbd_device *device = req->device;
295
D_ASSERT(device, m || (req->rq_state & RQ_POSTPONED));
296
297
if (!put)
298
return;
299
300
if (!atomic_sub_and_test(put, &req->completion_ref))
301
return;
302
303
drbd_req_complete(req, m);
304
305
/* local completion may still come in later,
306
* we need to keep the req object around. */
307
if (req->rq_state & RQ_LOCAL_ABORTED)
308
return;
309
310
if (req->rq_state & RQ_POSTPONED) {
311
/* don't destroy the req object just yet,
312
* but queue it for retry */
313
drbd_restart_request(req);
314
return;
315
}
316
317
kref_put(&req->kref, drbd_req_destroy);
318
}
319
320
static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
321
{
322
struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
323
if (!connection)
324
return;
325
if (connection->req_next == NULL)
326
connection->req_next = req;
327
}
328
329
static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
330
{
331
struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
332
struct drbd_request *iter = req;
333
if (!connection)
334
return;
335
if (connection->req_next != req)
336
return;
337
338
req = NULL;
339
list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
340
const unsigned int s = iter->rq_state;
341
342
if (s & RQ_NET_QUEUED) {
343
req = iter;
344
break;
345
}
346
}
347
connection->req_next = req;
348
}
349
350
static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
351
{
352
struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
353
if (!connection)
354
return;
355
if (connection->req_ack_pending == NULL)
356
connection->req_ack_pending = req;
357
}
358
359
static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
360
{
361
struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
362
struct drbd_request *iter = req;
363
if (!connection)
364
return;
365
if (connection->req_ack_pending != req)
366
return;
367
368
req = NULL;
369
list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
370
const unsigned int s = iter->rq_state;
371
372
if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING)) {
373
req = iter;
374
break;
375
}
376
}
377
connection->req_ack_pending = req;
378
}
379
380
static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
381
{
382
struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
383
if (!connection)
384
return;
385
if (connection->req_not_net_done == NULL)
386
connection->req_not_net_done = req;
387
}
388
389
static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
390
{
391
struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
392
struct drbd_request *iter = req;
393
if (!connection)
394
return;
395
if (connection->req_not_net_done != req)
396
return;
397
398
req = NULL;
399
list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
400
const unsigned int s = iter->rq_state;
401
402
if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE)) {
403
req = iter;
404
break;
405
}
406
}
407
connection->req_not_net_done = req;
408
}
409
410
/* I'd like this to be the only place that manipulates
411
* req->completion_ref and req->kref. */
412
static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
413
int clear, int set)
414
{
415
struct drbd_device *device = req->device;
416
struct drbd_peer_device *peer_device = first_peer_device(device);
417
unsigned s = req->rq_state;
418
int c_put = 0;
419
420
if (drbd_suspended(device) && !((s | clear) & RQ_COMPLETION_SUSP))
421
set |= RQ_COMPLETION_SUSP;
422
423
/* apply */
424
425
req->rq_state &= ~clear;
426
req->rq_state |= set;
427
428
/* no change? */
429
if (req->rq_state == s)
430
return;
431
432
/* intent: get references */
433
434
kref_get(&req->kref);
435
436
if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
437
atomic_inc(&req->completion_ref);
438
439
if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
440
inc_ap_pending(device);
441
atomic_inc(&req->completion_ref);
442
}
443
444
if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
445
atomic_inc(&req->completion_ref);
446
set_if_null_req_next(peer_device, req);
447
}
448
449
if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
450
kref_get(&req->kref); /* wait for the DONE */
451
452
if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
453
/* potentially already completed in the ack_receiver thread */
454
if (!(s & RQ_NET_DONE)) {
455
atomic_add(req->i.size >> 9, &device->ap_in_flight);
456
set_if_null_req_not_net_done(peer_device, req);
457
}
458
if (req->rq_state & RQ_NET_PENDING)
459
set_if_null_req_ack_pending(peer_device, req);
460
}
461
462
if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
463
atomic_inc(&req->completion_ref);
464
465
/* progress: put references */
466
467
if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
468
++c_put;
469
470
if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
471
D_ASSERT(device, req->rq_state & RQ_LOCAL_PENDING);
472
++c_put;
473
}
474
475
if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
476
if (req->rq_state & RQ_LOCAL_ABORTED)
477
kref_put(&req->kref, drbd_req_destroy);
478
else
479
++c_put;
480
list_del_init(&req->req_pending_local);
481
}
482
483
if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
484
dec_ap_pending(device);
485
++c_put;
486
req->acked_jif = jiffies;
487
advance_conn_req_ack_pending(peer_device, req);
488
}
489
490
if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
491
++c_put;
492
advance_conn_req_next(peer_device, req);
493
}
494
495
if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
496
if (s & RQ_NET_SENT)
497
atomic_sub(req->i.size >> 9, &device->ap_in_flight);
498
if (s & RQ_EXP_BARR_ACK)
499
kref_put(&req->kref, drbd_req_destroy);
500
req->net_done_jif = jiffies;
501
502
/* in ahead/behind mode, or just in case,
503
* before we finally destroy this request,
504
* the caching pointers must not reference it anymore */
505
advance_conn_req_next(peer_device, req);
506
advance_conn_req_ack_pending(peer_device, req);
507
advance_conn_req_not_net_done(peer_device, req);
508
}
509
510
/* potentially complete and destroy */
511
512
/* If we made progress, retry conflicting peer requests, if any. */
513
if (req->i.waiting)
514
wake_up(&device->misc_wait);
515
516
drbd_req_put_completion_ref(req, m, c_put);
517
kref_put(&req->kref, drbd_req_destroy);
518
}
519
520
static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
521
{
522
if (!drbd_ratelimit())
523
return;
524
525
drbd_warn(device, "local %s IO error sector %llu+%u on %pg\n",
526
(req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
527
(unsigned long long)req->i.sector,
528
req->i.size >> 9,
529
device->ldev->backing_bdev);
530
}
531
532
/* Helper for HANDED_OVER_TO_NETWORK.
533
* Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
534
* Is it also still "PENDING"?
535
* --> If so, clear PENDING and set NET_OK below.
536
* If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
537
* (and we must not set RQ_NET_OK) */
538
static inline bool is_pending_write_protocol_A(struct drbd_request *req)
539
{
540
return (req->rq_state &
541
(RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
542
== (RQ_WRITE|RQ_NET_PENDING);
543
}
544
545
/* obviously this could be coded as many single functions
546
* instead of one huge switch,
547
* or by putting the code directly in the respective locations
548
* (as it has been before).
549
*
550
* but having it this way
551
* enforces that it is all in this one place, where it is easier to audit,
552
* it makes it obvious that whatever "event" "happens" to a request should
553
* happen "atomically" within the req_lock,
554
* and it enforces that we have to think in a very structured manner
555
* about the "events" that may happen to a request during its life time ...
556
*
557
*
558
* peer_device == NULL means local disk
559
*/
560
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
561
struct drbd_peer_device *peer_device,
562
struct bio_and_error *m)
563
{
564
struct drbd_device *const device = req->device;
565
struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
566
struct net_conf *nc;
567
int p, rv = 0;
568
569
if (m)
570
m->bio = NULL;
571
572
switch (what) {
573
default:
574
drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
575
break;
576
577
/* does not happen...
578
* initialization done in drbd_req_new
579
case CREATED:
580
break;
581
*/
582
583
case TO_BE_SENT: /* via network */
584
/* reached via __drbd_make_request
585
* and from w_read_retry_remote */
586
D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
587
rcu_read_lock();
588
nc = rcu_dereference(connection->net_conf);
589
p = nc->wire_protocol;
590
rcu_read_unlock();
591
req->rq_state |=
592
p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
593
p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
594
mod_rq_state(req, m, 0, RQ_NET_PENDING);
595
break;
596
597
case TO_BE_SUBMITTED: /* locally */
598
/* reached via __drbd_make_request */
599
D_ASSERT(device, !(req->rq_state & RQ_LOCAL_MASK));
600
mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
601
break;
602
603
case COMPLETED_OK:
604
if (req->rq_state & RQ_WRITE)
605
device->writ_cnt += req->i.size >> 9;
606
else
607
device->read_cnt += req->i.size >> 9;
608
609
mod_rq_state(req, m, RQ_LOCAL_PENDING,
610
RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
611
break;
612
613
case ABORT_DISK_IO:
614
mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
615
break;
616
617
case WRITE_COMPLETED_WITH_ERROR:
618
drbd_report_io_error(device, req);
619
__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
620
mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
621
break;
622
623
case READ_COMPLETED_WITH_ERROR:
624
drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);
625
drbd_report_io_error(device, req);
626
__drbd_chk_io_error(device, DRBD_READ_ERROR);
627
fallthrough;
628
case READ_AHEAD_COMPLETED_WITH_ERROR:
629
/* it is legal to fail read-ahead, no __drbd_chk_io_error in that case. */
630
mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
631
break;
632
633
case DISCARD_COMPLETED_NOTSUPP:
634
case DISCARD_COMPLETED_WITH_ERROR:
635
/* I'd rather not detach from local disk just because it
636
* failed a REQ_OP_DISCARD. */
637
mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
638
break;
639
640
case QUEUE_FOR_NET_READ:
641
/* READ, and
642
* no local disk,
643
* or target area marked as invalid,
644
* or just got an io-error. */
645
/* from __drbd_make_request
646
* or from bio_endio during read io-error recovery */
647
648
/* So we can verify the handle in the answer packet.
649
* Corresponding drbd_remove_request_interval is in
650
* drbd_req_complete() */
651
D_ASSERT(device, drbd_interval_empty(&req->i));
652
drbd_insert_interval(&device->read_requests, &req->i);
653
654
set_bit(UNPLUG_REMOTE, &device->flags);
655
656
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
657
D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
658
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
659
req->w.cb = w_send_read_req;
660
drbd_queue_work(&connection->sender_work,
661
&req->w);
662
break;
663
664
case QUEUE_FOR_NET_WRITE:
665
/* assert something? */
666
/* from __drbd_make_request only */
667
668
/* Corresponding drbd_remove_request_interval is in
669
* drbd_req_complete() */
670
D_ASSERT(device, drbd_interval_empty(&req->i));
671
drbd_insert_interval(&device->write_requests, &req->i);
672
673
/* NOTE
674
* In case the req ended up on the transfer log before being
675
* queued on the worker, it could lead to this request being
676
* missed during cleanup after connection loss.
677
* So we have to do both operations here,
678
* within the same lock that protects the transfer log.
679
*
680
* _req_add_to_epoch(req); this has to be after the
681
* _maybe_start_new_epoch(req); which happened in
682
* __drbd_make_request, because we now may set the bit
683
* again ourselves to close the current epoch.
684
*
685
* Add req to the (now) current epoch (barrier). */
686
687
/* otherwise we may lose an unplug, which may cause some remote
688
* io-scheduler timeout to expire, increasing maximum latency,
689
* hurting performance. */
690
set_bit(UNPLUG_REMOTE, &device->flags);
691
692
/* queue work item to send data */
693
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
694
mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
695
req->w.cb = w_send_dblock;
696
drbd_queue_work(&connection->sender_work,
697
&req->w);
698
699
/* close the epoch, in case it outgrew the limit */
700
rcu_read_lock();
701
nc = rcu_dereference(connection->net_conf);
702
p = nc->max_epoch_size;
703
rcu_read_unlock();
704
if (connection->current_tle_writes >= p)
705
start_new_tl_epoch(connection);
706
707
break;
708
709
case QUEUE_FOR_SEND_OOS:
710
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
711
req->w.cb = w_send_out_of_sync;
712
drbd_queue_work(&connection->sender_work,
713
&req->w);
714
break;
715
716
case READ_RETRY_REMOTE_CANCELED:
717
case SEND_CANCELED:
718
case SEND_FAILED:
719
/* real cleanup will be done from tl_clear. just update flags
720
* so it is no longer marked as on the worker queue */
721
mod_rq_state(req, m, RQ_NET_QUEUED, 0);
722
break;
723
724
case HANDED_OVER_TO_NETWORK:
725
/* assert something? */
726
if (is_pending_write_protocol_A(req))
727
/* this is what is dangerous about protocol A:
728
* pretend it was successfully written on the peer. */
729
mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
730
RQ_NET_SENT|RQ_NET_OK);
731
else
732
mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
733
/* It is still not yet RQ_NET_DONE until the
734
* corresponding epoch barrier got acked as well,
735
* so we know what to dirty on connection loss. */
736
break;
737
738
case OOS_HANDED_TO_NETWORK:
739
/* Was not set PENDING, no longer QUEUED, so is now DONE
740
* as far as this connection is concerned. */
741
mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
742
break;
743
744
case CONNECTION_LOST_WHILE_PENDING:
745
/* transfer log cleanup after connection loss */
746
mod_rq_state(req, m,
747
RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
748
RQ_NET_DONE);
749
break;
750
751
case CONFLICT_RESOLVED:
752
/* for superseded conflicting writes of multiple primaries,
753
* there is no need to keep anything in the tl, potential
754
* node crashes are covered by the activity log.
755
*
756
* If this request had been marked as RQ_POSTPONED before,
757
* it will actually not be completed, but "restarted",
758
* resubmitted from the retry worker context. */
759
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
760
D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
761
mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
762
break;
763
764
case WRITE_ACKED_BY_PEER_AND_SIS:
765
req->rq_state |= RQ_NET_SIS;
766
fallthrough;
767
case WRITE_ACKED_BY_PEER:
768
/* Normal operation protocol C: successfully written on peer.
769
* During resync, even in protocol != C,
770
* we requested an explicit write ack anyways.
771
* Which means we cannot even assert anything here.
772
* Nothing more to do here.
773
* We want to keep the tl in place for all protocols, to cater
774
* for volatile write-back caches on lower level devices. */
775
goto ack_common;
776
case RECV_ACKED_BY_PEER:
777
D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
778
/* protocol B; pretends to be successfully written on peer.
779
* see also notes above in HANDED_OVER_TO_NETWORK about
780
* protocol != C */
781
ack_common:
782
mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
783
break;
784
785
case POSTPONE_WRITE:
786
D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
787
/* If this node has already detected the write conflict, the
788
* worker will be waiting on misc_wait. Wake it up once this
789
* request has completed locally.
790
*/
791
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
792
req->rq_state |= RQ_POSTPONED;
793
if (req->i.waiting)
794
wake_up(&device->misc_wait);
795
/* Do not clear RQ_NET_PENDING. This request will make further
796
* progress via restart_conflicting_writes() or
797
* fail_postponed_requests(). Hopefully. */
798
break;
799
800
case NEG_ACKED:
801
mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
802
break;
803
804
case FAIL_FROZEN_DISK_IO:
805
if (!(req->rq_state & RQ_LOCAL_COMPLETED))
806
break;
807
mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
808
break;
809
810
case RESTART_FROZEN_DISK_IO:
811
if (!(req->rq_state & RQ_LOCAL_COMPLETED))
812
break;
813
814
mod_rq_state(req, m,
815
RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
816
RQ_LOCAL_PENDING);
817
818
rv = MR_READ;
819
if (bio_data_dir(req->master_bio) == WRITE)
820
rv = MR_WRITE;
821
822
get_ldev(device); /* always succeeds in this call path */
823
req->w.cb = w_restart_disk_io;
824
drbd_queue_work(&connection->sender_work,
825
&req->w);
826
break;
827
828
case RESEND:
829
/* Simply complete (local only) READs. */
830
if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
831
mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
832
break;
833
}
834
835
/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
836
before the connection loss (B&C only); only P_BARRIER_ACK
837
(or the local completion?) was missing when we suspended.
838
Throwing them out of the TL here by pretending we got a BARRIER_ACK.
839
During connection handshake, we ensure that the peer was not rebooted. */
840
if (!(req->rq_state & RQ_NET_OK)) {
841
/* FIXME could this possibly be a req->dw.cb == w_send_out_of_sync?
842
* in that case we must not set RQ_NET_PENDING. */
843
844
mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
845
if (req->w.cb) {
846
/* w.cb expected to be w_send_dblock, or w_send_read_req */
847
drbd_queue_work(&connection->sender_work,
848
&req->w);
849
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
850
} /* else: FIXME can this happen? */
851
break;
852
}
853
fallthrough; /* to BARRIER_ACKED */
854
855
case BARRIER_ACKED:
856
/* barrier ack for READ requests does not make sense */
857
if (!(req->rq_state & RQ_WRITE))
858
break;
859
860
if (req->rq_state & RQ_NET_PENDING) {
861
/* barrier came in before all requests were acked.
862
* this is bad, because if the connection is lost now,
863
* we won't be able to clean them up... */
864
drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
865
}
866
/* Allowed to complete requests, even while suspended.
867
* As this is called for all requests within a matching epoch,
868
* we need to filter, and only set RQ_NET_DONE for those that
869
* have actually been on the wire. */
870
mod_rq_state(req, m, RQ_COMPLETION_SUSP,
871
(req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
872
break;
873
874
case DATA_RECEIVED:
875
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
876
mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
877
break;
878
879
case QUEUE_AS_DRBD_BARRIER:
880
start_new_tl_epoch(connection);
881
mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
882
break;
883
}
884
885
return rv;
886
}
887
888
/* we may do a local read if:
889
* - we are consistent (of course),
890
* - or we are generally inconsistent,
891
* BUT we are still/already IN SYNC for this area.
892
* since size may be bigger than BM_BLOCK_SIZE,
893
* we may need to check several bits.
894
*/
895
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
896
{
897
unsigned long sbnr, ebnr;
898
sector_t esector, nr_sectors;
899
900
if (device->state.disk == D_UP_TO_DATE)
901
return true;
902
if (device->state.disk != D_INCONSISTENT)
903
return false;
904
esector = sector + (size >> 9) - 1;
905
nr_sectors = get_capacity(device->vdisk);
906
D_ASSERT(device, sector < nr_sectors);
907
D_ASSERT(device, esector < nr_sectors);
908
909
sbnr = BM_SECT_TO_BIT(sector);
910
ebnr = BM_SECT_TO_BIT(esector);
911
912
return drbd_bm_count_bits(device, sbnr, ebnr) == 0;
913
}
914
915
/*
 * Evaluate the read balancing policy @rbm for a read starting at @sector.
 * Returns true if the policy selects the remote side for this read,
 * false otherwise.
 */
static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t sector,
		enum drbd_read_balancing rbm)
{
	switch (rbm) {
	case RB_PREFER_REMOTE:
		return true;

	case RB_ROUND_ROBIN:
		/* toggles the flag and reports its previous value */
		return test_and_change_bit(READ_BALANCE_RR, &device->flags);

	case RB_LEAST_PENDING: {
		int local_pending = atomic_read(&device->local_cnt);
		int remote_pending = atomic_read(&device->ap_pending_cnt) +
				     atomic_read(&device->rs_pending_cnt);

		return local_pending > remote_pending;
	}

	case RB_32K_STRIPING:	/* stripe_shift = 15 */
	case RB_64K_STRIPING:
	case RB_128K_STRIPING:
	case RB_256K_STRIPING:
	case RB_512K_STRIPING:
	case RB_1M_STRIPING: {	/* stripe_shift = 20 */
		int stripe_shift = (rbm - RB_32K_STRIPING + 15);

		/* alternate between the two sides from stripe to stripe;
		 * sectors are 2^9 bytes, hence the "- 9" */
		return (sector >> (stripe_shift - 9)) & 1;
	}

	case RB_CONGESTED_REMOTE:
	case RB_PREFER_LOCAL:
	default:
		return false;
	}
}
943
944
/*
945
* complete_conflicting_writes - wait for any conflicting write requests
946
*
947
* The write_requests tree contains all active write requests which we
948
* currently know about. Wait for any requests to complete which conflict with
949
* the new one.
950
*
951
* Only way out: remove the conflicting intervals from the tree.
952
*/
953
static void complete_conflicting_writes(struct drbd_request *req)
954
{
955
DEFINE_WAIT(wait);
956
struct drbd_device *device = req->device;
957
struct drbd_interval *i;
958
sector_t sector = req->i.sector;
959
int size = req->i.size;
960
961
for (;;) {
962
drbd_for_each_overlap(i, &device->write_requests, sector, size) {
963
/* Ignore, if already completed to upper layers. */
964
if (i->completed)
965
continue;
966
/* Handle the first found overlap. After the schedule
967
* we have to restart the tree walk. */
968
break;
969
}
970
if (!i) /* if any */
971
break;
972
973
/* Indicate to wake up device->misc_wait on progress. */
974
prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
975
i->waiting = true;
976
spin_unlock_irq(&device->resource->req_lock);
977
schedule();
978
spin_lock_irq(&device->resource->req_lock);
979
}
980
finish_wait(&device->misc_wait, &wait);
981
}
982
983
/* called within req_lock */
984
static void maybe_pull_ahead(struct drbd_device *device)
985
{
986
struct drbd_connection *connection = first_peer_device(device)->connection;
987
struct net_conf *nc;
988
bool congested = false;
989
enum drbd_on_congestion on_congestion;
990
991
rcu_read_lock();
992
nc = rcu_dereference(connection->net_conf);
993
on_congestion = nc ? nc->on_congestion : OC_BLOCK;
994
rcu_read_unlock();
995
if (on_congestion == OC_BLOCK ||
996
connection->agreed_pro_version < 96)
997
return;
998
999
if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
1000
return; /* nothing to do ... */
1001
1002
/* If I don't even have good local storage, we can not reasonably try
1003
* to pull ahead of the peer. We also need the local reference to make
1004
* sure device->act_log is there.
1005
*/
1006
if (!get_ldev_if_state(device, D_UP_TO_DATE))
1007
return;
1008
1009
if (nc->cong_fill &&
1010
atomic_read(&device->ap_in_flight) >= nc->cong_fill) {
1011
drbd_info(device, "Congestion-fill threshold reached\n");
1012
congested = true;
1013
}
1014
1015
if (device->act_log->used >= nc->cong_extents) {
1016
drbd_info(device, "Congestion-extents threshold reached\n");
1017
congested = true;
1018
}
1019
1020
if (congested) {
1021
/* start a new epoch for non-mirrored writes */
1022
start_new_tl_epoch(first_peer_device(device)->connection);
1023
1024
if (on_congestion == OC_PULL_AHEAD)
1025
_drbd_set_state(_NS(device, conn, C_AHEAD), 0, NULL);
1026
else /*nc->on_congestion == OC_DISCONNECT */
1027
_drbd_set_state(_NS(device, conn, C_DISCONNECTING), 0, NULL);
1028
}
1029
put_ldev(device);
1030
}
1031
1032
/* If this returns false, and req->private_bio is still set,
1033
* this should be submitted locally.
1034
*
1035
* If it returns false, but req->private_bio is not set,
1036
* we do not have access to good data :(
1037
*
1038
* Otherwise, this destroys req->private_bio, if any,
1039
* and returns true.
1040
*/
1041
static bool do_remote_read(struct drbd_request *req)
1042
{
1043
struct drbd_device *device = req->device;
1044
enum drbd_read_balancing rbm;
1045
1046
if (req->private_bio) {
1047
if (!drbd_may_do_local_read(device,
1048
req->i.sector, req->i.size)) {
1049
bio_put(req->private_bio);
1050
req->private_bio = NULL;
1051
put_ldev(device);
1052
}
1053
}
1054
1055
if (device->state.pdsk != D_UP_TO_DATE)
1056
return false;
1057
1058
if (req->private_bio == NULL)
1059
return true;
1060
1061
/* TODO: improve read balancing decisions, take into account drbd
1062
* protocol, pending requests etc. */
1063
1064
rcu_read_lock();
1065
rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
1066
rcu_read_unlock();
1067
1068
if (rbm == RB_PREFER_LOCAL && req->private_bio)
1069
return false; /* submit locally */
1070
1071
if (remote_due_to_read_balancing(device, req->i.sector, rbm)) {
1072
if (req->private_bio) {
1073
bio_put(req->private_bio);
1074
req->private_bio = NULL;
1075
put_ldev(device);
1076
}
1077
return true;
1078
}
1079
1080
return false;
1081
}
1082
1083
bool drbd_should_do_remote(union drbd_dev_state s)
1084
{
1085
return s.pdsk == D_UP_TO_DATE ||
1086
(s.pdsk >= D_INCONSISTENT &&
1087
s.conn >= C_WF_BITMAP_T &&
1088
s.conn < C_AHEAD);
1089
/* Before proto 96 that was >= CONNECTED instead of >= C_WF_BITMAP_T.
1090
That is equivalent since before 96 IO was frozen in the C_WF_BITMAP*
1091
states. */
1092
}
1093
1094
static bool drbd_should_send_out_of_sync(union drbd_dev_state s)
1095
{
1096
return s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S;
1097
/* pdsk = D_INCONSISTENT as a consequence. Protocol 96 check not necessary
1098
since we enter state C_AHEAD only if proto >= 96 */
1099
}
1100
1101
/* returns number of connections (== 1, for drbd 8.4)
1102
* expected to actually write this data,
1103
* which does NOT include those that we are L_AHEAD for. */
1104
static int drbd_process_write_request(struct drbd_request *req)
1105
{
1106
struct drbd_device *device = req->device;
1107
struct drbd_peer_device *peer_device = first_peer_device(device);
1108
int remote, send_oos;
1109
1110
remote = drbd_should_do_remote(device->state);
1111
send_oos = drbd_should_send_out_of_sync(device->state);
1112
1113
/* Need to replicate writes. Unless it is an empty flush,
1114
* which is better mapped to a DRBD P_BARRIER packet,
1115
* also for drbd wire protocol compatibility reasons.
1116
* If this was a flush, just start a new epoch.
1117
* Unless the current epoch was empty anyways, or we are not currently
1118
* replicating, in which case there is no point. */
1119
if (unlikely(req->i.size == 0)) {
1120
/* The only size==0 bios we expect are empty flushes. */
1121
D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
1122
if (remote)
1123
_req_mod(req, QUEUE_AS_DRBD_BARRIER, peer_device);
1124
return remote;
1125
}
1126
1127
if (!remote && !send_oos)
1128
return 0;
1129
1130
D_ASSERT(device, !(remote && send_oos));
1131
1132
if (remote) {
1133
_req_mod(req, TO_BE_SENT, peer_device);
1134
_req_mod(req, QUEUE_FOR_NET_WRITE, peer_device);
1135
} else if (drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size))
1136
_req_mod(req, QUEUE_FOR_SEND_OOS, peer_device);
1137
1138
return remote;
1139
}
1140
1141
static void drbd_process_discard_or_zeroes_req(struct drbd_request *req, int flags)
1142
{
1143
int err = drbd_issue_discard_or_zero_out(req->device,
1144
req->i.sector, req->i.size >> 9, flags);
1145
if (err)
1146
req->private_bio->bi_status = BLK_STS_IOERR;
1147
bio_endio(req->private_bio);
1148
}
1149
1150
static void
1151
drbd_submit_req_private_bio(struct drbd_request *req)
1152
{
1153
struct drbd_device *device = req->device;
1154
struct bio *bio = req->private_bio;
1155
unsigned int type;
1156
1157
if (bio_op(bio) != REQ_OP_READ)
1158
type = DRBD_FAULT_DT_WR;
1159
else if (bio->bi_opf & REQ_RAHEAD)
1160
type = DRBD_FAULT_DT_RA;
1161
else
1162
type = DRBD_FAULT_DT_RD;
1163
1164
/* State may have changed since we grabbed our reference on the
1165
* ->ldev member. Double check, and short-circuit to endio.
1166
* In case the last activity log transaction failed to get on
1167
* stable storage, and this is a WRITE, we may not even submit
1168
* this bio. */
1169
if (get_ldev(device)) {
1170
if (drbd_insert_fault(device, type))
1171
bio_io_error(bio);
1172
else if (bio_op(bio) == REQ_OP_WRITE_ZEROES)
1173
drbd_process_discard_or_zeroes_req(req, EE_ZEROOUT |
1174
((bio->bi_opf & REQ_NOUNMAP) ? 0 : EE_TRIM));
1175
else if (bio_op(bio) == REQ_OP_DISCARD)
1176
drbd_process_discard_or_zeroes_req(req, EE_TRIM);
1177
else
1178
submit_bio_noacct(bio);
1179
put_ldev(device);
1180
} else
1181
bio_io_error(bio);
1182
}
1183
1184
static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
1185
{
1186
spin_lock_irq(&device->resource->req_lock);
1187
list_add_tail(&req->tl_requests, &device->submit.writes);
1188
list_add_tail(&req->req_pending_master_completion,
1189
&device->pending_master_completion[1 /* WRITE */]);
1190
spin_unlock_irq(&device->resource->req_lock);
1191
queue_work(device->submit.wq, &device->submit.worker);
1192
/* do_submit() may sleep internally on al_wait, too */
1193
wake_up(&device->al_wait);
1194
}
1195
1196
/* returns the new drbd_request pointer, if the caller is expected to
1197
* drbd_send_and_submit() it (to save latency), or NULL if we queued the
1198
* request on the submitter thread.
1199
* Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
1200
*/
1201
static struct drbd_request *
1202
drbd_request_prepare(struct drbd_device *device, struct bio *bio)
1203
{
1204
const int rw = bio_data_dir(bio);
1205
struct drbd_request *req;
1206
1207
/* allocate outside of all locks; */
1208
req = drbd_req_new(device, bio);
1209
if (!req) {
1210
dec_ap_bio(device);
1211
/* only pass the error to the upper layers.
1212
* if user cannot handle io errors, that's not our business. */
1213
drbd_err(device, "could not kmalloc() req\n");
1214
bio->bi_status = BLK_STS_RESOURCE;
1215
bio_endio(bio);
1216
return ERR_PTR(-ENOMEM);
1217
}
1218
1219
/* Update disk stats */
1220
req->start_jif = bio_start_io_acct(req->master_bio);
1221
1222
if (get_ldev(device)) {
1223
req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1224
bio, GFP_NOIO,
1225
&drbd_io_bio_set);
1226
req->private_bio->bi_private = req;
1227
req->private_bio->bi_end_io = drbd_request_endio;
1228
}
1229
1230
/* process discards always from our submitter thread */
1231
if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
1232
bio_op(bio) == REQ_OP_DISCARD)
1233
goto queue_for_submitter_thread;
1234
1235
if (rw == WRITE && req->private_bio && req->i.size
1236
&& !test_bit(AL_SUSPENDED, &device->flags)) {
1237
if (!drbd_al_begin_io_fastpath(device, &req->i))
1238
goto queue_for_submitter_thread;
1239
req->rq_state |= RQ_IN_ACT_LOG;
1240
req->in_actlog_jif = jiffies;
1241
}
1242
return req;
1243
1244
queue_for_submitter_thread:
1245
atomic_inc(&device->ap_actlog_cnt);
1246
drbd_queue_write(device, req);
1247
return NULL;
1248
}
1249
1250
/* Require at least one path to current data.
1251
* We don't want to allow writes on C_STANDALONE D_INCONSISTENT:
1252
* We would not allow to read what was written,
1253
* we would not have bumped the data generation uuids,
1254
* we would cause data divergence for all the wrong reasons.
1255
*
1256
* If we don't see at least one D_UP_TO_DATE, we will fail this request,
1257
* which either returns EIO, or, if OND_SUSPEND_IO is set, suspends IO,
1258
* and queues for retry later.
1259
*/
1260
static bool may_do_writes(struct drbd_device *device)
1261
{
1262
const union drbd_dev_state s = device->state;
1263
return s.disk == D_UP_TO_DATE || s.pdsk == D_UP_TO_DATE;
1264
}
1265
1266
struct drbd_plug_cb {
1267
struct blk_plug_cb cb;
1268
struct drbd_request *most_recent_req;
1269
/* do we need more? */
1270
};
1271
1272
/*
 * Unplug callback registered via blk_check_plugged().
 * Frees the callback structure; if a request was remembered in it, flags
 * that request with RQ_UNPLUG, queues a generic unplug for its device and
 * drops the kref taken in drbd_update_plug().
 */
static void drbd_unplug(struct blk_plug_cb *cb, bool from_schedule)
{
	struct drbd_plug_cb *plug_cb = container_of(cb, struct drbd_plug_cb, cb);
	struct drbd_request *last_req = plug_cb->most_recent_req;
	struct drbd_resource *resource = plug_cb->cb.data;

	/* everything we need was copied out above */
	kfree(cb);
	if (last_req == NULL)
		return;

	spin_lock_irq(&resource->req_lock);
	/* In case the sender did not process it yet, raise the flag to
	 * have it followed with P_UNPLUG_REMOTE just after. */
	last_req->rq_state |= RQ_UNPLUG;
	/* but also queue a generic unplug */
	drbd_queue_unplug(last_req->device);
	kref_put(&last_req->kref, drbd_req_destroy);
	spin_unlock_irq(&resource->req_lock);
}
1291
1292
static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
1293
{
1294
/* A lot of text to say
1295
* return (struct drbd_plug_cb*)blk_check_plugged(); */
1296
struct drbd_plug_cb *plug;
1297
struct blk_plug_cb *cb = blk_check_plugged(drbd_unplug, resource, sizeof(*plug));
1298
1299
if (cb)
1300
plug = container_of(cb, struct drbd_plug_cb, cb);
1301
else
1302
plug = NULL;
1303
return plug;
1304
}
1305
1306
static void drbd_update_plug(struct drbd_plug_cb *plug, struct drbd_request *req)
1307
{
1308
struct drbd_request *tmp = plug->most_recent_req;
1309
/* Will be sent to some peer.
1310
* Remember to tag it with UNPLUG_REMOTE on unplug */
1311
kref_get(&req->kref);
1312
plug->most_recent_req = req;
1313
if (tmp)
1314
kref_put(&tmp->kref, drbd_req_destroy);
1315
}
1316
1317
static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
1318
{
1319
struct drbd_resource *resource = device->resource;
1320
struct drbd_peer_device *peer_device = first_peer_device(device);
1321
const int rw = bio_data_dir(req->master_bio);
1322
struct bio_and_error m = { NULL, };
1323
bool no_remote = false;
1324
bool submit_private_bio = false;
1325
1326
spin_lock_irq(&resource->req_lock);
1327
if (rw == WRITE) {
1328
/* This may temporarily give up the req_lock,
1329
* but will re-aquire it before it returns here.
1330
* Needs to be before the check on drbd_suspended() */
1331
complete_conflicting_writes(req);
1332
/* no more giving up req_lock from now on! */
1333
1334
/* check for congestion, and potentially stop sending
1335
* full data updates, but start sending "dirty bits" only. */
1336
maybe_pull_ahead(device);
1337
}
1338
1339
1340
if (drbd_suspended(device)) {
1341
/* push back and retry: */
1342
req->rq_state |= RQ_POSTPONED;
1343
if (req->private_bio) {
1344
bio_put(req->private_bio);
1345
req->private_bio = NULL;
1346
put_ldev(device);
1347
}
1348
goto out;
1349
}
1350
1351
/* We fail READ early, if we can not serve it.
1352
* We must do this before req is registered on any lists.
1353
* Otherwise, drbd_req_complete() will queue failed READ for retry. */
1354
if (rw != WRITE) {
1355
if (!do_remote_read(req) && !req->private_bio)
1356
goto nodata;
1357
}
1358
1359
/* which transfer log epoch does this belong to? */
1360
req->epoch = atomic_read(&first_peer_device(device)->connection->current_tle_nr);
1361
1362
/* no point in adding empty flushes to the transfer log,
1363
* they are mapped to drbd barriers already. */
1364
if (likely(req->i.size!=0)) {
1365
if (rw == WRITE)
1366
first_peer_device(device)->connection->current_tle_writes++;
1367
1368
list_add_tail(&req->tl_requests, &first_peer_device(device)->connection->transfer_log);
1369
}
1370
1371
if (rw == WRITE) {
1372
if (req->private_bio && !may_do_writes(device)) {
1373
bio_put(req->private_bio);
1374
req->private_bio = NULL;
1375
put_ldev(device);
1376
goto nodata;
1377
}
1378
if (!drbd_process_write_request(req))
1379
no_remote = true;
1380
} else {
1381
/* We either have a private_bio, or we can read from remote.
1382
* Otherwise we had done the goto nodata above. */
1383
if (req->private_bio == NULL) {
1384
_req_mod(req, TO_BE_SENT, peer_device);
1385
_req_mod(req, QUEUE_FOR_NET_READ, peer_device);
1386
} else
1387
no_remote = true;
1388
}
1389
1390
if (no_remote == false) {
1391
struct drbd_plug_cb *plug = drbd_check_plugged(resource);
1392
if (plug)
1393
drbd_update_plug(plug, req);
1394
}
1395
1396
/* If it took the fast path in drbd_request_prepare, add it here.
1397
* The slow path has added it already. */
1398
if (list_empty(&req->req_pending_master_completion))
1399
list_add_tail(&req->req_pending_master_completion,
1400
&device->pending_master_completion[rw == WRITE]);
1401
if (req->private_bio) {
1402
/* needs to be marked within the same spinlock */
1403
req->pre_submit_jif = jiffies;
1404
list_add_tail(&req->req_pending_local,
1405
&device->pending_completion[rw == WRITE]);
1406
_req_mod(req, TO_BE_SUBMITTED, NULL);
1407
/* but we need to give up the spinlock to submit */
1408
submit_private_bio = true;
1409
} else if (no_remote) {
1410
nodata:
1411
if (drbd_ratelimit())
1412
drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
1413
(unsigned long long)req->i.sector, req->i.size >> 9);
1414
/* A write may have been queued for send_oos, however.
1415
* So we can not simply free it, we must go through drbd_req_put_completion_ref() */
1416
}
1417
1418
out:
1419
drbd_req_put_completion_ref(req, &m, 1);
1420
spin_unlock_irq(&resource->req_lock);
1421
1422
/* Even though above is a kref_put(), this is safe.
1423
* As long as we still need to submit our private bio,
1424
* we hold a completion ref, and the request cannot disappear.
1425
* If however this request did not even have a private bio to submit
1426
* (e.g. remote read), req may already be invalid now.
1427
* That's why we cannot check on req->private_bio. */
1428
if (submit_private_bio)
1429
drbd_submit_req_private_bio(req);
1430
if (m.bio)
1431
complete_master_bio(device, &m);
1432
}
1433
1434
/*
 * Prepare a request for @bio and, unless preparation failed or queued it
 * for the submitter thread, send and submit it right away.
 */
void __drbd_make_request(struct drbd_device *device, struct bio *bio)
{
	struct drbd_request *req;

	req = drbd_request_prepare(device, bio);
	if (!IS_ERR_OR_NULL(req))
		drbd_send_and_submit(device, req);
}
1441
1442
static void submit_fast_path(struct drbd_device *device, struct list_head *incoming)
1443
{
1444
struct blk_plug plug;
1445
struct drbd_request *req, *tmp;
1446
1447
blk_start_plug(&plug);
1448
list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
1449
const int rw = bio_data_dir(req->master_bio);
1450
1451
if (rw == WRITE /* rw != WRITE should not even end up here! */
1452
&& req->private_bio && req->i.size
1453
&& !test_bit(AL_SUSPENDED, &device->flags)) {
1454
if (!drbd_al_begin_io_fastpath(device, &req->i))
1455
continue;
1456
1457
req->rq_state |= RQ_IN_ACT_LOG;
1458
req->in_actlog_jif = jiffies;
1459
atomic_dec(&device->ap_actlog_cnt);
1460
}
1461
1462
list_del_init(&req->tl_requests);
1463
drbd_send_and_submit(device, req);
1464
}
1465
blk_finish_plug(&plug);
1466
}
1467
1468
static bool prepare_al_transaction_nonblock(struct drbd_device *device,
1469
struct list_head *incoming,
1470
struct list_head *pending,
1471
struct list_head *later)
1472
{
1473
struct drbd_request *req;
1474
int wake = 0;
1475
int err;
1476
1477
spin_lock_irq(&device->al_lock);
1478
while ((req = list_first_entry_or_null(incoming, struct drbd_request, tl_requests))) {
1479
err = drbd_al_begin_io_nonblock(device, &req->i);
1480
if (err == -ENOBUFS)
1481
break;
1482
if (err == -EBUSY)
1483
wake = 1;
1484
if (err)
1485
list_move_tail(&req->tl_requests, later);
1486
else
1487
list_move_tail(&req->tl_requests, pending);
1488
}
1489
spin_unlock_irq(&device->al_lock);
1490
if (wake)
1491
wake_up(&device->al_wait);
1492
return !list_empty(pending);
1493
}
1494
1495
static void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
1496
{
1497
struct blk_plug plug;
1498
struct drbd_request *req;
1499
1500
blk_start_plug(&plug);
1501
while ((req = list_first_entry_or_null(pending, struct drbd_request, tl_requests))) {
1502
req->rq_state |= RQ_IN_ACT_LOG;
1503
req->in_actlog_jif = jiffies;
1504
atomic_dec(&device->ap_actlog_cnt);
1505
list_del_init(&req->tl_requests);
1506
drbd_send_and_submit(device, req);
1507
}
1508
blk_finish_plug(&plug);
1509
}
1510
1511
void do_submit(struct work_struct *ws)
1512
{
1513
struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
1514
LIST_HEAD(incoming); /* from drbd_make_request() */
1515
LIST_HEAD(pending); /* to be submitted after next AL-transaction commit */
1516
LIST_HEAD(busy); /* blocked by resync requests */
1517
1518
/* grab new incoming requests */
1519
spin_lock_irq(&device->resource->req_lock);
1520
list_splice_tail_init(&device->submit.writes, &incoming);
1521
spin_unlock_irq(&device->resource->req_lock);
1522
1523
for (;;) {
1524
DEFINE_WAIT(wait);
1525
1526
/* move used-to-be-busy back to front of incoming */
1527
list_splice_init(&busy, &incoming);
1528
submit_fast_path(device, &incoming);
1529
if (list_empty(&incoming))
1530
break;
1531
1532
for (;;) {
1533
prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
1534
1535
list_splice_init(&busy, &incoming);
1536
prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
1537
if (!list_empty(&pending))
1538
break;
1539
1540
schedule();
1541
1542
/* If all currently "hot" activity log extents are kept busy by
1543
* incoming requests, we still must not totally starve new
1544
* requests to "cold" extents.
1545
* Something left on &incoming means there had not been
1546
* enough update slots available, and the activity log
1547
* has been marked as "starving".
1548
*
1549
* Try again now, without looking for new requests,
1550
* effectively blocking all new requests until we made
1551
* at least _some_ progress with what we currently have.
1552
*/
1553
if (!list_empty(&incoming))
1554
continue;
1555
1556
/* Nothing moved to pending, but nothing left
1557
* on incoming: all moved to busy!
1558
* Grab new and iterate. */
1559
spin_lock_irq(&device->resource->req_lock);
1560
list_splice_tail_init(&device->submit.writes, &incoming);
1561
spin_unlock_irq(&device->resource->req_lock);
1562
}
1563
finish_wait(&device->al_wait, &wait);
1564
1565
/* If the transaction was full, before all incoming requests
1566
* had been processed, skip ahead to commit, and iterate
1567
* without splicing in more incoming requests from upper layers.
1568
*
1569
* Else, if all incoming have been processed,
1570
* they have become either "pending" (to be submitted after
1571
* next transaction commit) or "busy" (blocked by resync).
1572
*
1573
* Maybe more was queued, while we prepared the transaction?
1574
* Try to stuff those into this transaction as well.
1575
* Be strictly non-blocking here,
1576
* we already have something to commit.
1577
*
1578
* Commit if we don't make any more progres.
1579
*/
1580
1581
while (list_empty(&incoming)) {
1582
LIST_HEAD(more_pending);
1583
LIST_HEAD(more_incoming);
1584
bool made_progress;
1585
1586
/* It is ok to look outside the lock,
1587
* it's only an optimization anyways */
1588
if (list_empty(&device->submit.writes))
1589
break;
1590
1591
spin_lock_irq(&device->resource->req_lock);
1592
list_splice_tail_init(&device->submit.writes, &more_incoming);
1593
spin_unlock_irq(&device->resource->req_lock);
1594
1595
if (list_empty(&more_incoming))
1596
break;
1597
1598
made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);
1599
1600
list_splice_tail_init(&more_pending, &pending);
1601
list_splice_tail_init(&more_incoming, &incoming);
1602
if (!made_progress)
1603
break;
1604
}
1605
1606
drbd_al_begin_io_commit(device);
1607
send_and_submit_pending(device, &pending);
1608
}
1609
}
1610
1611
void drbd_submit_bio(struct bio *bio)
1612
{
1613
struct drbd_device *device = bio->bi_bdev->bd_disk->private_data;
1614
1615
bio = bio_split_to_limits(bio);
1616
if (!bio)
1617
return;
1618
1619
/*
1620
* what we "blindly" assume:
1621
*/
1622
D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));
1623
1624
inc_ap_bio(device);
1625
__drbd_make_request(device, bio);
1626
}
1627
1628
static bool net_timeout_reached(struct drbd_request *net_req,
1629
struct drbd_connection *connection,
1630
unsigned long now, unsigned long ent,
1631
unsigned int ko_count, unsigned int timeout)
1632
{
1633
struct drbd_device *device = net_req->device;
1634
1635
if (!time_after(now, net_req->pre_send_jif + ent))
1636
return false;
1637
1638
if (time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent))
1639
return false;
1640
1641
if (net_req->rq_state & RQ_NET_PENDING) {
1642
drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
1643
jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
1644
return true;
1645
}
1646
1647
/* We received an ACK already (or are using protocol A),
1648
* but are waiting for the epoch closing barrier ack.
1649
* Check if we sent the barrier already. We should not blame the peer
1650
* for being unresponsive, if we did not even ask it yet. */
1651
if (net_req->epoch == connection->send.current_epoch_nr) {
1652
drbd_warn(device,
1653
"We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
1654
jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
1655
return false;
1656
}
1657
1658
/* Worst case: we may have been blocked for whatever reason, then
1659
* suddenly are able to send a lot of requests (and epoch separating
1660
* barriers) in quick succession.
1661
* The timestamp of the net_req may be much too old and not correspond
1662
* to the sending time of the relevant unack'ed barrier packet, so
1663
* would trigger a spurious timeout. The latest barrier packet may
1664
* have a too recent timestamp to trigger the timeout, potentially miss
1665
* a timeout. Right now we don't have a place to conveniently store
1666
* these timestamps.
1667
* But in this particular situation, the application requests are still
1668
* completed to upper layers, DRBD should still "feel" responsive.
1669
* No need yet to kill this connection, it may still recover.
1670
* If not, eventually we will have queued enough into the network for
1671
* us to block. From that point of view, the timestamp of the last sent
1672
* barrier packet is relevant enough.
1673
*/
1674
if (time_after(now, connection->send.last_sent_barrier_jif + ent)) {
1675
drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
1676
connection->send.last_sent_barrier_jif, now,
1677
jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
1678
return true;
1679
}
1680
return false;
1681
}
1682
1683
/* A request is considered timed out, if
1684
* - we have some effective timeout from the configuration,
1685
* with some state restrictions applied,
1686
* - the oldest request is waiting for a response from the network
1687
* resp. the local disk,
1688
* - the oldest request is in fact older than the effective timeout,
1689
* - the connection was established (resp. disk was attached)
1690
* for longer than the timeout already.
1691
* Note that for 32bit jiffies and very stable connections/disks,
1692
* we may have a wrap around, which is catched by
1693
* !time_in_range(now, last_..._jif, last_..._jif + timeout).
1694
*
1695
* Side effect: once per 32bit wrap-around interval, which means every
1696
* ~198 days with 250 HZ, we have a window where the timeout would need
1697
* to expire twice (worst case) to become effective. Good enough.
1698
*/
1699
1700
void request_timer_fn(struct timer_list *t)
1701
{
1702
struct drbd_device *device = timer_container_of(device, t,
1703
request_timer);
1704
struct drbd_connection *connection = first_peer_device(device)->connection;
1705
struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
1706
struct net_conf *nc;
1707
unsigned long oldest_submit_jif;
1708
unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1709
unsigned long now;
1710
unsigned int ko_count = 0, timeout = 0;
1711
1712
rcu_read_lock();
1713
nc = rcu_dereference(connection->net_conf);
1714
if (nc && device->state.conn >= C_WF_REPORT_PARAMS) {
1715
ko_count = nc->ko_count;
1716
timeout = nc->timeout;
1717
}
1718
1719
if (get_ldev(device)) { /* implicit state.disk >= D_INCONSISTENT */
1720
dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
1721
put_ldev(device);
1722
}
1723
rcu_read_unlock();
1724
1725
1726
ent = timeout * HZ/10 * ko_count;
1727
et = min_not_zero(dt, ent);
1728
1729
if (!et)
1730
return; /* Recurring timer stopped */
1731
1732
now = jiffies;
1733
nt = now + et;
1734
1735
spin_lock_irq(&device->resource->req_lock);
1736
req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
1737
req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
1738
1739
/* maybe the oldest request waiting for the peer is in fact still
1740
* blocking in tcp sendmsg. That's ok, though, that's handled via the
1741
* socket send timeout, requesting a ping, and bumping ko-count in
1742
* we_should_drop_the_connection().
1743
*/
1744
1745
/* check the oldest request we did successfully sent,
1746
* but which is still waiting for an ACK. */
1747
req_peer = connection->req_ack_pending;
1748
1749
/* if we don't have such request (e.g. protocoll A)
1750
* check the oldest requests which is still waiting on its epoch
1751
* closing barrier ack. */
1752
if (!req_peer)
1753
req_peer = connection->req_not_net_done;
1754
1755
/* evaluate the oldest peer request only in one timer! */
1756
if (req_peer && req_peer->device != device)
1757
req_peer = NULL;
1758
1759
/* do we have something to evaluate? */
1760
if (req_peer == NULL && req_write == NULL && req_read == NULL)
1761
goto out;
1762
1763
oldest_submit_jif =
1764
(req_write && req_read)
1765
? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
1766
? req_write->pre_submit_jif : req_read->pre_submit_jif )
1767
: req_write ? req_write->pre_submit_jif
1768
: req_read ? req_read->pre_submit_jif : now;
1769
1770
if (ent && req_peer && net_timeout_reached(req_peer, connection, now, ent, ko_count, timeout))
1771
_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_VERBOSE | CS_HARD);
1772
1773
if (dt && oldest_submit_jif != now &&
1774
time_after(now, oldest_submit_jif + dt) &&
1775
!time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
1776
drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
1777
__drbd_chk_io_error(device, DRBD_FORCE_DETACH);
1778
}
1779
1780
/* Reschedule timer for the nearest not already expired timeout.
1781
* Fallback to now + min(effective network timeout, disk timeout). */
1782
ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
1783
? req_peer->pre_send_jif + ent : now + et;
1784
dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
1785
? oldest_submit_jif + dt : now + et;
1786
nt = time_before(ent, dt) ? ent : dt;
1787
out:
1788
spin_unlock_irq(&device->resource->req_lock);
1789
mod_timer(&device->request_timer, nt);
1790
}
1791
1792