/*
   drbd_req.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING. If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>

#include <linux/slab.h>
#include <linux/drbd.h>
#include "drbd_int.h"
#include "drbd_req.h"


/* Update disk stats at start of I/O request */
static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
{
	const int rw = bio_data_dir(bio);
	int cpu;
	cpu = part_stat_lock();
	part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
	part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
	part_inc_in_flight(&mdev->vdisk->part0, rw);
	part_stat_unlock();
}

/* Update disk stats when completing request upwards */
static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
{
	int rw = bio_data_dir(req->master_bio);
	unsigned long duration = jiffies - req->start_time;
	int cpu;
	cpu = part_stat_lock();
	part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
	part_round_stats(cpu, &mdev->vdisk->part0);
	part_dec_in_flight(&mdev->vdisk->part0, rw);
	part_stat_unlock();
}

static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const int rw)
{
	const unsigned long s = req->rq_state;

	/* remove it from the transfer log.
	 * well, only if it had been there in the first
	 * place... if it had not (local only or conflicting
	 * and never sent), it should still be "empty" as
	 * initialized in drbd_req_new(), so we can list_del() it
	 * here unconditionally */
	list_del(&req->tl_requests);

	/* if it was a write, we may have to set the corresponding
	 * bit(s) out-of-sync first. If it had a local part, we need to
	 * release the reference to the activity log. */
	if (rw == WRITE) {
		/* Set out-of-sync unless both OK flags are set
		 * (local only or remote failed).
		 * Other places where we set out-of-sync:
		 * READ with local io-error */
		if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
			drbd_set_out_of_sync(mdev, req->sector, req->size);

		if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
			drbd_set_in_sync(mdev, req->sector, req->size);

		/* one might be tempted to move the drbd_al_complete_io
		 * to the local io completion callback drbd_endio_pri.
		 * but, if this was a mirror write, we may only
		 * drbd_al_complete_io after this is RQ_NET_DONE,
		 * otherwise the extent could be dropped from the al
		 * before it has actually been written on the peer.
		 * if we crash before our peer knows about the request,
		 * but after the extent has been dropped from the al,
		 * we would forget to resync the corresponding extent.
		 */
		if (s & RQ_LOCAL_MASK) {
			if (get_ldev_if_state(mdev, D_FAILED)) {
				if (s & RQ_IN_ACT_LOG)
					drbd_al_complete_io(mdev, req->sector);
				put_ldev(mdev);
			} else if (__ratelimit(&drbd_ratelimit_state)) {
				dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu), "
					"but my Disk seems to have failed :(\n",
					(unsigned long long) req->sector);
			}
		}
	}

	drbd_req_free(req);
}

static void queue_barrier(struct drbd_conf *mdev)
{
	struct drbd_tl_epoch *b;

	/* We are within the req_lock. Once we queued the barrier for sending,
	 * we set the CREATE_BARRIER bit. It is cleared as soon as a new
	 * barrier/epoch object is added. This is the only place this bit is
	 * set. It indicates that the barrier for this epoch is already queued,
	 * and no new epoch has been created yet. */
	if (test_bit(CREATE_BARRIER, &mdev->flags))
		return;

	b = mdev->newest_tle;
	b->w.cb = w_send_barrier;
	/* inc_ap_pending done here, so we won't
	 * get imbalanced on connection loss.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in tl_clear. */
	inc_ap_pending(mdev);
	drbd_queue_work(&mdev->data.work, &b->w);
	set_bit(CREATE_BARRIER, &mdev->flags);
}

static void _about_to_complete_local_write(struct drbd_conf *mdev,
	struct drbd_request *req)
{
	const unsigned long s = req->rq_state;
	struct drbd_request *i;
	struct drbd_epoch_entry *e;
	struct hlist_node *n;
	struct hlist_head *slot;

	/* Before we can signal completion to the upper layers,
	 * we may need to close the current epoch.
	 * We can skip this, if this request has not even been sent, because we
	 * did not have a fully established connection yet/anymore, during
	 * bitmap exchange, or while we are C_AHEAD due to congestion policy.
	 */
	if (mdev->state.conn >= C_CONNECTED &&
	    (s & RQ_NET_SENT) != 0 &&
	    req->epoch == mdev->newest_tle->br_number)
		queue_barrier(mdev);

	/* we need to do the conflict detection stuff,
	 * if we have the ee_hash (two_primaries) and
	 * this has been on the network */
	if ((s & RQ_NET_DONE) && mdev->ee_hash != NULL) {
		const sector_t sector = req->sector;
		const int size = req->size;

		/* ASSERT:
		 * there must be no conflicting requests, since
		 * they must have been failed on the spot */
#define OVERLAPS overlaps(sector, size, i->sector, i->size)
		slot = tl_hash_slot(mdev, sector);
		hlist_for_each_entry(i, n, slot, collision) {
			if (OVERLAPS) {
				dev_alert(DEV, "LOGIC BUG: completed: %p %llus +%u; "
					"other: %p %llus +%u\n",
					req, (unsigned long long)sector, size,
					i, (unsigned long long)i->sector, i->size);
			}
		}

		/* maybe "wake" those conflicting epoch entries
		 * that wait for this request to finish.
		 *
		 * currently, there can be only _one_ such ee
		 * (well, or some more, which would be pending
		 * P_DISCARD_ACK not yet sent by the asender...),
		 * since we block the receiver thread upon the
		 * first conflict detection, which will wait on
		 * misc_wait. maybe we want to assert that?
		 *
		 * anyways, if we found one,
		 * we just have to do a wake_up. */
#undef OVERLAPS
#define OVERLAPS overlaps(sector, size, e->sector, e->size)
		slot = ee_hash_slot(mdev, req->sector);
		hlist_for_each_entry(e, n, slot, collision) {
			if (OVERLAPS) {
				wake_up(&mdev->misc_wait);
				break;
			}
		}
	}
#undef OVERLAPS
}

void complete_master_bio(struct drbd_conf *mdev,
		struct bio_and_error *m)
{
	bio_endio(m->bio, m->error);
	dec_ap_bio(mdev);
}

/* Helper for __req_mod().
 * Set m->bio to the master bio, if it is fit to be completed,
 * or leave it alone (it is initialized to NULL in __req_mod),
 * if it has already been completed, or cannot be completed yet.
 * If m->bio is set, the error status to be returned is placed in m->error.
 */
void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
{
	const unsigned long s = req->rq_state;
	struct drbd_conf *mdev = req->mdev;
	/* only WRITES may end up here without a master bio (on barrier ack) */
	int rw = req->master_bio ? bio_data_dir(req->master_bio) : WRITE;

	/* we must not complete the master bio, while it is
	 * still being processed by _drbd_send_zc_bio (drbd_send_dblock)
	 * not yet acknowledged by the peer
	 * not yet completed by the local io subsystem
	 * these flags may get cleared in any order by
	 *	the worker,
	 *	the receiver,
	 *	the bio_endio completion callbacks.
	 */
	if (s & RQ_NET_QUEUED)
		return;
	if (s & RQ_NET_PENDING)
		return;
	if (s & RQ_LOCAL_PENDING)
		return;

	if (req->master_bio) {
		/* this is data_received (remote read)
		 * or protocol C P_WRITE_ACK
		 * or protocol B P_RECV_ACK
		 * or protocol A "handed_over_to_network" (SendAck)
		 * or canceled or failed,
		 * or killed from the transfer log due to connection loss.
		 */

		/*
		 * figure out whether to report success or failure.
		 *
		 * report success when at least one of the operations succeeded.
		 * or, to put the other way,
		 * only report failure, when both operations failed.
		 *
		 * what to do about the failures is handled elsewhere.
		 * what we need to do here is just: complete the master_bio.
		 *
		 * local completion error, if any, has been stored as ERR_PTR
		 * in private_bio within drbd_endio_pri.
		 */
		int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
		int error = PTR_ERR(req->private_bio);

		/* remove the request from the conflict detection
		 * respective block_id verification hash */
		if (!hlist_unhashed(&req->collision))
			hlist_del(&req->collision);
		else
			D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);

		/* for writes we need to do some extra housekeeping */
		if (rw == WRITE)
			_about_to_complete_local_write(mdev, req);

		/* Update disk stats */
		_drbd_end_io_acct(mdev, req);

		m->error = ok ? 0 : (error ?: -EIO);
		m->bio = req->master_bio;
		req->master_bio = NULL;
	}

	if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
		/* this is disconnected (local only) operation,
		 * or protocol C P_WRITE_ACK,
		 * or protocol A or B P_BARRIER_ACK,
		 * or killed from the transfer log due to connection loss. */
		_req_is_done(mdev, req, rw);
	}
	/* else: network part and not DONE yet. that is
	 * protocol A or B, barrier ack still pending... */
}

static void _req_may_be_done_not_susp(struct drbd_request *req, struct bio_and_error *m)
{
	struct drbd_conf *mdev = req->mdev;

	if (!is_susp(mdev->state))
		_req_may_be_done(req, m);
}

/*
 * checks whether there was an overlapping request
 * or ee already registered.
 *
 * if so, return 1, in which case this request is completed on the spot,
 * without ever being submitted or sent.
 *
 * return 0 if it is ok to submit this request.
 *
 * NOTE:
 * paranoia: assume something above us is broken, and issues different write
 * requests for the same block simultaneously...
 *
 * To ensure these won't be reordered differently on both nodes, resulting in
 * diverging data sets, we discard the later one(s). Not that this is supposed
 * to happen, but this is the rationale why we also have to check for
 * conflicting requests with local origin, and why we have to do so regardless
 * of whether we allowed multiple primaries.
 *
 * BTW, in case we only have one primary, the ee_hash is empty anyways, and the
 * second hlist_for_each_entry becomes a noop. This is even simpler than to
 * grab a reference on the net_conf, and check for the two_primaries flag...
 */
static int _req_conflicts(struct drbd_request *req)
{
	struct drbd_conf *mdev = req->mdev;
	const sector_t sector = req->sector;
	const int size = req->size;
	struct drbd_request *i;
	struct drbd_epoch_entry *e;
	struct hlist_node *n;
	struct hlist_head *slot;

	D_ASSERT(hlist_unhashed(&req->collision));

	if (!get_net_conf(mdev))
		return 0;

	/* BUG_ON */
	ERR_IF (mdev->tl_hash_s == 0)
		goto out_no_conflict;
	BUG_ON(mdev->tl_hash == NULL);

#define OVERLAPS overlaps(i->sector, i->size, sector, size)
	slot = tl_hash_slot(mdev, sector);
	hlist_for_each_entry(i, n, slot, collision) {
		if (OVERLAPS) {
			dev_alert(DEV, "%s[%u] Concurrent local write detected! "
				"[DISCARD L] new: %llus +%u; "
				"pending: %llus +%u\n",
				current->comm, current->pid,
				(unsigned long long)sector, size,
				(unsigned long long)i->sector, i->size);
			goto out_conflict;
		}
	}

	if (mdev->ee_hash_s) {
		/* now, check for overlapping requests with remote origin */
		BUG_ON(mdev->ee_hash == NULL);
#undef OVERLAPS
#define OVERLAPS overlaps(e->sector, e->size, sector, size)
		slot = ee_hash_slot(mdev, sector);
		hlist_for_each_entry(e, n, slot, collision) {
			if (OVERLAPS) {
				dev_alert(DEV, "%s[%u] Concurrent remote write detected!"
					" [DISCARD L] new: %llus +%u; "
					"pending: %llus +%u\n",
					current->comm, current->pid,
					(unsigned long long)sector, size,
					(unsigned long long)e->sector, e->size);
				goto out_conflict;
			}
		}
	}
#undef OVERLAPS

out_no_conflict:
	/* this is like it should be, and what we expected.
	 * our users do behave after all... */
	put_net_conf(mdev);
	return 0;

out_conflict:
	put_net_conf(mdev);
	return 1;
}

/* obviously this could be coded as many single functions
 * instead of one huge switch,
 * or by putting the code directly in the respective locations
 * (as it has been before).
 *
 * but having it this way
 * enforces that it is all in this one place, where it is easier to audit,
 * it makes it obvious that whatever "event" "happens" to a request should
 * happen "atomically" within the req_lock,
 * and it enforces that we have to think in a very structured manner
 * about the "events" that may happen to a request during its life time ...
 */
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
		struct bio_and_error *m)
{
	struct drbd_conf *mdev = req->mdev;
	int rv = 0;
	m->bio = NULL;

	switch (what) {
	default:
		dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
		break;

	/* does not happen...
	 * initialization done in drbd_req_new
	case created:
		break;
	*/

	case to_be_send: /* via network */
		/* reached via drbd_make_request_common
		 * and from w_read_retry_remote */
		D_ASSERT(!(req->rq_state & RQ_NET_MASK));
		req->rq_state |= RQ_NET_PENDING;
		inc_ap_pending(mdev);
		break;

	case to_be_submitted: /* locally */
		/* reached via drbd_make_request_common */
		D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
		req->rq_state |= RQ_LOCAL_PENDING;
		break;

	case completed_ok:
		if (bio_data_dir(req->master_bio) == WRITE)
			mdev->writ_cnt += req->size>>9;
		else
			mdev->read_cnt += req->size>>9;

		req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
		req->rq_state &= ~RQ_LOCAL_PENDING;

		_req_may_be_done_not_susp(req, m);
		put_ldev(mdev);
		break;

	case write_completed_with_error:
		req->rq_state |= RQ_LOCAL_COMPLETED;
		req->rq_state &= ~RQ_LOCAL_PENDING;

		__drbd_chk_io_error(mdev, false);
		_req_may_be_done_not_susp(req, m);
		put_ldev(mdev);
		break;

	case read_ahead_completed_with_error:
		/* it is legal to fail READA */
		req->rq_state |= RQ_LOCAL_COMPLETED;
		req->rq_state &= ~RQ_LOCAL_PENDING;
		_req_may_be_done_not_susp(req, m);
		put_ldev(mdev);
		break;

	case read_completed_with_error:
		drbd_set_out_of_sync(mdev, req->sector, req->size);

		req->rq_state |= RQ_LOCAL_COMPLETED;
		req->rq_state &= ~RQ_LOCAL_PENDING;

		D_ASSERT(!(req->rq_state & RQ_NET_MASK));

		__drbd_chk_io_error(mdev, false);
		put_ldev(mdev);

		/* no point in retrying if there is no good remote data,
		 * or we have no connection. */
		if (mdev->state.pdsk != D_UP_TO_DATE) {
			_req_may_be_done_not_susp(req, m);
			break;
		}

		/* _req_mod(req,to_be_send); oops, recursion... */
		req->rq_state |= RQ_NET_PENDING;
		inc_ap_pending(mdev);
		/* fall through: _req_mod(req,queue_for_net_read); */

	case queue_for_net_read:
		/* READ or READA, and
		 * no local disk,
		 * or target area marked as invalid,
		 * or just got an io-error. */
		/* from drbd_make_request_common
		 * or from bio_endio during read io-error recovery */

		/* so we can verify the handle in the answer packet
		 * corresponding hlist_del is in _req_may_be_done() */
		hlist_add_head(&req->collision, ar_hash_slot(mdev, req->sector));

		set_bit(UNPLUG_REMOTE, &mdev->flags);

		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		req->rq_state |= RQ_NET_QUEUED;
		req->w.cb = (req->rq_state & RQ_LOCAL_MASK)
			? w_read_retry_remote
			: w_send_read_req;
		drbd_queue_work(&mdev->data.work, &req->w);
		break;

	case queue_for_net_write:
		/* assert something? */
		/* from drbd_make_request_common only */

		hlist_add_head(&req->collision, tl_hash_slot(mdev, req->sector));
		/* corresponding hlist_del is in _req_may_be_done() */

		/* NOTE
		 * In case the req ended up on the transfer log before being
		 * queued on the worker, it could lead to this request being
		 * missed during cleanup after connection loss.
		 * So we have to do both operations here,
		 * within the same lock that protects the transfer log.
		 *
		 * _req_add_to_epoch(req); this has to be after the
		 * _maybe_start_new_epoch(req); which happened in
		 * drbd_make_request_common, because we now may set the bit
		 * again ourselves to close the current epoch.
		 *
		 * Add req to the (now) current epoch (barrier). */

		/* otherwise we may lose an unplug, which may cause some remote
		 * io-scheduler timeout to expire, increasing maximum latency,
		 * hurting performance. */
		set_bit(UNPLUG_REMOTE, &mdev->flags);

		/* see drbd_make_request_common,
		 * just after it grabs the req_lock */
		D_ASSERT(test_bit(CREATE_BARRIER, &mdev->flags) == 0);

		req->epoch = mdev->newest_tle->br_number;

		/* increment size of current epoch */
		mdev->newest_tle->n_writes++;

		/* queue work item to send data */
		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		req->rq_state |= RQ_NET_QUEUED;
		req->w.cb = w_send_dblock;
		drbd_queue_work(&mdev->data.work, &req->w);

		/* close the epoch, in case it outgrew the limit */
		if (mdev->newest_tle->n_writes >= mdev->net_conf->max_epoch_size)
			queue_barrier(mdev);

		break;

	case queue_for_send_oos:
		req->rq_state |= RQ_NET_QUEUED;
		req->w.cb = w_send_oos;
		drbd_queue_work(&mdev->data.work, &req->w);
		break;

	case oos_handed_to_network:
		/* actually the same */
	case send_canceled:
		/* treat it the same */
	case send_failed:
		/* real cleanup will be done from tl_clear. just update flags
		 * so it is no longer marked as on the worker queue */
		req->rq_state &= ~RQ_NET_QUEUED;
		/* if we did it right, tl_clear should be scheduled only after
		 * this, so this should not be necessary! */
		_req_may_be_done_not_susp(req, m);
		break;

	case handed_over_to_network:
		/* assert something? */
		if (bio_data_dir(req->master_bio) == WRITE)
			atomic_add(req->size>>9, &mdev->ap_in_flight);

		if (bio_data_dir(req->master_bio) == WRITE &&
		    mdev->net_conf->wire_protocol == DRBD_PROT_A) {
			/* this is what is dangerous about protocol A:
			 * pretend it was successfully written on the peer. */
			if (req->rq_state & RQ_NET_PENDING) {
				dec_ap_pending(mdev);
				req->rq_state &= ~RQ_NET_PENDING;
				req->rq_state |= RQ_NET_OK;
			} /* else: neg-ack was faster... */
			/* it is still not yet RQ_NET_DONE until the
			 * corresponding epoch barrier got acked as well,
			 * so we know what to dirty on connection loss */
		}
		req->rq_state &= ~RQ_NET_QUEUED;
		req->rq_state |= RQ_NET_SENT;
		/* because _drbd_send_zc_bio could sleep, and may want to
		 * dereference the bio even after the "write_acked_by_peer" and
		 * "completed_ok" events came in, once we return from
		 * _drbd_send_zc_bio (drbd_send_dblock), we have to check
		 * whether it is done already, and end it. */
		_req_may_be_done_not_susp(req, m);
		break;

	case read_retry_remote_canceled:
		req->rq_state &= ~RQ_NET_QUEUED;
		/* fall through, in case we raced with drbd_disconnect */
	case connection_lost_while_pending:
		/* transfer log cleanup after connection loss */
		/* assert something? */
		if (req->rq_state & RQ_NET_PENDING)
			dec_ap_pending(mdev);
		req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
		req->rq_state |= RQ_NET_DONE;
		if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
			atomic_sub(req->size>>9, &mdev->ap_in_flight);

		/* if it is still queued, we may not complete it here.
		 * it will be canceled soon. */
		if (!(req->rq_state & RQ_NET_QUEUED))
			_req_may_be_done(req, m); /* Allowed while state.susp */
		break;

	case write_acked_by_peer_and_sis:
		req->rq_state |= RQ_NET_SIS;
	case conflict_discarded_by_peer:
		/* for discarded conflicting writes of multiple primaries,
		 * there is no need to keep anything in the tl, potential
		 * node crashes are covered by the activity log. */
		if (what == conflict_discarded_by_peer)
			dev_alert(DEV, "Got DiscardAck packet %llus +%u!"
				" DRBD is not a random data generator!\n",
				(unsigned long long)req->sector, req->size);
		req->rq_state |= RQ_NET_DONE;
		/* fall through */
	case write_acked_by_peer:
		/* protocol C; successfully written on peer.
		 * Nothing to do here.
		 * We want to keep the tl in place for all protocols, to cater
		 * for volatile write-back caches on lower level devices.
		 *
		 * A barrier request is expected to have forced all prior
		 * requests onto stable storage, so completion of a barrier
		 * request could set NET_DONE right here, and not wait for the
		 * P_BARRIER_ACK, but that is an unnecessary optimization. */

		/* this makes it effectively the same as for: */
	case recv_acked_by_peer:
		/* protocol B; pretends to be successfully written on peer.
		 * see also notes above in handed_over_to_network about
		 * protocol != C */
		req->rq_state |= RQ_NET_OK;
		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		dec_ap_pending(mdev);
		atomic_sub(req->size>>9, &mdev->ap_in_flight);
		req->rq_state &= ~RQ_NET_PENDING;
		_req_may_be_done_not_susp(req, m);
		break;

	case neg_acked:
		/* assert something? */
		if (req->rq_state & RQ_NET_PENDING) {
			dec_ap_pending(mdev);
			atomic_sub(req->size>>9, &mdev->ap_in_flight);
		}
		req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);

		req->rq_state |= RQ_NET_DONE;
		_req_may_be_done_not_susp(req, m);
		/* else: done by handed_over_to_network */
		break;

	case fail_frozen_disk_io:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;

		_req_may_be_done(req, m); /* Allowed while state.susp */
		break;

	case restart_frozen_disk_io:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;

		req->rq_state &= ~RQ_LOCAL_COMPLETED;

		rv = MR_READ;
		if (bio_data_dir(req->master_bio) == WRITE)
			rv = MR_WRITE;

		get_ldev(mdev);
		req->w.cb = w_restart_disk_io;
		drbd_queue_work(&mdev->data.work, &req->w);
		break;

	case resend:
		/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
		   before the connection loss (B&C only); only P_BARRIER_ACK was missing.
		   Throwing them out of the TL here by pretending we got a BARRIER_ACK,
		   we ensure that the peer was not rebooted */
		if (!(req->rq_state & RQ_NET_OK)) {
			if (req->w.cb) {
				drbd_queue_work(&mdev->data.work, &req->w);
				rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
			}
			break;
		}
		/* else, fall through to barrier_acked */

	case barrier_acked:
		if (!(req->rq_state & RQ_WRITE))
			break;

		if (req->rq_state & RQ_NET_PENDING) {
			/* barrier came in before all requests have been acked.
			 * this is bad, because if the connection is lost now,
			 * we won't be able to clean them up... */
			dev_err(DEV, "FIXME (barrier_acked but pending)\n");
			list_move(&req->tl_requests, &mdev->out_of_sequence_requests);
		}
		if ((req->rq_state & RQ_NET_MASK) != 0) {
			req->rq_state |= RQ_NET_DONE;
			if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
				atomic_sub(req->size>>9, &mdev->ap_in_flight);
		}
		_req_may_be_done(req, m); /* Allowed while state.susp */
		break;

	case data_received:
		D_ASSERT(req->rq_state & RQ_NET_PENDING);
		dec_ap_pending(mdev);
		req->rq_state &= ~RQ_NET_PENDING;
		req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
		_req_may_be_done_not_susp(req, m);
		break;
	};

	return rv;
}

/* we may do a local read if:
 * - we are consistent (of course),
 * - or we are generally inconsistent,
 *   BUT we are still/already IN SYNC for this area.
 *   since size may be bigger than BM_BLOCK_SIZE,
 *   we may need to check several bits.
 */
static int drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
{
	unsigned long sbnr, ebnr;
	sector_t esector, nr_sectors;

	if (mdev->state.disk == D_UP_TO_DATE)
		return 1;
	if (mdev->state.disk >= D_OUTDATED)
		return 0;
	if (mdev->state.disk < D_INCONSISTENT)
		return 0;
	/* state.disk == D_INCONSISTENT   We will have a look at the BitMap */
	nr_sectors = drbd_get_capacity(mdev->this_bdev);
	esector = sector + (size >> 9) - 1;

	D_ASSERT(sector < nr_sectors);
	D_ASSERT(esector < nr_sectors);

	sbnr = BM_SECT_TO_BIT(sector);
	ebnr = BM_SECT_TO_BIT(esector);

	return 0 == drbd_bm_count_bits(mdev, sbnr, ebnr);
}

static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
{
	const int rw = bio_rw(bio);
	const int size = bio->bi_size;
	const sector_t sector = bio->bi_sector;
	struct drbd_tl_epoch *b = NULL;
	struct drbd_request *req;
	int local, remote, send_oos = 0;
	int err = -EIO;
	int ret = 0;

	/* allocate outside of all locks; */
	req = drbd_req_new(mdev, bio);
	if (!req) {
		dec_ap_bio(mdev);
		/* only pass the error to the upper layers.
		 * if user cannot handle io errors, that's not our business. */
		dev_err(DEV, "could not kmalloc() req\n");
		bio_endio(bio, -ENOMEM);
		return 0;
	}
	req->start_time = start_time;

	local = get_ldev(mdev);
	if (!local) {
		bio_put(req->private_bio); /* or we get a bio leak */
		req->private_bio = NULL;
	}
	if (rw == WRITE) {
		remote = 1;
	} else {
		/* READ || READA */
		if (local) {
			if (!drbd_may_do_local_read(mdev, sector, size)) {
				/* we could kick the syncer to
				 * sync this extent asap, wait for
				 * it, then continue locally.
				 * Or just issue the request remotely.
				 */
				local = 0;
				bio_put(req->private_bio);
				req->private_bio = NULL;
				put_ldev(mdev);
			}
		}
		remote = !local && mdev->state.pdsk >= D_UP_TO_DATE;
	}

	/* If we have a disk, but a READA request is mapped to remote,
	 * we are R_PRIMARY, D_INCONSISTENT, SyncTarget.
	 * Just fail that READA request right here.
	 *
	 * THINK: maybe fail all READA when not local?
	 *        or make this configurable...
	 *        if network is slow, READA won't do any good.
	 */
	if (rw == READA && mdev->state.disk >= D_INCONSISTENT && !local) {
		err = -EWOULDBLOCK;
		goto fail_and_free_req;
	}

	/* For WRITES going to the local disk, grab a reference on the target
	 * extent. This waits for any resync activity in the corresponding
	 * resync extent to finish, and, if necessary, pulls in the target
	 * extent into the activity log, which involves further disk io because
	 * of transactional on-disk meta data updates. */
	if (rw == WRITE && local && !test_bit(AL_SUSPENDED, &mdev->flags)) {
		req->rq_state |= RQ_IN_ACT_LOG;
		drbd_al_begin_io(mdev, sector);
	}

	remote = remote && drbd_should_do_remote(mdev->state);
	send_oos = rw == WRITE && drbd_should_send_oos(mdev->state);
	D_ASSERT(!(remote && send_oos));

	if (!(local || remote) && !is_susp(mdev->state)) {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
		goto fail_free_complete;
	}

	/* For WRITE request, we have to make sure that we have an
	 * unused_spare_tle, in case we need to start a new epoch.
	 * I try to be smart and avoid to pre-allocate always "just in case",
	 * but there is a race between testing the bit and pointer outside the
	 * spinlock, and grabbing the spinlock.
	 * if we lost that race, we retry. */
	if (rw == WRITE && (remote || send_oos) &&
	    mdev->unused_spare_tle == NULL &&
	    test_bit(CREATE_BARRIER, &mdev->flags)) {
allocate_barrier:
		b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
		if (!b) {
			dev_err(DEV, "Failed to alloc barrier.\n");
			err = -ENOMEM;
			goto fail_free_complete;
		}
	}

	/* GOOD, everything prepared, grab the spin_lock */
	spin_lock_irq(&mdev->req_lock);

	if (is_susp(mdev->state)) {
		/* If we got suspended, use the retry mechanism of
		   generic_make_request() to restart processing of this
		   bio. In the next call to drbd_make_request
		   we sleep in inc_ap_bio() */
		ret = 1;
		spin_unlock_irq(&mdev->req_lock);
		goto fail_free_complete;
	}

	if (remote || send_oos) {
		remote = drbd_should_do_remote(mdev->state);
		send_oos = rw == WRITE && drbd_should_send_oos(mdev->state);
		D_ASSERT(!(remote && send_oos));

		if (!(remote || send_oos))
			dev_warn(DEV, "lost connection while grabbing the req_lock!\n");
		if (!(local || remote)) {
			dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
			spin_unlock_irq(&mdev->req_lock);
			goto fail_free_complete;
		}
	}

	if (b && mdev->unused_spare_tle == NULL) {
		mdev->unused_spare_tle = b;
		b = NULL;
	}
	if (rw == WRITE && (remote || send_oos) &&
	    mdev->unused_spare_tle == NULL &&
	    test_bit(CREATE_BARRIER, &mdev->flags)) {
		/* someone closed the current epoch
		 * while we were grabbing the spinlock */
		spin_unlock_irq(&mdev->req_lock);
		goto allocate_barrier;
	}


	/* Update disk stats */
	_drbd_start_io_acct(mdev, req, bio);

	/* _maybe_start_new_epoch(mdev);
	 * If we need to generate a write barrier packet, we have to add the
	 * new epoch (barrier) object, and queue the barrier packet for sending,
	 * and queue the req's data after it _within the same lock_, otherwise
	 * we have race conditions where the reorder domains could be mixed up.
	 *
	 * Even read requests may start a new epoch and queue the corresponding
	 * barrier packet. To get the write ordering right, we only have to
	 * make sure that, if this is a write request and it triggered a
	 * barrier packet, this request is queued within the same spinlock. */
	if ((remote || send_oos) && mdev->unused_spare_tle &&
	    test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
		_tl_add_barrier(mdev, mdev->unused_spare_tle);
		mdev->unused_spare_tle = NULL;
	} else {
		D_ASSERT(!(remote && rw == WRITE &&
			   test_bit(CREATE_BARRIER, &mdev->flags)));
	}

	/* NOTE
	 * Actually, 'local' may be wrong here already, since we may have failed
	 * to write to the meta data, and may become wrong anytime because of
	 * local io-error for some other request, which would lead to us
	 * "detaching" the local disk.
	 *
	 * 'remote' may become wrong any time because the network could fail.
	 *
	 * This is a harmless race condition, though, since it is handled
	 * correctly at the appropriate places; so it just defers the failure
	 * of the respective operation.
	 */

	/* mark them early for readability.
	 * this just sets some state flags. */
	if (remote)
		_req_mod(req, to_be_send);
	if (local)
		_req_mod(req, to_be_submitted);

	/* check this request on the collision detection hash tables.
	 * if we have a conflict, just complete it here.
	 * THINK do we want to check reads, too? (I don't think so...) */
	if (rw == WRITE && _req_conflicts(req))
		goto fail_conflicting;

	list_add_tail(&req->tl_requests, &mdev->newest_tle->requests);

	/* NOTE remote first: to get the concurrent write detection right,
	 * we must register the request before start of local IO. */
	if (remote) {
		/* either WRITE and C_CONNECTED,
		 * or READ, and no local disk,
		 * or READ, but not in sync.
		 */
		_req_mod(req, (rw == WRITE)
				? queue_for_net_write
				: queue_for_net_read);
	}
	if (send_oos && drbd_set_out_of_sync(mdev, sector, size))
		_req_mod(req, queue_for_send_oos);

	if (remote &&
	    mdev->net_conf->on_congestion != OC_BLOCK && mdev->agreed_pro_version >= 96) {
		int congested = 0;

		if (mdev->net_conf->cong_fill &&
		    atomic_read(&mdev->ap_in_flight) >= mdev->net_conf->cong_fill) {
			dev_info(DEV, "Congestion-fill threshold reached\n");
			congested = 1;
		}

		if (mdev->act_log->used >= mdev->net_conf->cong_extents) {
			dev_info(DEV, "Congestion-extents threshold reached\n");
			congested = 1;
		}

		if (congested) {
			queue_barrier(mdev); /* last barrier, after mirrored writes */

			if (mdev->net_conf->on_congestion == OC_PULL_AHEAD)
				_drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
			else /*mdev->net_conf->on_congestion == OC_DISCONNECT */
				_drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
		}
	}

	spin_unlock_irq(&mdev->req_lock);
	kfree(b); /* if someone else has beaten us to it... */

	if (local) {
		req->private_bio->bi_bdev = mdev->ldev->backing_bdev;

		/* State may have changed since we grabbed our reference on the
		 * mdev->ldev member. Double check, and short-circuit to endio.
		 * In case the last activity log transaction failed to get on
		 * stable storage, and this is a WRITE, we may not even submit
		 * this bio. */
		if (get_ldev(mdev)) {
			if (drbd_insert_fault(mdev, rw == WRITE ? DRBD_FAULT_DT_WR
					     : rw == READ  ? DRBD_FAULT_DT_RD
					     :               DRBD_FAULT_DT_RA))
				bio_endio(req->private_bio, -EIO);
			else
				generic_make_request(req->private_bio);
			put_ldev(mdev);
		} else
			bio_endio(req->private_bio, -EIO);
	}

	return 0;

fail_conflicting:
	/* this is a conflicting request.
	 * even though it may have been only _partially_
	 * overlapping with one of the currently pending requests,
	 * without even submitting or sending it, we will
	 * pretend that it was successfully served right now.
	 */
	_drbd_end_io_acct(mdev, req);
	spin_unlock_irq(&mdev->req_lock);
	if (remote)
		dec_ap_pending(mdev);
	/* THINK: do we want to fail it (-EIO), or pretend success?
	 * this pretends success. */
	err = 0;

fail_free_complete:
	if (req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_complete_io(mdev, sector);
fail_and_free_req:
	if (local) {
		bio_put(req->private_bio);
		req->private_bio = NULL;
		put_ldev(mdev);
	}
	if (!ret)
		bio_endio(bio, err);

	drbd_req_free(req);
	dec_ap_bio(mdev);
	kfree(b);

	return ret;
}

/* helper function for drbd_make_request
 * if we can determine just by the mdev (state) that this request will fail,
 * return 1
 * otherwise return 0
 */
static int drbd_fail_request_early(struct drbd_conf *mdev, int is_write)
{
	if (mdev->state.role != R_PRIMARY &&
		(!allow_oos || is_write)) {
		if (__ratelimit(&drbd_ratelimit_state)) {
			dev_err(DEV, "Process %s[%u] tried to %s; "
				"since we are not in Primary state, "
				"we cannot allow this\n",
				current->comm, current->pid,
				is_write ? "WRITE" : "READ");
		}
		return 1;
	}

	return 0;
}

int drbd_make_request(struct request_queue *q, struct bio *bio)
{
	unsigned int s_enr, e_enr;
	struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
	unsigned long start_time;

	if (drbd_fail_request_early(mdev, bio_data_dir(bio) & WRITE)) {
		bio_endio(bio, -EPERM);
		return 0;
	}

	start_time = jiffies;

	/*
	 * what we "blindly" assume:
	 */
	D_ASSERT(bio->bi_size > 0);
	D_ASSERT((bio->bi_size & 0x1ff) == 0);
	D_ASSERT(bio->bi_idx == 0);

	/* to make some things easier, force alignment of requests within the
	 * granularity of our hash tables */
	s_enr = bio->bi_sector >> HT_SHIFT;
	e_enr = (bio->bi_sector+(bio->bi_size>>9)-1) >> HT_SHIFT;

	if (likely(s_enr == e_enr)) {
		inc_ap_bio(mdev, 1);
		return drbd_make_request_common(mdev, bio, start_time);
	}

	/* can this bio be split generically?
	 * Maybe add our own split-arbitrary-bios function. */
	if (bio->bi_vcnt != 1 || bio->bi_idx != 0 || bio->bi_size > DRBD_MAX_BIO_SIZE) {
		/* rather error out here than BUG in bio_split */
		dev_err(DEV, "bio would need to, but cannot, be split: "
			"(vcnt=%u,idx=%u,size=%u,sector=%llu)\n",
			bio->bi_vcnt, bio->bi_idx, bio->bi_size,
			(unsigned long long)bio->bi_sector);
		bio_endio(bio, -EINVAL);
	} else {
		/* This bio crosses some boundary, so we have to split it. */
		struct bio_pair *bp;
		/* works for the "do not cross hash slot boundaries" case
		 * e.g. sector 262269, size 4096
		 * s_enr = 262269 >> 6 = 4097
		 * e_enr = (262269+8-1) >> 6 = 4098
		 * HT_SHIFT = 6
		 * sps = 64, mask = 63
		 * first_sectors = 64 - (262269 & 63) = 3
		 */
		const sector_t sect = bio->bi_sector;
		const int sps = 1 << HT_SHIFT; /* sectors per slot */
		const int mask = sps - 1;
		const sector_t first_sectors = sps - (sect & mask);
		bp = bio_split(bio, first_sectors);

		/* we need to get a "reference count" (ap_bio_cnt)
		 * to avoid races with the disconnect/reconnect/suspend code.
		 * In case we need to split the bio here, we need to get three references
		 * atomically, otherwise we might deadlock when trying to submit the
		 * second one! */
		inc_ap_bio(mdev, 3);

		D_ASSERT(e_enr == s_enr + 1);

		while (drbd_make_request_common(mdev, &bp->bio1, start_time))
			inc_ap_bio(mdev, 1);

		while (drbd_make_request_common(mdev, &bp->bio2, start_time))
			inc_ap_bio(mdev, 1);

		dec_ap_bio(mdev);

		bio_pair_release(bp);
	}
	return 0;
}

/* This is called by bio_add_page(). With this function we reduce
 * the number of BIOs that span over multiple DRBD_MAX_BIO_SIZEs
 * units (was AL_EXTENTs).
 *
 * we do the calculation within the lower 32bit of the byte offsets,
 * since we don't care for actual offset, but only check whether it
 * would cross "activity log extent" boundaries.
 *
 * As long as the BIO is empty we have to allow at least one bvec,
 * regardless of size and offset. so the resulting bio may still
 * cross extent boundaries. those are dealt with (bio_split) in
 * drbd_make_request.
 */
int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
{
	struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
	unsigned int bio_offset =
		(unsigned int)bvm->bi_sector << 9; /* 32 bit */
	unsigned int bio_size = bvm->bi_size;
	int limit, backing_limit;

	limit = DRBD_MAX_BIO_SIZE
	      - ((bio_offset & (DRBD_MAX_BIO_SIZE-1)) + bio_size);
	if (limit < 0)
		limit = 0;
	if (bio_size == 0) {
		if (limit <= bvec->bv_len)
			limit = bvec->bv_len;
	} else if (limit && get_ldev(mdev)) {
		struct request_queue * const b =
			mdev->ldev->backing_bdev->bd_disk->queue;
		if (b->merge_bvec_fn) {
			backing_limit = b->merge_bvec_fn(b, bvm, bvec);
			limit = min(limit, backing_limit);
		}
		put_ldev(mdev);
	}
	return limit;
}

void request_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;
	struct drbd_request *req; /* oldest request */
	struct list_head *le;
	unsigned long et = 0; /* effective timeout = ko_count * timeout */

	if (get_net_conf(mdev)) {
		et = mdev->net_conf->timeout*HZ/10 * mdev->net_conf->ko_count;
		put_net_conf(mdev);
	}
	if (!et || mdev->state.conn < C_WF_REPORT_PARAMS)
		return; /* Recurring timer stopped */

	spin_lock_irq(&mdev->req_lock);
	le = &mdev->oldest_tle->requests;
	if (list_empty(le)) {
		spin_unlock_irq(&mdev->req_lock);
		mod_timer(&mdev->request_timer, jiffies + et);
		return;
	}

	le = le->prev;
	req = list_entry(le, struct drbd_request, tl_requests);
	if (time_is_before_eq_jiffies(req->start_time + et)) {
		if (req->rq_state & RQ_NET_PENDING) {
			dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n");
			_drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE, NULL);
		} else {
			dev_warn(DEV, "Local backing block device frozen?\n");
			mod_timer(&mdev->request_timer, jiffies + et);
		}
	} else {
		mod_timer(&mdev->request_timer, req->start_time + et);
	}

	spin_unlock_irq(&mdev->req_lock);
}