Path: blob/master/drivers/block/drbd/drbd_receiver.c
15180 views
/*1drbd_receiver.c23This file is part of DRBD by Philipp Reisner and Lars Ellenberg.45Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.6Copyright (C) 1999-2008, Philipp Reisner <[email protected]>.7Copyright (C) 2002-2008, Lars Ellenberg <[email protected]>.89drbd is free software; you can redistribute it and/or modify10it under the terms of the GNU General Public License as published by11the Free Software Foundation; either version 2, or (at your option)12any later version.1314drbd is distributed in the hope that it will be useful,15but WITHOUT ANY WARRANTY; without even the implied warranty of16MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the17GNU General Public License for more details.1819You should have received a copy of the GNU General Public License20along with drbd; see the file COPYING. If not, write to21the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.22*/232425#include <linux/module.h>2627#include <asm/uaccess.h>28#include <net/sock.h>2930#include <linux/drbd.h>31#include <linux/fs.h>32#include <linux/file.h>33#include <linux/in.h>34#include <linux/mm.h>35#include <linux/memcontrol.h>36#include <linux/mm_inline.h>37#include <linux/slab.h>38#include <linux/pkt_sched.h>39#define __KERNEL_SYSCALLS__40#include <linux/unistd.h>41#include <linux/vmalloc.h>42#include <linux/random.h>43#include <linux/string.h>44#include <linux/scatterlist.h>45#include "drbd_int.h"46#include "drbd_req.h"4748#include "drbd_vli.h"4950enum finish_epoch {51FE_STILL_LIVE,52FE_DESTROYED,53FE_RECYCLED,54};5556static int drbd_do_handshake(struct drbd_conf *mdev);57static int drbd_do_auth(struct drbd_conf *mdev);5859static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);60static int e_end_block(struct drbd_conf *, struct drbd_work *, int);616263#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)6465/*66* some helper functions to deal with single linked page lists,67* page->private being our "next" pointer.68*/6970/* If at least n pages are linked at head, get n pages off.71* Otherwise, don't modify head, and return NULL.72* Locking is the responsibility of the caller.73*/74static struct page *page_chain_del(struct page **head, int n)75{76struct page *page;77struct page *tmp;7879BUG_ON(!n);80BUG_ON(!head);8182page = *head;8384if (!page)85return NULL;8687while (page) {88tmp = page_chain_next(page);89if (--n == 0)90break; /* found sufficient pages */91if (tmp == NULL)92/* insufficient pages, don't use any of them. */93return NULL;94page = tmp;95}9697/* add end of list marker for the returned list */98set_page_private(page, 0);99/* actual return value, and adjustment of head */100page = *head;101*head = tmp;102return page;103}104105/* may be used outside of locks to find the tail of a (usually short)106* "private" page chain, before adding it back to a global chain head107* with page_chain_add() under a spinlock. 
*/108static struct page *page_chain_tail(struct page *page, int *len)109{110struct page *tmp;111int i = 1;112while ((tmp = page_chain_next(page)))113++i, page = tmp;114if (len)115*len = i;116return page;117}118119static int page_chain_free(struct page *page)120{121struct page *tmp;122int i = 0;123page_chain_for_each_safe(page, tmp) {124put_page(page);125++i;126}127return i;128}129130static void page_chain_add(struct page **head,131struct page *chain_first, struct page *chain_last)132{133#if 1134struct page *tmp;135tmp = page_chain_tail(chain_first, NULL);136BUG_ON(tmp != chain_last);137#endif138139/* add chain to head */140set_page_private(chain_last, (unsigned long)*head);141*head = chain_first;142}143144static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)145{146struct page *page = NULL;147struct page *tmp = NULL;148int i = 0;149150/* Yes, testing drbd_pp_vacant outside the lock is racy.151* So what. It saves a spin_lock. */152if (drbd_pp_vacant >= number) {153spin_lock(&drbd_pp_lock);154page = page_chain_del(&drbd_pp_pool, number);155if (page)156drbd_pp_vacant -= number;157spin_unlock(&drbd_pp_lock);158if (page)159return page;160}161162/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD163* "criss-cross" setup, that might cause write-out on some other DRBD,164* which in turn might block on the other node at this very place. */165for (i = 0; i < number; i++) {166tmp = alloc_page(GFP_TRY);167if (!tmp)168break;169set_page_private(tmp, (unsigned long)page);170page = tmp;171}172173if (i == number)174return page;175176/* Not enough pages immediately available this time.177* No need to jump around here, drbd_pp_alloc will retry this178* function "soon". */179if (page) {180tmp = page_chain_tail(page, NULL);181spin_lock(&drbd_pp_lock);182page_chain_add(&drbd_pp_pool, page, tmp);183drbd_pp_vacant += i;184spin_unlock(&drbd_pp_lock);185}186return NULL;187}188189static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)190{191struct drbd_epoch_entry *e;192struct list_head *le, *tle;193194/* The EEs are always appended to the end of the list. Since195they are sent in order over the wire, they have to finish196in order. As soon as we see the first not finished we can197stop to examine the list... */198199list_for_each_safe(le, tle, &mdev->net_ee) {200e = list_entry(le, struct drbd_epoch_entry, w.list);201if (drbd_ee_has_active_page(e))202break;203list_move(le, to_be_freed);204}205}206207static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)208{209LIST_HEAD(reclaimed);210struct drbd_epoch_entry *e, *t;211212spin_lock_irq(&mdev->req_lock);213reclaim_net_ee(mdev, &reclaimed);214spin_unlock_irq(&mdev->req_lock);215216list_for_each_entry_safe(e, t, &reclaimed, w.list)217drbd_free_net_ee(mdev, e);218}219220/**221* drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)222* @mdev: DRBD device.223* @number: number of pages requested224* @retry: whether to retry, if not enough pages are available right now225*226* Tries to allocate number pages, first from our own page pool, then from227* the kernel, unless this allocation would exceed the max_buffers setting.228* Possibly retry until DRBD frees sufficient pages somewhere else.229*230* Returns a page chain linked via page->private.231*/232static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)233{234struct page *page = NULL;235DEFINE_WAIT(wait);236237/* Yes, we may run up to @number over max_buffers. 
If we238* follow it strictly, the admin will get it wrong anyways. */239if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)240page = drbd_pp_first_pages_or_try_alloc(mdev, number);241242while (page == NULL) {243prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);244245drbd_kick_lo_and_reclaim_net(mdev);246247if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {248page = drbd_pp_first_pages_or_try_alloc(mdev, number);249if (page)250break;251}252253if (!retry)254break;255256if (signal_pending(current)) {257dev_warn(DEV, "drbd_pp_alloc interrupted!\n");258break;259}260261schedule();262}263finish_wait(&drbd_pp_wait, &wait);264265if (page)266atomic_add(number, &mdev->pp_in_use);267return page;268}269270/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.271* Is also used from inside an other spin_lock_irq(&mdev->req_lock);272* Either links the page chain back to the global pool,273* or returns all pages to the system. */274static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)275{276atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;277int i;278279if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)280i = page_chain_free(page);281else {282struct page *tmp;283tmp = page_chain_tail(page, &i);284spin_lock(&drbd_pp_lock);285page_chain_add(&drbd_pp_pool, page, tmp);286drbd_pp_vacant += i;287spin_unlock(&drbd_pp_lock);288}289i = atomic_sub_return(i, a);290if (i < 0)291dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",292is_net ? "pp_in_use_by_net" : "pp_in_use", i);293wake_up(&drbd_pp_wait);294}295296/*297You need to hold the req_lock:298_drbd_wait_ee_list_empty()299300You must not have the req_lock:301drbd_free_ee()302drbd_alloc_ee()303drbd_init_ee()304drbd_release_ee()305drbd_ee_fix_bhs()306drbd_process_done_ee()307drbd_clear_done_ee()308drbd_wait_ee_list_empty()309*/310311struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,312u64 id,313sector_t sector,314unsigned int data_size,315gfp_t gfp_mask) __must_hold(local)316{317struct drbd_epoch_entry *e;318struct page *page;319unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;320321if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))322return NULL;323324e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);325if (!e) {326if (!(gfp_mask & __GFP_NOWARN))327dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");328return NULL;329}330331page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));332if (!page)333goto fail;334335INIT_HLIST_NODE(&e->collision);336e->epoch = NULL;337e->mdev = mdev;338e->pages = page;339atomic_set(&e->pending_bios, 0);340e->size = data_size;341e->flags = 0;342e->sector = sector;343e->block_id = id;344345return e;346347fail:348mempool_free(e, drbd_ee_mempool);349return NULL;350}351352void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)353{354if (e->flags & EE_HAS_DIGEST)355kfree(e->digest);356drbd_pp_free(mdev, e->pages, is_net);357D_ASSERT(atomic_read(&e->pending_bios) == 0);358D_ASSERT(hlist_unhashed(&e->collision));359mempool_free(e, drbd_ee_mempool);360}361362int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)363{364LIST_HEAD(work_list);365struct drbd_epoch_entry *e, *t;366int count = 0;367int is_net = list == &mdev->net_ee;368369spin_lock_irq(&mdev->req_lock);370list_splice_init(list, &work_list);371spin_unlock_irq(&mdev->req_lock);372373list_for_each_entry_safe(e, t, &work_list, w.list) {374drbd_free_some_ee(mdev, e, is_net);375count++;376}377return 
count;378}379380381/*382* This function is called from _asender only_383* but see also comments in _req_mod(,barrier_acked)384* and receive_Barrier.385*386* Move entries from net_ee to done_ee, if ready.387* Grab done_ee, call all callbacks, free the entries.388* The callbacks typically send out ACKs.389*/390static int drbd_process_done_ee(struct drbd_conf *mdev)391{392LIST_HEAD(work_list);393LIST_HEAD(reclaimed);394struct drbd_epoch_entry *e, *t;395int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);396397spin_lock_irq(&mdev->req_lock);398reclaim_net_ee(mdev, &reclaimed);399list_splice_init(&mdev->done_ee, &work_list);400spin_unlock_irq(&mdev->req_lock);401402list_for_each_entry_safe(e, t, &reclaimed, w.list)403drbd_free_net_ee(mdev, e);404405/* possible callbacks here:406* e_end_block, and e_end_resync_block, e_send_discard_ack.407* all ignore the last argument.408*/409list_for_each_entry_safe(e, t, &work_list, w.list) {410/* list_del not necessary, next/prev members not touched */411ok = e->w.cb(mdev, &e->w, !ok) && ok;412drbd_free_ee(mdev, e);413}414wake_up(&mdev->ee_wait);415416return ok;417}418419void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)420{421DEFINE_WAIT(wait);422423/* avoids spin_lock/unlock424* and calling prepare_to_wait in the fast path */425while (!list_empty(head)) {426prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);427spin_unlock_irq(&mdev->req_lock);428io_schedule();429finish_wait(&mdev->ee_wait, &wait);430spin_lock_irq(&mdev->req_lock);431}432}433434void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)435{436spin_lock_irq(&mdev->req_lock);437_drbd_wait_ee_list_empty(mdev, head);438spin_unlock_irq(&mdev->req_lock);439}440441/* see also kernel_accept; which is only present since 2.6.18.442* also we want to log which part of it failed, exactly */443static int drbd_accept(struct drbd_conf *mdev, const char **what,444struct socket *sock, struct socket **newsock)445{446struct sock *sk = sock->sk;447int err = 0;448449*what = "listen";450err = sock->ops->listen(sock, 5);451if (err < 0)452goto out;453454*what = "sock_create_lite";455err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,456newsock);457if (err < 0)458goto out;459460*what = "accept";461err = sock->ops->accept(sock, *newsock, 0);462if (err < 0) {463sock_release(*newsock);464*newsock = NULL;465goto out;466}467(*newsock)->ops = sock->ops;468469out:470return err;471}472473static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,474void *buf, size_t size, int flags)475{476mm_segment_t oldfs;477struct kvec iov = {478.iov_base = buf,479.iov_len = size,480};481struct msghdr msg = {482.msg_iovlen = 1,483.msg_iov = (struct iovec *)&iov,484.msg_flags = (flags ? 
flags : MSG_WAITALL | MSG_NOSIGNAL)485};486int rv;487488oldfs = get_fs();489set_fs(KERNEL_DS);490rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);491set_fs(oldfs);492493return rv;494}495496static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)497{498mm_segment_t oldfs;499struct kvec iov = {500.iov_base = buf,501.iov_len = size,502};503struct msghdr msg = {504.msg_iovlen = 1,505.msg_iov = (struct iovec *)&iov,506.msg_flags = MSG_WAITALL | MSG_NOSIGNAL507};508int rv;509510oldfs = get_fs();511set_fs(KERNEL_DS);512513for (;;) {514rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);515if (rv == size)516break;517518/* Note:519* ECONNRESET other side closed the connection520* ERESTARTSYS (on sock) we got a signal521*/522523if (rv < 0) {524if (rv == -ECONNRESET)525dev_info(DEV, "sock was reset by peer\n");526else if (rv != -ERESTARTSYS)527dev_err(DEV, "sock_recvmsg returned %d\n", rv);528break;529} else if (rv == 0) {530dev_info(DEV, "sock was shut down by peer\n");531break;532} else {533/* signal came in, or peer/link went down,534* after we read a partial message535*/536/* D_ASSERT(signal_pending(current)); */537break;538}539};540541set_fs(oldfs);542543if (rv != size)544drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));545546return rv;547}548549/* quoting tcp(7):550* On individual connections, the socket buffer size must be set prior to the551* listen(2) or connect(2) calls in order to have it take effect.552* This is our wrapper to do so.553*/554static void drbd_setbufsize(struct socket *sock, unsigned int snd,555unsigned int rcv)556{557/* open coded SO_SNDBUF, SO_RCVBUF */558if (snd) {559sock->sk->sk_sndbuf = snd;560sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;561}562if (rcv) {563sock->sk->sk_rcvbuf = rcv;564sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;565}566}567568static struct socket *drbd_try_connect(struct drbd_conf *mdev)569{570const char *what;571struct socket *sock;572struct sockaddr_in6 src_in6;573int err;574int disconnect_on_error = 1;575576if (!get_net_conf(mdev))577return NULL;578579what = "sock_create_kern";580err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,581SOCK_STREAM, IPPROTO_TCP, &sock);582if (err < 0) {583sock = NULL;584goto out;585}586587sock->sk->sk_rcvtimeo =588sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;589drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,590mdev->net_conf->rcvbuf_size);591592/* explicitly bind to the configured IP as source IP593* for the outgoing connections.594* This is needed for multihomed hosts and to be595* able to use lo: interfaces for drbd.596* Make sure to use 0 as port number, so linux selects597* a free one dynamically.598*/599memcpy(&src_in6, mdev->net_conf->my_addr,600min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));601if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)602src_in6.sin6_port = 0;603else604((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */605606what = "bind before connect";607err = sock->ops->bind(sock,608(struct sockaddr *) &src_in6,609mdev->net_conf->my_addr_len);610if (err < 0)611goto out;612613/* connect may fail, peer not yet available.614* stay C_WF_CONNECTION, don't go Disconnecting! 
*/615disconnect_on_error = 0;616what = "connect";617err = sock->ops->connect(sock,618(struct sockaddr *)mdev->net_conf->peer_addr,619mdev->net_conf->peer_addr_len, 0);620621out:622if (err < 0) {623if (sock) {624sock_release(sock);625sock = NULL;626}627switch (-err) {628/* timeout, busy, signal pending */629case ETIMEDOUT: case EAGAIN: case EINPROGRESS:630case EINTR: case ERESTARTSYS:631/* peer not (yet) available, network problem */632case ECONNREFUSED: case ENETUNREACH:633case EHOSTDOWN: case EHOSTUNREACH:634disconnect_on_error = 0;635break;636default:637dev_err(DEV, "%s failed, err = %d\n", what, err);638}639if (disconnect_on_error)640drbd_force_state(mdev, NS(conn, C_DISCONNECTING));641}642put_net_conf(mdev);643return sock;644}645646static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)647{648int timeo, err;649struct socket *s_estab = NULL, *s_listen;650const char *what;651652if (!get_net_conf(mdev))653return NULL;654655what = "sock_create_kern";656err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,657SOCK_STREAM, IPPROTO_TCP, &s_listen);658if (err) {659s_listen = NULL;660goto out;661}662663timeo = mdev->net_conf->try_connect_int * HZ;664timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */665666s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */667s_listen->sk->sk_rcvtimeo = timeo;668s_listen->sk->sk_sndtimeo = timeo;669drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,670mdev->net_conf->rcvbuf_size);671672what = "bind before listen";673err = s_listen->ops->bind(s_listen,674(struct sockaddr *) mdev->net_conf->my_addr,675mdev->net_conf->my_addr_len);676if (err < 0)677goto out;678679err = drbd_accept(mdev, &what, s_listen, &s_estab);680681out:682if (s_listen)683sock_release(s_listen);684if (err < 0) {685if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {686dev_err(DEV, "%s failed, err = %d\n", what, err);687drbd_force_state(mdev, NS(conn, C_DISCONNECTING));688}689}690put_net_conf(mdev);691692return s_estab;693}694695static int drbd_send_fp(struct drbd_conf *mdev,696struct socket *sock, enum drbd_packets cmd)697{698struct p_header80 *h = &mdev->data.sbuf.header.h80;699700return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);701}702703static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)704{705struct p_header80 *h = &mdev->data.rbuf.header.h80;706int rr;707708rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);709710if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)711return be16_to_cpu(h->command);712713return 0xffff;714}715716/**717* drbd_socket_okay() - Free the socket if its connection is not okay718* @mdev: DRBD device.719* @sock: pointer to the pointer to the socket.720*/721static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)722{723int rr;724char tb[4];725726if (!*sock)727return false;728729rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);730731if (rr > 0 || rr == -EAGAIN) {732return true;733} else {734sock_release(*sock);735*sock = NULL;736return false;737}738}739740/*741* return values:742* 1 yes, we have a valid connection743* 0 oops, did not work out, please try again744* -1 peer talks different language,745* no point in trying again, please go standalone.746* -2 We do not have a network config...747*/748static int drbd_connect(struct drbd_conf *mdev)749{750struct socket *s, *sock, *msock;751int try, h, ok;752753D_ASSERT(!mdev->data.socket);754755if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)756return 
-2;757758clear_bit(DISCARD_CONCURRENT, &mdev->flags);759760sock = NULL;761msock = NULL;762763do {764for (try = 0;;) {765/* 3 tries, this should take less than a second! */766s = drbd_try_connect(mdev);767if (s || ++try >= 3)768break;769/* give the other side time to call bind() & listen() */770schedule_timeout_interruptible(HZ / 10);771}772773if (s) {774if (!sock) {775drbd_send_fp(mdev, s, P_HAND_SHAKE_S);776sock = s;777s = NULL;778} else if (!msock) {779drbd_send_fp(mdev, s, P_HAND_SHAKE_M);780msock = s;781s = NULL;782} else {783dev_err(DEV, "Logic error in drbd_connect()\n");784goto out_release_sockets;785}786}787788if (sock && msock) {789schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);790ok = drbd_socket_okay(mdev, &sock);791ok = drbd_socket_okay(mdev, &msock) && ok;792if (ok)793break;794}795796retry:797s = drbd_wait_for_connect(mdev);798if (s) {799try = drbd_recv_fp(mdev, s);800drbd_socket_okay(mdev, &sock);801drbd_socket_okay(mdev, &msock);802switch (try) {803case P_HAND_SHAKE_S:804if (sock) {805dev_warn(DEV, "initial packet S crossed\n");806sock_release(sock);807}808sock = s;809break;810case P_HAND_SHAKE_M:811if (msock) {812dev_warn(DEV, "initial packet M crossed\n");813sock_release(msock);814}815msock = s;816set_bit(DISCARD_CONCURRENT, &mdev->flags);817break;818default:819dev_warn(DEV, "Error receiving initial packet\n");820sock_release(s);821if (random32() & 1)822goto retry;823}824}825826if (mdev->state.conn <= C_DISCONNECTING)827goto out_release_sockets;828if (signal_pending(current)) {829flush_signals(current);830smp_rmb();831if (get_t_state(&mdev->receiver) == Exiting)832goto out_release_sockets;833}834835if (sock && msock) {836ok = drbd_socket_okay(mdev, &sock);837ok = drbd_socket_okay(mdev, &msock) && ok;838if (ok)839break;840}841} while (1);842843msock->sk->sk_reuse = 1; /* SO_REUSEADDR */844sock->sk->sk_reuse = 1; /* SO_REUSEADDR */845846sock->sk->sk_allocation = GFP_NOIO;847msock->sk->sk_allocation = GFP_NOIO;848849sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;850msock->sk->sk_priority = TC_PRIO_INTERACTIVE;851852/* NOT YET ...853* sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;854* sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;855* first set it to the P_HAND_SHAKE timeout,856* which we set to 4x the configured ping_timeout. 
*/857sock->sk->sk_sndtimeo =858sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;859860msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;861msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;862863/* we don't want delays.864* we use TCP_CORK where appropriate, though */865drbd_tcp_nodelay(sock);866drbd_tcp_nodelay(msock);867868mdev->data.socket = sock;869mdev->meta.socket = msock;870mdev->last_received = jiffies;871872D_ASSERT(mdev->asender.task == NULL);873874h = drbd_do_handshake(mdev);875if (h <= 0)876return h;877878if (mdev->cram_hmac_tfm) {879/* drbd_request_state(mdev, NS(conn, WFAuth)); */880switch (drbd_do_auth(mdev)) {881case -1:882dev_err(DEV, "Authentication of peer failed\n");883return -1;884case 0:885dev_err(DEV, "Authentication of peer failed, trying again.\n");886return 0;887}888}889890if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)891return 0;892893sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;894sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;895896atomic_set(&mdev->packet_seq, 0);897mdev->peer_seq = 0;898899drbd_thread_start(&mdev->asender);900901if (drbd_send_protocol(mdev) == -1)902return -1;903drbd_send_sync_param(mdev, &mdev->sync_conf);904drbd_send_sizes(mdev, 0, 0);905drbd_send_uuids(mdev);906drbd_send_state(mdev);907clear_bit(USE_DEGR_WFC_T, &mdev->flags);908clear_bit(RESIZE_PENDING, &mdev->flags);909mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */910911return 1;912913out_release_sockets:914if (sock)915sock_release(sock);916if (msock)917sock_release(msock);918return -1;919}920921static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)922{923union p_header *h = &mdev->data.rbuf.header;924int r;925926r = drbd_recv(mdev, h, sizeof(*h));927if (unlikely(r != sizeof(*h))) {928if (!signal_pending(current))929dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);930return false;931}932933if (likely(h->h80.magic == BE_DRBD_MAGIC)) {934*cmd = be16_to_cpu(h->h80.command);935*packet_size = be16_to_cpu(h->h80.length);936} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {937*cmd = be16_to_cpu(h->h95.command);938*packet_size = be32_to_cpu(h->h95.length);939} else {940dev_err(DEV, "magic?? 
on data m: 0x%08x c: %d l: %d\n",941be32_to_cpu(h->h80.magic),942be16_to_cpu(h->h80.command),943be16_to_cpu(h->h80.length));944return false;945}946mdev->last_received = jiffies;947948return true;949}950951static void drbd_flush(struct drbd_conf *mdev)952{953int rv;954955if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {956rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,957NULL);958if (rv) {959dev_err(DEV, "local disk flush failed with status %d\n", rv);960/* would rather check on EOPNOTSUPP, but that is not reliable.961* don't try again for ANY return value != 0962* if (rv == -EOPNOTSUPP) */963drbd_bump_write_ordering(mdev, WO_drain_io);964}965put_ldev(mdev);966}967}968969/**970* drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.971* @mdev: DRBD device.972* @epoch: Epoch object.973* @ev: Epoch event.974*/975static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,976struct drbd_epoch *epoch,977enum epoch_event ev)978{979int epoch_size;980struct drbd_epoch *next_epoch;981enum finish_epoch rv = FE_STILL_LIVE;982983spin_lock(&mdev->epoch_lock);984do {985next_epoch = NULL;986987epoch_size = atomic_read(&epoch->epoch_size);988989switch (ev & ~EV_CLEANUP) {990case EV_PUT:991atomic_dec(&epoch->active);992break;993case EV_GOT_BARRIER_NR:994set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);995break;996case EV_BECAME_LAST:997/* nothing to do*/998break;999}10001001if (epoch_size != 0 &&1002atomic_read(&epoch->active) == 0 &&1003test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {1004if (!(ev & EV_CLEANUP)) {1005spin_unlock(&mdev->epoch_lock);1006drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);1007spin_lock(&mdev->epoch_lock);1008}1009dec_unacked(mdev);10101011if (mdev->current_epoch != epoch) {1012next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);1013list_del(&epoch->list);1014ev = EV_BECAME_LAST | (ev & EV_CLEANUP);1015mdev->epochs--;1016kfree(epoch);10171018if (rv == FE_STILL_LIVE)1019rv = FE_DESTROYED;1020} else {1021epoch->flags = 0;1022atomic_set(&epoch->epoch_size, 0);1023/* atomic_set(&epoch->active, 0); is already zero */1024if (rv == FE_STILL_LIVE)1025rv = FE_RECYCLED;1026wake_up(&mdev->ee_wait);1027}1028}10291030if (!next_epoch)1031break;10321033epoch = next_epoch;1034} while (1);10351036spin_unlock(&mdev->epoch_lock);10371038return rv;1039}10401041/**1042* drbd_bump_write_ordering() - Fall back to an other write ordering method1043* @mdev: DRBD device.1044* @wo: Write ordering method to try.1045*/1046void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)1047{1048enum write_ordering_e pwo;1049static char *write_ordering_str[] = {1050[WO_none] = "none",1051[WO_drain_io] = "drain",1052[WO_bdev_flush] = "flush",1053};10541055pwo = mdev->write_ordering;1056wo = min(pwo, wo);1057if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)1058wo = WO_drain_io;1059if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)1060wo = WO_none;1061mdev->write_ordering = wo;1062if (pwo != mdev->write_ordering || wo == WO_bdev_flush)1063dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);1064}10651066/**1067* drbd_submit_ee()1068* @mdev: DRBD device.1069* @e: epoch entry1070* @rw: flag field, see bio->bi_rw1071*1072* May spread the pages to multiple bios,1073* depending on bio_add_page restrictions.1074*1075* Returns 0 if all bios have been submitted,1076* -ENOMEM if we could not allocate enough bios,1077* -ENOSPC (any better 
suggestion?) if we have not been able to bio_add_page a1078* single page to an empty bio (which should never happen and likely indicates1079* that the lower level IO stack is in some way broken). This has been observed1080* on certain Xen deployments.1081*/1082/* TODO allocate from our own bio_set. */1083int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,1084const unsigned rw, const int fault_type)1085{1086struct bio *bios = NULL;1087struct bio *bio;1088struct page *page = e->pages;1089sector_t sector = e->sector;1090unsigned ds = e->size;1091unsigned n_bios = 0;1092unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;1093int err = -ENOMEM;10941095/* In most cases, we will only need one bio. But in case the lower1096* level restrictions happen to be different at this offset on this1097* side than those of the sending peer, we may need to submit the1098* request in more than one bio. */1099next_bio:1100bio = bio_alloc(GFP_NOIO, nr_pages);1101if (!bio) {1102dev_err(DEV, "submit_ee: Allocation of a bio failed\n");1103goto fail;1104}1105/* > e->sector, unless this is the first bio */1106bio->bi_sector = sector;1107bio->bi_bdev = mdev->ldev->backing_bdev;1108bio->bi_rw = rw;1109bio->bi_private = e;1110bio->bi_end_io = drbd_endio_sec;11111112bio->bi_next = bios;1113bios = bio;1114++n_bios;11151116page_chain_for_each(page) {1117unsigned len = min_t(unsigned, ds, PAGE_SIZE);1118if (!bio_add_page(bio, page, len, 0)) {1119/* A single page must always be possible!1120* But in case it fails anyways,1121* we deal with it, and complain (below). */1122if (bio->bi_vcnt == 0) {1123dev_err(DEV,1124"bio_add_page failed for len=%u, "1125"bi_vcnt=0 (bi_sector=%llu)\n",1126len, (unsigned long long)bio->bi_sector);1127err = -ENOSPC;1128goto fail;1129}1130goto next_bio;1131}1132ds -= len;1133sector += len >> 9;1134--nr_pages;1135}1136D_ASSERT(page == NULL);1137D_ASSERT(ds == 0);11381139atomic_set(&e->pending_bios, n_bios);1140do {1141bio = bios;1142bios = bios->bi_next;1143bio->bi_next = NULL;11441145drbd_generic_make_request(mdev, fault_type, bio);1146} while (bios);1147return 0;11481149fail:1150while (bios) {1151bio = bios;1152bios = bios->bi_next;1153bio_put(bio);1154}1155return err;1156}11571158static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)1159{1160int rv;1161struct p_barrier *p = &mdev->data.rbuf.barrier;1162struct drbd_epoch *epoch;11631164inc_unacked(mdev);11651166mdev->current_epoch->barrier_nr = p->barrier;1167rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);11681169/* P_BARRIER_ACK may imply that the corresponding extent is dropped from1170* the activity log, which means it would not be resynced in case the1171* R_PRIMARY crashes now.1172* Therefore we must send the barrier_ack after the barrier request was1173* completed. 
*/1174switch (mdev->write_ordering) {1175case WO_none:1176if (rv == FE_RECYCLED)1177return true;11781179/* receiver context, in the writeout path of the other node.1180* avoid potential distributed deadlock */1181epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);1182if (epoch)1183break;1184else1185dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");1186/* Fall through */11871188case WO_bdev_flush:1189case WO_drain_io:1190drbd_wait_ee_list_empty(mdev, &mdev->active_ee);1191drbd_flush(mdev);11921193if (atomic_read(&mdev->current_epoch->epoch_size)) {1194epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);1195if (epoch)1196break;1197}11981199epoch = mdev->current_epoch;1200wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);12011202D_ASSERT(atomic_read(&epoch->active) == 0);1203D_ASSERT(epoch->flags == 0);12041205return true;1206default:1207dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);1208return false;1209}12101211epoch->flags = 0;1212atomic_set(&epoch->epoch_size, 0);1213atomic_set(&epoch->active, 0);12141215spin_lock(&mdev->epoch_lock);1216if (atomic_read(&mdev->current_epoch->epoch_size)) {1217list_add(&epoch->list, &mdev->current_epoch->list);1218mdev->current_epoch = epoch;1219mdev->epochs++;1220} else {1221/* The current_epoch got recycled while we allocated this one... */1222kfree(epoch);1223}1224spin_unlock(&mdev->epoch_lock);12251226return true;1227}12281229/* used from receive_RSDataReply (recv_resync_read)1230* and from receive_Data */1231static struct drbd_epoch_entry *1232read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)1233{1234const sector_t capacity = drbd_get_capacity(mdev->this_bdev);1235struct drbd_epoch_entry *e;1236struct page *page;1237int dgs, ds, rr;1238void *dig_in = mdev->int_dig_in;1239void *dig_vv = mdev->int_dig_vv;1240unsigned long *data;12411242dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?1243crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;12441245if (dgs) {1246rr = drbd_recv(mdev, dig_in, dgs);1247if (rr != dgs) {1248if (!signal_pending(current))1249dev_warn(DEV,1250"short read receiving data digest: read %d expected %d\n",1251rr, dgs);1252return NULL;1253}1254}12551256data_size -= dgs;12571258ERR_IF(data_size == 0) return NULL;1259ERR_IF(data_size & 0x1ff) return NULL;1260ERR_IF(data_size > DRBD_MAX_BIO_SIZE) return NULL;12611262/* even though we trust out peer,1263* we sometimes have to double check. */1264if (sector + (data_size>>9) > capacity) {1265dev_err(DEV, "request from peer beyond end of local disk: "1266"capacity: %llus < sector: %llus + size: %u\n",1267(unsigned long long)capacity,1268(unsigned long long)sector, data_size);1269return NULL;1270}12711272/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD1273* "criss-cross" setup, that might cause write-out on some other DRBD,1274* which in turn might block on the other node at this very place. 
*/1275e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);1276if (!e)1277return NULL;12781279ds = data_size;1280page = e->pages;1281page_chain_for_each(page) {1282unsigned len = min_t(int, ds, PAGE_SIZE);1283data = kmap(page);1284rr = drbd_recv(mdev, data, len);1285if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {1286dev_err(DEV, "Fault injection: Corrupting data on receive\n");1287data[0] = data[0] ^ (unsigned long)-1;1288}1289kunmap(page);1290if (rr != len) {1291drbd_free_ee(mdev, e);1292if (!signal_pending(current))1293dev_warn(DEV, "short read receiving data: read %d expected %d\n",1294rr, len);1295return NULL;1296}1297ds -= rr;1298}12991300if (dgs) {1301drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);1302if (memcmp(dig_in, dig_vv, dgs)) {1303dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",1304(unsigned long long)sector, data_size);1305drbd_bcast_ee(mdev, "digest failed",1306dgs, dig_in, dig_vv, e);1307drbd_free_ee(mdev, e);1308return NULL;1309}1310}1311mdev->recv_cnt += data_size>>9;1312return e;1313}13141315/* drbd_drain_block() just takes a data block1316* out of the socket input buffer, and discards it.1317*/1318static int drbd_drain_block(struct drbd_conf *mdev, int data_size)1319{1320struct page *page;1321int rr, rv = 1;1322void *data;13231324if (!data_size)1325return true;13261327page = drbd_pp_alloc(mdev, 1, 1);13281329data = kmap(page);1330while (data_size) {1331rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));1332if (rr != min_t(int, data_size, PAGE_SIZE)) {1333rv = 0;1334if (!signal_pending(current))1335dev_warn(DEV,1336"short read receiving data: read %d expected %d\n",1337rr, min_t(int, data_size, PAGE_SIZE));1338break;1339}1340data_size -= rr;1341}1342kunmap(page);1343drbd_pp_free(mdev, page, 0);1344return rv;1345}13461347static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,1348sector_t sector, int data_size)1349{1350struct bio_vec *bvec;1351struct bio *bio;1352int dgs, rr, i, expect;1353void *dig_in = mdev->int_dig_in;1354void *dig_vv = mdev->int_dig_vv;13551356dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?1357crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;13581359if (dgs) {1360rr = drbd_recv(mdev, dig_in, dgs);1361if (rr != dgs) {1362if (!signal_pending(current))1363dev_warn(DEV,1364"short read receiving data reply digest: read %d expected %d\n",1365rr, dgs);1366return 0;1367}1368}13691370data_size -= dgs;13711372/* optimistically update recv_cnt. if receiving fails below,1373* we disconnect anyways, and counters will be reset. */1374mdev->recv_cnt += data_size>>9;13751376bio = req->master_bio;1377D_ASSERT(sector == bio->bi_sector);13781379bio_for_each_segment(bvec, bio, i) {1380expect = min_t(int, data_size, bvec->bv_len);1381rr = drbd_recv(mdev,1382kmap(bvec->bv_page)+bvec->bv_offset,1383expect);1384kunmap(bvec->bv_page);1385if (rr != expect) {1386if (!signal_pending(current))1387dev_warn(DEV, "short read receiving data reply: "1388"read %d expected %d\n",1389rr, expect);1390return 0;1391}1392data_size -= rr;1393}13941395if (dgs) {1396drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);1397if (memcmp(dig_in, dig_vv, dgs)) {1398dev_err(DEV, "Digest integrity check FAILED. 
Broken NICs?\n");1399return 0;1400}1401}14021403D_ASSERT(data_size == 0);1404return 1;1405}14061407/* e_end_resync_block() is called via1408* drbd_process_done_ee() by asender only */1409static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)1410{1411struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;1412sector_t sector = e->sector;1413int ok;14141415D_ASSERT(hlist_unhashed(&e->collision));14161417if (likely((e->flags & EE_WAS_ERROR) == 0)) {1418drbd_set_in_sync(mdev, sector, e->size);1419ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);1420} else {1421/* Record failure to sync */1422drbd_rs_failed_io(mdev, sector, e->size);14231424ok = drbd_send_ack(mdev, P_NEG_ACK, e);1425}1426dec_unacked(mdev);14271428return ok;1429}14301431static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)1432{1433struct drbd_epoch_entry *e;14341435e = read_in_block(mdev, ID_SYNCER, sector, data_size);1436if (!e)1437goto fail;14381439dec_rs_pending(mdev);14401441inc_unacked(mdev);1442/* corresponding dec_unacked() in e_end_resync_block()1443* respective _drbd_clear_done_ee */14441445e->w.cb = e_end_resync_block;14461447spin_lock_irq(&mdev->req_lock);1448list_add(&e->w.list, &mdev->sync_ee);1449spin_unlock_irq(&mdev->req_lock);14501451atomic_add(data_size >> 9, &mdev->rs_sect_ev);1452if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)1453return true;14541455/* don't care for the reason here */1456dev_err(DEV, "submit failed, triggering re-connect\n");1457spin_lock_irq(&mdev->req_lock);1458list_del(&e->w.list);1459spin_unlock_irq(&mdev->req_lock);14601461drbd_free_ee(mdev, e);1462fail:1463put_ldev(mdev);1464return false;1465}14661467static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)1468{1469struct drbd_request *req;1470sector_t sector;1471int ok;1472struct p_data *p = &mdev->data.rbuf.data;14731474sector = be64_to_cpu(p->sector);14751476spin_lock_irq(&mdev->req_lock);1477req = _ar_id_to_req(mdev, p->block_id, sector);1478spin_unlock_irq(&mdev->req_lock);1479if (unlikely(!req)) {1480dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");1481return false;1482}14831484/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid1485* special casing it there for the various failure cases.1486* still no race with drbd_fail_pending_reads */1487ok = recv_dless_read(mdev, req, sector, data_size);14881489if (ok)1490req_mod(req, data_received);1491/* else: nothing. handled from drbd_disconnect...1492* I don't think we may complete this just yet1493* in case we are "on-disconnect: freeze" */14941495return ok;1496}14971498static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)1499{1500sector_t sector;1501int ok;1502struct p_data *p = &mdev->data.rbuf.data;15031504sector = be64_to_cpu(p->sector);1505D_ASSERT(p->block_id == ID_SYNCER);15061507if (get_ldev(mdev)) {1508/* data is submitted to disk within recv_resync_read.1509* corresponding put_ldev done below on error,1510* or in drbd_endio_write_sec. 
*/1511ok = recv_resync_read(mdev, sector, data_size);1512} else {1513if (__ratelimit(&drbd_ratelimit_state))1514dev_err(DEV, "Can not write resync data to local disk.\n");15151516ok = drbd_drain_block(mdev, data_size);15171518drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);1519}15201521atomic_add(data_size >> 9, &mdev->rs_sect_in);15221523return ok;1524}15251526/* e_end_block() is called via drbd_process_done_ee().1527* this means this function only runs in the asender thread1528*/1529static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)1530{1531struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;1532sector_t sector = e->sector;1533int ok = 1, pcmd;15341535if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {1536if (likely((e->flags & EE_WAS_ERROR) == 0)) {1537pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&1538mdev->state.conn <= C_PAUSED_SYNC_T &&1539e->flags & EE_MAY_SET_IN_SYNC) ?1540P_RS_WRITE_ACK : P_WRITE_ACK;1541ok &= drbd_send_ack(mdev, pcmd, e);1542if (pcmd == P_RS_WRITE_ACK)1543drbd_set_in_sync(mdev, sector, e->size);1544} else {1545ok = drbd_send_ack(mdev, P_NEG_ACK, e);1546/* we expect it to be marked out of sync anyways...1547* maybe assert this? */1548}1549dec_unacked(mdev);1550}1551/* we delete from the conflict detection hash _after_ we sent out the1552* P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */1553if (mdev->net_conf->two_primaries) {1554spin_lock_irq(&mdev->req_lock);1555D_ASSERT(!hlist_unhashed(&e->collision));1556hlist_del_init(&e->collision);1557spin_unlock_irq(&mdev->req_lock);1558} else {1559D_ASSERT(hlist_unhashed(&e->collision));1560}15611562drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));15631564return ok;1565}15661567static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)1568{1569struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;1570int ok = 1;15711572D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);1573ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);15741575spin_lock_irq(&mdev->req_lock);1576D_ASSERT(!hlist_unhashed(&e->collision));1577hlist_del_init(&e->collision);1578spin_unlock_irq(&mdev->req_lock);15791580dec_unacked(mdev);15811582return ok;1583}15841585/* Called from receive_Data.1586* Synchronize packets on sock with packets on msock.1587*1588* This is here so even when a P_DATA packet traveling via sock overtook an Ack1589* packet traveling on msock, they are still processed in the order they have1590* been sent.1591*1592* Note: we don't care for Ack packets overtaking P_DATA packets.1593*1594* In case packet_seq is larger than mdev->peer_seq number, there are1595* outstanding packets on the msock. We wait for them to arrive.1596* In case we are the logically next packet, we update mdev->peer_seq1597* ourselves. Correctly handles 32bit wrap around.1598*1599* Assume we have a 10 GBit connection, that is about 1<<30 byte per second,1600* about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds1601* for the 24bit wrap (historical atomic_t guarantee on some archs), and we have1602* 1<<9 == 512 seconds aka ages for the 32bit wrap around...1603*1604* returns 0 if we may process the packet,1605* -ERESTARTSYS if we were interrupted (by disconnect signal). 
*/1606static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)1607{1608DEFINE_WAIT(wait);1609unsigned int p_seq;1610long timeout;1611int ret = 0;1612spin_lock(&mdev->peer_seq_lock);1613for (;;) {1614prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);1615if (seq_le(packet_seq, mdev->peer_seq+1))1616break;1617if (signal_pending(current)) {1618ret = -ERESTARTSYS;1619break;1620}1621p_seq = mdev->peer_seq;1622spin_unlock(&mdev->peer_seq_lock);1623timeout = schedule_timeout(30*HZ);1624spin_lock(&mdev->peer_seq_lock);1625if (timeout == 0 && p_seq == mdev->peer_seq) {1626ret = -ETIMEDOUT;1627dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");1628break;1629}1630}1631finish_wait(&mdev->seq_wait, &wait);1632if (mdev->peer_seq+1 == packet_seq)1633mdev->peer_seq++;1634spin_unlock(&mdev->peer_seq_lock);1635return ret;1636}16371638/* see also bio_flags_to_wire()1639* DRBD_REQ_*, because we need to semantically map the flags to data packet1640* flags and back. We may replicate to other kernel versions. */1641static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)1642{1643return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |1644(dpf & DP_FUA ? REQ_FUA : 0) |1645(dpf & DP_FLUSH ? REQ_FLUSH : 0) |1646(dpf & DP_DISCARD ? REQ_DISCARD : 0);1647}16481649/* mirrored write */1650static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)1651{1652sector_t sector;1653struct drbd_epoch_entry *e;1654struct p_data *p = &mdev->data.rbuf.data;1655int rw = WRITE;1656u32 dp_flags;16571658if (!get_ldev(mdev)) {1659spin_lock(&mdev->peer_seq_lock);1660if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))1661mdev->peer_seq++;1662spin_unlock(&mdev->peer_seq_lock);16631664drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);1665atomic_inc(&mdev->current_epoch->epoch_size);1666return drbd_drain_block(mdev, data_size);1667}16681669/* get_ldev(mdev) successful.1670* Corresponding put_ldev done either below (on various errors),1671* or in drbd_endio_write_sec, if we successfully submit the data at1672* the end of this function. */16731674sector = be64_to_cpu(p->sector);1675e = read_in_block(mdev, p->block_id, sector, data_size);1676if (!e) {1677put_ldev(mdev);1678return false;1679}16801681e->w.cb = e_end_block;16821683dp_flags = be32_to_cpu(p->dp_flags);1684rw |= wire_flags_to_bio(mdev, dp_flags);16851686if (dp_flags & DP_MAY_SET_IN_SYNC)1687e->flags |= EE_MAY_SET_IN_SYNC;16881689spin_lock(&mdev->epoch_lock);1690e->epoch = mdev->current_epoch;1691atomic_inc(&e->epoch->epoch_size);1692atomic_inc(&e->epoch->active);1693spin_unlock(&mdev->epoch_lock);16941695/* I'm the receiver, I do hold a net_cnt reference. */1696if (!mdev->net_conf->two_primaries) {1697spin_lock_irq(&mdev->req_lock);1698} else {1699/* don't get the req_lock yet,1700* we may sleep in drbd_wait_peer_seq */1701const int size = e->size;1702const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);1703DEFINE_WAIT(wait);1704struct drbd_request *i;1705struct hlist_node *n;1706struct hlist_head *slot;1707int first;17081709D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);1710BUG_ON(mdev->ee_hash == NULL);1711BUG_ON(mdev->tl_hash == NULL);17121713/* conflict detection and handling:1714* 1. wait on the sequence number,1715* in case this data packet overtook ACK packets.1716* 2. 
check our hash tables for conflicting requests.1717* we only need to walk the tl_hash, since an ee can not1718* have a conflict with an other ee: on the submitting1719* node, the corresponding req had already been conflicting,1720* and a conflicting req is never sent.1721*1722* Note: for two_primaries, we are protocol C,1723* so there cannot be any request that is DONE1724* but still on the transfer log.1725*1726* unconditionally add to the ee_hash.1727*1728* if no conflicting request is found:1729* submit.1730*1731* if any conflicting request is found1732* that has not yet been acked,1733* AND I have the "discard concurrent writes" flag:1734* queue (via done_ee) the P_DISCARD_ACK; OUT.1735*1736* if any conflicting request is found:1737* block the receiver, waiting on misc_wait1738* until no more conflicting requests are there,1739* or we get interrupted (disconnect).1740*1741* we do not just write after local io completion of those1742* requests, but only after req is done completely, i.e.1743* we wait for the P_DISCARD_ACK to arrive!1744*1745* then proceed normally, i.e. submit.1746*/1747if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))1748goto out_interrupted;17491750spin_lock_irq(&mdev->req_lock);17511752hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));17531754#define OVERLAPS overlaps(i->sector, i->size, sector, size)1755slot = tl_hash_slot(mdev, sector);1756first = 1;1757for (;;) {1758int have_unacked = 0;1759int have_conflict = 0;1760prepare_to_wait(&mdev->misc_wait, &wait,1761TASK_INTERRUPTIBLE);1762hlist_for_each_entry(i, n, slot, collision) {1763if (OVERLAPS) {1764/* only ALERT on first iteration,1765* we may be woken up early... */1766if (first)1767dev_alert(DEV, "%s[%u] Concurrent local write detected!"1768" new: %llus +%u; pending: %llus +%u\n",1769current->comm, current->pid,1770(unsigned long long)sector, size,1771(unsigned long long)i->sector, i->size);1772if (i->rq_state & RQ_NET_PENDING)1773++have_unacked;1774++have_conflict;1775}1776}1777#undef OVERLAPS1778if (!have_conflict)1779break;17801781/* Discard Ack only for the _first_ iteration */1782if (first && discard && have_unacked) {1783dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",1784(unsigned long long)sector);1785inc_unacked(mdev);1786e->w.cb = e_send_discard_ack;1787list_add_tail(&e->w.list, &mdev->done_ee);17881789spin_unlock_irq(&mdev->req_lock);17901791/* we could probably send that P_DISCARD_ACK ourselves,1792* but I don't like the receiver using the msock */17931794put_ldev(mdev);1795wake_asender(mdev);1796finish_wait(&mdev->misc_wait, &wait);1797return true;1798}17991800if (signal_pending(current)) {1801hlist_del_init(&e->collision);18021803spin_unlock_irq(&mdev->req_lock);18041805finish_wait(&mdev->misc_wait, &wait);1806goto out_interrupted;1807}18081809spin_unlock_irq(&mdev->req_lock);1810if (first) {1811first = 0;1812dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "1813"sec=%llus\n", (unsigned long long)sector);1814} else if (discard) {1815/* we had none on the first iteration.1816* there must be none now. 
*/1817D_ASSERT(have_unacked == 0);1818}1819schedule();1820spin_lock_irq(&mdev->req_lock);1821}1822finish_wait(&mdev->misc_wait, &wait);1823}18241825list_add(&e->w.list, &mdev->active_ee);1826spin_unlock_irq(&mdev->req_lock);18271828switch (mdev->net_conf->wire_protocol) {1829case DRBD_PROT_C:1830inc_unacked(mdev);1831/* corresponding dec_unacked() in e_end_block()1832* respective _drbd_clear_done_ee */1833break;1834case DRBD_PROT_B:1835/* I really don't like it that the receiver thread1836* sends on the msock, but anyways */1837drbd_send_ack(mdev, P_RECV_ACK, e);1838break;1839case DRBD_PROT_A:1840/* nothing to do */1841break;1842}18431844if (mdev->state.pdsk < D_INCONSISTENT) {1845/* In case we have the only disk of the cluster, */1846drbd_set_out_of_sync(mdev, e->sector, e->size);1847e->flags |= EE_CALL_AL_COMPLETE_IO;1848e->flags &= ~EE_MAY_SET_IN_SYNC;1849drbd_al_begin_io(mdev, e->sector);1850}18511852if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)1853return true;18541855/* don't care for the reason here */1856dev_err(DEV, "submit failed, triggering re-connect\n");1857spin_lock_irq(&mdev->req_lock);1858list_del(&e->w.list);1859hlist_del_init(&e->collision);1860spin_unlock_irq(&mdev->req_lock);1861if (e->flags & EE_CALL_AL_COMPLETE_IO)1862drbd_al_complete_io(mdev, e->sector);18631864out_interrupted:1865drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);1866put_ldev(mdev);1867drbd_free_ee(mdev, e);1868return false;1869}18701871/* We may throttle resync, if the lower device seems to be busy,1872* and current sync rate is above c_min_rate.1873*1874* To decide whether or not the lower device is busy, we use a scheme similar1875* to MD RAID is_mddev_idle(): if the partition stats reveal "significant"1876* (more than 64 sectors) of activity we cannot account for with our own resync1877* activity, it obviously is "busy".1878*1879* The current sync rate used here uses only the most recent two step marks,1880* to have a short time average so we can react faster.1881*/1882int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)1883{1884struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;1885unsigned long db, dt, dbdt;1886struct lc_element *tmp;1887int curr_events;1888int throttle = 0;18891890/* feature disabled? */1891if (mdev->sync_conf.c_min_rate == 0)1892return 0;18931894spin_lock_irq(&mdev->al_lock);1895tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));1896if (tmp) {1897struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);1898if (test_bit(BME_PRIORITY, &bm_ext->flags)) {1899spin_unlock_irq(&mdev->al_lock);1900return 0;1901}1902/* Do not slow down if app IO is already waiting for this extent */1903}1904spin_unlock_irq(&mdev->al_lock);19051906curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +1907(int)part_stat_read(&disk->part0, sectors[1]) -1908atomic_read(&mdev->rs_sect_ev);19091910if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {1911unsigned long rs_left;1912int i;19131914mdev->rs_last_events = curr_events;19151916/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,1917* approx. 
*/1918i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;19191920if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)1921rs_left = mdev->ov_left;1922else1923rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;19241925dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;1926if (!dt)1927dt++;1928db = mdev->rs_mark_left[i] - rs_left;1929dbdt = Bit2KB(db/dt);19301931if (dbdt > mdev->sync_conf.c_min_rate)1932throttle = 1;1933}1934return throttle;1935}193619371938static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)1939{1940sector_t sector;1941const sector_t capacity = drbd_get_capacity(mdev->this_bdev);1942struct drbd_epoch_entry *e;1943struct digest_info *di = NULL;1944int size, verb;1945unsigned int fault_type;1946struct p_block_req *p = &mdev->data.rbuf.block_req;19471948sector = be64_to_cpu(p->sector);1949size = be32_to_cpu(p->blksize);19501951if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {1952dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,1953(unsigned long long)sector, size);1954return false;1955}1956if (sector + (size>>9) > capacity) {1957dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,1958(unsigned long long)sector, size);1959return false;1960}19611962if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {1963verb = 1;1964switch (cmd) {1965case P_DATA_REQUEST:1966drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);1967break;1968case P_RS_DATA_REQUEST:1969case P_CSUM_RS_REQUEST:1970case P_OV_REQUEST:1971drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);1972break;1973case P_OV_REPLY:1974verb = 0;1975dec_rs_pending(mdev);1976drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);1977break;1978default:1979dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",1980cmdname(cmd));1981}1982if (verb && __ratelimit(&drbd_ratelimit_state))1983dev_err(DEV, "Can not satisfy peer's read request, "1984"no local data.\n");19851986/* drain possibly payload */1987return drbd_drain_block(mdev, digest_size);1988}19891990/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD1991* "criss-cross" setup, that might cause write-out on some other DRBD,1992* which in turn might block on the other node at this very place. 
*/1993e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);1994if (!e) {1995put_ldev(mdev);1996return false;1997}19981999switch (cmd) {2000case P_DATA_REQUEST:2001e->w.cb = w_e_end_data_req;2002fault_type = DRBD_FAULT_DT_RD;2003/* application IO, don't drbd_rs_begin_io */2004goto submit;20052006case P_RS_DATA_REQUEST:2007e->w.cb = w_e_end_rsdata_req;2008fault_type = DRBD_FAULT_RS_RD;2009/* used in the sector offset progress display */2010mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);2011break;20122013case P_OV_REPLY:2014case P_CSUM_RS_REQUEST:2015fault_type = DRBD_FAULT_RS_RD;2016di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);2017if (!di)2018goto out_free_e;20192020di->digest_size = digest_size;2021di->digest = (((char *)di)+sizeof(struct digest_info));20222023e->digest = di;2024e->flags |= EE_HAS_DIGEST;20252026if (drbd_recv(mdev, di->digest, digest_size) != digest_size)2027goto out_free_e;20282029if (cmd == P_CSUM_RS_REQUEST) {2030D_ASSERT(mdev->agreed_pro_version >= 89);2031e->w.cb = w_e_end_csum_rs_req;2032/* used in the sector offset progress display */2033mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);2034} else if (cmd == P_OV_REPLY) {2035/* track progress, we may need to throttle */2036atomic_add(size >> 9, &mdev->rs_sect_in);2037e->w.cb = w_e_end_ov_reply;2038dec_rs_pending(mdev);2039/* drbd_rs_begin_io done when we sent this request,2040* but accounting still needs to be done. */2041goto submit_for_resync;2042}2043break;20442045case P_OV_REQUEST:2046if (mdev->ov_start_sector == ~(sector_t)0 &&2047mdev->agreed_pro_version >= 90) {2048unsigned long now = jiffies;2049int i;2050mdev->ov_start_sector = sector;2051mdev->ov_position = sector;2052mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);2053mdev->rs_total = mdev->ov_left;2054for (i = 0; i < DRBD_SYNC_MARKS; i++) {2055mdev->rs_mark_left[i] = mdev->ov_left;2056mdev->rs_mark_time[i] = now;2057}2058dev_info(DEV, "Online Verify start sector: %llu\n",2059(unsigned long long)sector);2060}2061e->w.cb = w_e_end_ov_req;2062fault_type = DRBD_FAULT_RS_RD;2063break;20642065default:2066dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",2067cmdname(cmd));2068fault_type = DRBD_FAULT_MAX;2069goto out_free_e;2070}20712072/* Throttle, drbd_rs_begin_io and submit should become asynchronous2073* wrt the receiver, but it is not as straightforward as it may seem.2074* Various places in the resync start and stop logic assume resync2075* requests are processed in order, requeuing this on the worker thread2076* introduces a bunch of new code for synchronization between threads.2077*2078* Unlimited throttling before drbd_rs_begin_io may stall the resync2079* "forever", throttling after drbd_rs_begin_io will lock that extent2080* for application writes for the same time. For now, just throttle2081* here, where the rest of the code expects the receiver to sleep for2082* a while, anyways.2083*/20842085/* Throttle before drbd_rs_begin_io, as that locks out application IO;2086* this defers syncer requests for some time, before letting at least2087* on request through. 
The resync controller on the receiving side2088* will adapt to the incoming rate accordingly.2089*2090* We cannot throttle here if remote is Primary/SyncTarget:2091* we would also throttle its application reads.2092* In that case, throttling is done on the SyncTarget only.2093*/2094if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))2095schedule_timeout_uninterruptible(HZ/10);2096if (drbd_rs_begin_io(mdev, sector))2097goto out_free_e;20982099submit_for_resync:2100atomic_add(size >> 9, &mdev->rs_sect_ev);21012102submit:2103inc_unacked(mdev);2104spin_lock_irq(&mdev->req_lock);2105list_add_tail(&e->w.list, &mdev->read_ee);2106spin_unlock_irq(&mdev->req_lock);21072108if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)2109return true;21102111/* don't care for the reason here */2112dev_err(DEV, "submit failed, triggering re-connect\n");2113spin_lock_irq(&mdev->req_lock);2114list_del(&e->w.list);2115spin_unlock_irq(&mdev->req_lock);2116/* no drbd_rs_complete_io(), we are dropping the connection anyways */21172118out_free_e:2119put_ldev(mdev);2120drbd_free_ee(mdev, e);2121return false;2122}21232124static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)2125{2126int self, peer, rv = -100;2127unsigned long ch_self, ch_peer;21282129self = mdev->ldev->md.uuid[UI_BITMAP] & 1;2130peer = mdev->p_uuid[UI_BITMAP] & 1;21312132ch_peer = mdev->p_uuid[UI_SIZE];2133ch_self = mdev->comm_bm_set;21342135switch (mdev->net_conf->after_sb_0p) {2136case ASB_CONSENSUS:2137case ASB_DISCARD_SECONDARY:2138case ASB_CALL_HELPER:2139dev_err(DEV, "Configuration error.\n");2140break;2141case ASB_DISCONNECT:2142break;2143case ASB_DISCARD_YOUNGER_PRI:2144if (self == 0 && peer == 1) {2145rv = -1;2146break;2147}2148if (self == 1 && peer == 0) {2149rv = 1;2150break;2151}2152/* Else fall through to one of the other strategies... */2153case ASB_DISCARD_OLDER_PRI:2154if (self == 0 && peer == 1) {2155rv = 1;2156break;2157}2158if (self == 1 && peer == 0) {2159rv = -1;2160break;2161}2162/* Else fall through to one of the other strategies... */2163dev_warn(DEV, "Discard younger/older primary did not find a decision\n"2164"Using discard-least-changes instead\n");2165case ASB_DISCARD_ZERO_CHG:2166if (ch_peer == 0 && ch_self == 0) {2167rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)2168? -1 : 1;2169break;2170} else {2171if (ch_peer == 0) { rv = 1; break; }2172if (ch_self == 0) { rv = -1; break; }2173}2174if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)2175break;2176case ASB_DISCARD_LEAST_CHG:2177if (ch_self < ch_peer)2178rv = -1;2179else if (ch_self > ch_peer)2180rv = 1;2181else /* ( ch_self == ch_peer ) */2182/* Well, then use something else. */2183rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)2184? 
-1 : 1;2185break;2186case ASB_DISCARD_LOCAL:2187rv = -1;2188break;2189case ASB_DISCARD_REMOTE:2190rv = 1;2191}21922193return rv;2194}21952196static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)2197{2198int hg, rv = -100;21992200switch (mdev->net_conf->after_sb_1p) {2201case ASB_DISCARD_YOUNGER_PRI:2202case ASB_DISCARD_OLDER_PRI:2203case ASB_DISCARD_LEAST_CHG:2204case ASB_DISCARD_LOCAL:2205case ASB_DISCARD_REMOTE:2206dev_err(DEV, "Configuration error.\n");2207break;2208case ASB_DISCONNECT:2209break;2210case ASB_CONSENSUS:2211hg = drbd_asb_recover_0p(mdev);2212if (hg == -1 && mdev->state.role == R_SECONDARY)2213rv = hg;2214if (hg == 1 && mdev->state.role == R_PRIMARY)2215rv = hg;2216break;2217case ASB_VIOLENTLY:2218rv = drbd_asb_recover_0p(mdev);2219break;2220case ASB_DISCARD_SECONDARY:2221return mdev->state.role == R_PRIMARY ? 1 : -1;2222case ASB_CALL_HELPER:2223hg = drbd_asb_recover_0p(mdev);2224if (hg == -1 && mdev->state.role == R_PRIMARY) {2225enum drbd_state_rv rv2;22262227drbd_set_role(mdev, R_SECONDARY, 0);2228/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,2229* we might be here in C_WF_REPORT_PARAMS which is transient.2230* we do not need to wait for the after state change work either. */2231rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));2232if (rv2 != SS_SUCCESS) {2233drbd_khelper(mdev, "pri-lost-after-sb");2234} else {2235dev_warn(DEV, "Successfully gave up primary role.\n");2236rv = hg;2237}2238} else2239rv = hg;2240}22412242return rv;2243}22442245static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)2246{2247int hg, rv = -100;22482249switch (mdev->net_conf->after_sb_2p) {2250case ASB_DISCARD_YOUNGER_PRI:2251case ASB_DISCARD_OLDER_PRI:2252case ASB_DISCARD_LEAST_CHG:2253case ASB_DISCARD_LOCAL:2254case ASB_DISCARD_REMOTE:2255case ASB_CONSENSUS:2256case ASB_DISCARD_SECONDARY:2257dev_err(DEV, "Configuration error.\n");2258break;2259case ASB_VIOLENTLY:2260rv = drbd_asb_recover_0p(mdev);2261break;2262case ASB_DISCONNECT:2263break;2264case ASB_CALL_HELPER:2265hg = drbd_asb_recover_0p(mdev);2266if (hg == -1) {2267enum drbd_state_rv rv2;22682269/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,2270* we might be here in C_WF_REPORT_PARAMS which is transient.2271* we do not need to wait for the after state change work either. 
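	 * If giving up the primary role fails, we escalate to the
	 * "pri-lost-after-sb" helper below.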
*/2272rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));2273if (rv2 != SS_SUCCESS) {2274drbd_khelper(mdev, "pri-lost-after-sb");2275} else {2276dev_warn(DEV, "Successfully gave up primary role.\n");2277rv = hg;2278}2279} else2280rv = hg;2281}22822283return rv;2284}22852286static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,2287u64 bits, u64 flags)2288{2289if (!uuid) {2290dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);2291return;2292}2293dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",2294text,2295(unsigned long long)uuid[UI_CURRENT],2296(unsigned long long)uuid[UI_BITMAP],2297(unsigned long long)uuid[UI_HISTORY_START],2298(unsigned long long)uuid[UI_HISTORY_END],2299(unsigned long long)bits,2300(unsigned long long)flags);2301}23022303/*2304100 after split brain try auto recover23052 C_SYNC_SOURCE set BitMap23061 C_SYNC_SOURCE use BitMap23070 no Sync2308-1 C_SYNC_TARGET use BitMap2309-2 C_SYNC_TARGET set BitMap2310-100 after split brain, disconnect2311-1000 unrelated data2312-1091 requires proto 912313-1096 requires proto 962314*/2315static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)2316{2317u64 self, peer;2318int i, j;23192320self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);2321peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);23222323*rule_nr = 10;2324if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)2325return 0;23262327*rule_nr = 20;2328if ((self == UUID_JUST_CREATED || self == (u64)0) &&2329peer != UUID_JUST_CREATED)2330return -2;23312332*rule_nr = 30;2333if (self != UUID_JUST_CREATED &&2334(peer == UUID_JUST_CREATED || peer == (u64)0))2335return 2;23362337if (self == peer) {2338int rct, dc; /* roles at crash time */23392340if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {23412342if (mdev->agreed_pro_version < 91)2343return -1091;23442345if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&2346(mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {2347dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");2348drbd_uuid_set_bm(mdev, 0UL);23492350drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,2351mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);2352*rule_nr = 34;2353} else {2354dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");2355*rule_nr = 36;2356}23572358return 1;2359}23602361if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {23622363if (mdev->agreed_pro_version < 91)2364return -1091;23652366if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&2367(mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {2368dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");23692370mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];2371mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];2372mdev->p_uuid[UI_BITMAP] = 0UL;23732374drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);2375*rule_nr = 35;2376} else {2377dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");2378*rule_nr = 37;2379}23802381return -1;2382}23832384/* Common power [off|failure] */2385rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 
1 : 0) +2386(mdev->p_uuid[UI_FLAGS] & 2);2387/* lowest bit is set when we were primary,2388* next bit (weight 2) is set when peer was primary */2389*rule_nr = 40;23902391switch (rct) {2392case 0: /* !self_pri && !peer_pri */ return 0;2393case 1: /* self_pri && !peer_pri */ return 1;2394case 2: /* !self_pri && peer_pri */ return -1;2395case 3: /* self_pri && peer_pri */2396dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);2397return dc ? -1 : 1;2398}2399}24002401*rule_nr = 50;2402peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);2403if (self == peer)2404return -1;24052406*rule_nr = 51;2407peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);2408if (self == peer) {2409if (mdev->agreed_pro_version < 96 ?2410(mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==2411(mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :2412peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {2413/* The last P_SYNC_UUID did not get though. Undo the last start of2414resync as sync source modifications of the peer's UUIDs. */24152416if (mdev->agreed_pro_version < 91)2417return -1091;24182419mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];2420mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];24212422dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");2423drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);24242425return -1;2426}2427}24282429*rule_nr = 60;2430self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);2431for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {2432peer = mdev->p_uuid[i] & ~((u64)1);2433if (self == peer)2434return -2;2435}24362437*rule_nr = 70;2438self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);2439peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);2440if (self == peer)2441return 1;24422443*rule_nr = 71;2444self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);2445if (self == peer) {2446if (mdev->agreed_pro_version < 96 ?2447(mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==2448(mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :2449self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {2450/* The last P_SYNC_UUID did not get though. Undo the last start of2451resync as sync source modifications of our UUIDs. */24522453if (mdev->agreed_pro_version < 91)2454return -1091;24552456_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);2457_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);24582459dev_info(DEV, "Last syncUUID did not get through, corrected:\n");2460drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,2461mdev->state.disk >= D_NEGOTIATING ? 
drbd_bm_total_weight(mdev) : 0, 0);24622463return 1;2464}2465}246624672468*rule_nr = 80;2469peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);2470for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {2471self = mdev->ldev->md.uuid[i] & ~((u64)1);2472if (self == peer)2473return 2;2474}24752476*rule_nr = 90;2477self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);2478peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);2479if (self == peer && self != ((u64)0))2480return 100;24812482*rule_nr = 100;2483for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {2484self = mdev->ldev->md.uuid[i] & ~((u64)1);2485for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {2486peer = mdev->p_uuid[j] & ~((u64)1);2487if (self == peer)2488return -100;2489}2490}24912492return -1000;2493}24942495/* drbd_sync_handshake() returns the new conn state on success, or2496CONN_MASK (-1) on failure.2497*/2498static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,2499enum drbd_disk_state peer_disk) __must_hold(local)2500{2501int hg, rule_nr;2502enum drbd_conns rv = C_MASK;2503enum drbd_disk_state mydisk;25042505mydisk = mdev->state.disk;2506if (mydisk == D_NEGOTIATING)2507mydisk = mdev->new_state_tmp.disk;25082509dev_info(DEV, "drbd_sync_handshake:\n");2510drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);2511drbd_uuid_dump(mdev, "peer", mdev->p_uuid,2512mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);25132514hg = drbd_uuid_compare(mdev, &rule_nr);25152516dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);25172518if (hg == -1000) {2519dev_alert(DEV, "Unrelated data, aborting!\n");2520return C_MASK;2521}2522if (hg < -1000) {2523dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);2524return C_MASK;2525}25262527if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||2528(peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {2529int f = (hg == -100) || abs(hg) == 2;2530hg = mydisk > D_INCONSISTENT ? 1 : -1;2531if (f)2532hg = hg*2;2533dev_info(DEV, "Becoming sync %s due to disk states.\n",2534hg > 0 ? "source" : "target");2535}25362537if (abs(hg) == 100)2538drbd_khelper(mdev, "initial-split-brain");25392540if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {2541int pcount = (mdev->state.role == R_PRIMARY)2542+ (peer_role == R_PRIMARY);2543int forced = (hg == -100);25442545switch (pcount) {2546case 0:2547hg = drbd_asb_recover_0p(mdev);2548break;2549case 1:2550hg = drbd_asb_recover_1p(mdev);2551break;2552case 2:2553hg = drbd_asb_recover_2p(mdev);2554break;2555}2556if (abs(hg) < 100) {2557dev_warn(DEV, "Split-Brain detected, %d primaries, "2558"automatically solved. Sync from %s node\n",2559pcount, (hg < 0) ? "peer" : "this");2560if (forced) {2561dev_warn(DEV, "Doing a full sync, since"2562" UUIDs where ambiguous.\n");2563hg = hg*2;2564}2565}2566}25672568if (hg == -100) {2569if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))2570hg = -1;2571if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))2572hg = 1;25732574if (abs(hg) < 100)2575dev_warn(DEV, "Split-Brain detected, manually solved. "2576"Sync from %s node\n",2577(hg < 0) ? "peer" : "this");2578}25792580if (hg == -100) {2581/* FIXME this log message is not correct if we end up here2582* after an attempted attach on a diskless node.2583* We just refuse to attach -- well, we drop the "connection"2584* to that disk, in a way... 
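	 * The C_MASK returned below tells the caller, receive_state(), that
	 * no usable sync decision could be made for this connection attempt.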
 */
		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(mdev, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
		switch (mdev->net_conf->rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(mdev, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
				"assumption\n");
		}
	}

	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
		if (hg == 0)
			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(mdev)) {
			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
				 drbd_bm_total_weight(mdev));
		}
	}

	return rv;
}

/* returns 1 if invalid */
static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
{
	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
		return 0;

	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
		return 1;

	/* everything else is valid if they are equal on both sides. */
	if (peer == self)
		return 0;

	/* everything else is invalid.
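	 * The two sides are configured with conflicting after-split-brain
	 * policies; the caller (receive_protocol()) will drop the connection.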
*/2660return 1;2661}26622663static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)2664{2665struct p_protocol *p = &mdev->data.rbuf.protocol;2666int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;2667int p_want_lose, p_two_primaries, cf;2668char p_integrity_alg[SHARED_SECRET_MAX] = "";26692670p_proto = be32_to_cpu(p->protocol);2671p_after_sb_0p = be32_to_cpu(p->after_sb_0p);2672p_after_sb_1p = be32_to_cpu(p->after_sb_1p);2673p_after_sb_2p = be32_to_cpu(p->after_sb_2p);2674p_two_primaries = be32_to_cpu(p->two_primaries);2675cf = be32_to_cpu(p->conn_flags);2676p_want_lose = cf & CF_WANT_LOSE;26772678clear_bit(CONN_DRY_RUN, &mdev->flags);26792680if (cf & CF_DRY_RUN)2681set_bit(CONN_DRY_RUN, &mdev->flags);26822683if (p_proto != mdev->net_conf->wire_protocol) {2684dev_err(DEV, "incompatible communication protocols\n");2685goto disconnect;2686}26872688if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {2689dev_err(DEV, "incompatible after-sb-0pri settings\n");2690goto disconnect;2691}26922693if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {2694dev_err(DEV, "incompatible after-sb-1pri settings\n");2695goto disconnect;2696}26972698if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {2699dev_err(DEV, "incompatible after-sb-2pri settings\n");2700goto disconnect;2701}27022703if (p_want_lose && mdev->net_conf->want_lose) {2704dev_err(DEV, "both sides have the 'want_lose' flag set\n");2705goto disconnect;2706}27072708if (p_two_primaries != mdev->net_conf->two_primaries) {2709dev_err(DEV, "incompatible setting of the two-primaries options\n");2710goto disconnect;2711}27122713if (mdev->agreed_pro_version >= 87) {2714unsigned char *my_alg = mdev->net_conf->integrity_alg;27152716if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)2717return false;27182719p_integrity_alg[SHARED_SECRET_MAX-1] = 0;2720if (strcmp(p_integrity_alg, my_alg)) {2721dev_err(DEV, "incompatible setting of the data-integrity-alg\n");2722goto disconnect;2723}2724dev_info(DEV, "data-integrity-alg: %s\n",2725my_alg[0] ? my_alg : (unsigned char *)"<not-used>");2726}27272728return true;27292730disconnect:2731drbd_force_state(mdev, NS(conn, C_DISCONNECTING));2732return false;2733}27342735/* helper function2736* input: alg name, feature name2737* return: NULL (alg name was "")2738* ERR_PTR(error) if something goes wrong2739* or the crypto hash ptr, if it worked out ok. */2740struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,2741const char *alg, const char *name)2742{2743struct crypto_hash *tfm;27442745if (!alg[0])2746return NULL;27472748tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);2749if (IS_ERR(tfm)) {2750dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",2751alg, name, PTR_ERR(tfm));2752return tfm;2753}2754if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {2755crypto_free_hash(tfm);2756dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);2757return ERR_PTR(-EINVAL);2758}2759return tfm;2760}27612762static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)2763{2764int ok = true;2765struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;2766unsigned int header_size, data_size, exp_max_sz;2767struct crypto_hash *verify_tfm = NULL;2768struct crypto_hash *csums_tfm = NULL;2769const int apv = mdev->agreed_pro_version;2770int *rs_plan_s = NULL;2771int fifo_size = 0;27722773exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)2774: apv == 88 ? 
sizeof(struct p_rs_param)2775+ SHARED_SECRET_MAX2776: apv <= 94 ? sizeof(struct p_rs_param_89)2777: /* apv >= 95 */ sizeof(struct p_rs_param_95);27782779if (packet_size > exp_max_sz) {2780dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",2781packet_size, exp_max_sz);2782return false;2783}27842785if (apv <= 88) {2786header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);2787data_size = packet_size - header_size;2788} else if (apv <= 94) {2789header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);2790data_size = packet_size - header_size;2791D_ASSERT(data_size == 0);2792} else {2793header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);2794data_size = packet_size - header_size;2795D_ASSERT(data_size == 0);2796}27972798/* initialize verify_alg and csums_alg */2799memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);28002801if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)2802return false;28032804mdev->sync_conf.rate = be32_to_cpu(p->rate);28052806if (apv >= 88) {2807if (apv == 88) {2808if (data_size > SHARED_SECRET_MAX) {2809dev_err(DEV, "verify-alg too long, "2810"peer wants %u, accepting only %u byte\n",2811data_size, SHARED_SECRET_MAX);2812return false;2813}28142815if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)2816return false;28172818/* we expect NUL terminated string */2819/* but just in case someone tries to be evil */2820D_ASSERT(p->verify_alg[data_size-1] == 0);2821p->verify_alg[data_size-1] = 0;28222823} else /* apv >= 89 */ {2824/* we still expect NUL terminated strings */2825/* but just in case someone tries to be evil */2826D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);2827D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);2828p->verify_alg[SHARED_SECRET_MAX-1] = 0;2829p->csums_alg[SHARED_SECRET_MAX-1] = 0;2830}28312832if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {2833if (mdev->state.conn == C_WF_REPORT_PARAMS) {2834dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",2835mdev->sync_conf.verify_alg, p->verify_alg);2836goto disconnect;2837}2838verify_tfm = drbd_crypto_alloc_digest_safe(mdev,2839p->verify_alg, "verify-alg");2840if (IS_ERR(verify_tfm)) {2841verify_tfm = NULL;2842goto disconnect;2843}2844}28452846if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {2847if (mdev->state.conn == C_WF_REPORT_PARAMS) {2848dev_err(DEV, "Different csums-alg settings. 
me=\"%s\" peer=\"%s\"\n",2849mdev->sync_conf.csums_alg, p->csums_alg);2850goto disconnect;2851}2852csums_tfm = drbd_crypto_alloc_digest_safe(mdev,2853p->csums_alg, "csums-alg");2854if (IS_ERR(csums_tfm)) {2855csums_tfm = NULL;2856goto disconnect;2857}2858}28592860if (apv > 94) {2861mdev->sync_conf.rate = be32_to_cpu(p->rate);2862mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);2863mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);2864mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);2865mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);28662867fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;2868if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {2869rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);2870if (!rs_plan_s) {2871dev_err(DEV, "kmalloc of fifo_buffer failed");2872goto disconnect;2873}2874}2875}28762877spin_lock(&mdev->peer_seq_lock);2878/* lock against drbd_nl_syncer_conf() */2879if (verify_tfm) {2880strcpy(mdev->sync_conf.verify_alg, p->verify_alg);2881mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;2882crypto_free_hash(mdev->verify_tfm);2883mdev->verify_tfm = verify_tfm;2884dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);2885}2886if (csums_tfm) {2887strcpy(mdev->sync_conf.csums_alg, p->csums_alg);2888mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;2889crypto_free_hash(mdev->csums_tfm);2890mdev->csums_tfm = csums_tfm;2891dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);2892}2893if (fifo_size != mdev->rs_plan_s.size) {2894kfree(mdev->rs_plan_s.values);2895mdev->rs_plan_s.values = rs_plan_s;2896mdev->rs_plan_s.size = fifo_size;2897mdev->rs_planed = 0;2898}2899spin_unlock(&mdev->peer_seq_lock);2900}29012902return ok;2903disconnect:2904/* just for completeness: actually not needed,2905* as this is not reached if csums_tfm was ok. */2906crypto_free_hash(csums_tfm);2907/* but free the verify_tfm again, if csums_tfm did not work out */2908crypto_free_hash(verify_tfm);2909drbd_force_state(mdev, NS(conn, C_DISCONNECTING));2910return false;2911}29122913/* warn if the arguments differ by more than 12.5% */2914static void warn_if_differ_considerably(struct drbd_conf *mdev,2915const char *s, sector_t a, sector_t b)2916{2917sector_t d;2918if (a == 0 || b == 0)2919return;2920d = (a > b) ? (a - b) : (b - a);2921if (d > (a>>3) || d > (b>>3))2922dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,2923(unsigned long long)a, (unsigned long long)b);2924}29252926static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)2927{2928struct p_sizes *p = &mdev->data.rbuf.sizes;2929enum determine_dev_size dd = unchanged;2930sector_t p_size, p_usize, my_usize;2931int ldsc = 0; /* local disk size changed */2932enum dds_flags ddsf;29332934p_size = be64_to_cpu(p->d_size);2935p_usize = be64_to_cpu(p->u_size);29362937if (p_size == 0 && mdev->state.disk == D_DISKLESS) {2938dev_err(DEV, "some backing storage is needed\n");2939drbd_force_state(mdev, NS(conn, C_DISCONNECTING));2940return false;2941}29422943/* just store the peer's disk size for now.2944* we still need to figure out whether we accept that. 
*/2945mdev->p_size = p_size;29462947if (get_ldev(mdev)) {2948warn_if_differ_considerably(mdev, "lower level device sizes",2949p_size, drbd_get_max_capacity(mdev->ldev));2950warn_if_differ_considerably(mdev, "user requested size",2951p_usize, mdev->ldev->dc.disk_size);29522953/* if this is the first connect, or an otherwise expected2954* param exchange, choose the minimum */2955if (mdev->state.conn == C_WF_REPORT_PARAMS)2956p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,2957p_usize);29582959my_usize = mdev->ldev->dc.disk_size;29602961if (mdev->ldev->dc.disk_size != p_usize) {2962mdev->ldev->dc.disk_size = p_usize;2963dev_info(DEV, "Peer sets u_size to %lu sectors\n",2964(unsigned long)mdev->ldev->dc.disk_size);2965}29662967/* Never shrink a device with usable data during connect.2968But allow online shrinking if we are connected. */2969if (drbd_new_dev_size(mdev, mdev->ldev, 0) <2970drbd_get_capacity(mdev->this_bdev) &&2971mdev->state.disk >= D_OUTDATED &&2972mdev->state.conn < C_CONNECTED) {2973dev_err(DEV, "The peer's disk size is too small!\n");2974drbd_force_state(mdev, NS(conn, C_DISCONNECTING));2975mdev->ldev->dc.disk_size = my_usize;2976put_ldev(mdev);2977return false;2978}2979put_ldev(mdev);2980}29812982ddsf = be16_to_cpu(p->dds_flags);2983if (get_ldev(mdev)) {2984dd = drbd_determine_dev_size(mdev, ddsf);2985put_ldev(mdev);2986if (dd == dev_size_error)2987return false;2988drbd_md_sync(mdev);2989} else {2990/* I am diskless, need to accept the peer's size. */2991drbd_set_my_capacity(mdev, p_size);2992}29932994mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);2995drbd_reconsider_max_bio_size(mdev);29962997if (get_ldev(mdev)) {2998if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {2999mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);3000ldsc = 1;3001}30023003put_ldev(mdev);3004}30053006if (mdev->state.conn > C_WF_REPORT_PARAMS) {3007if (be64_to_cpu(p->c_size) !=3008drbd_get_capacity(mdev->this_bdev) || ldsc) {3009/* we have different sizes, probably peer3010* needs to know my new size... 
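	 * so re-announce our current capacity via drbd_send_sizes().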
*/3011drbd_send_sizes(mdev, 0, ddsf);3012}3013if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||3014(dd == grew && mdev->state.conn == C_CONNECTED)) {3015if (mdev->state.pdsk >= D_INCONSISTENT &&3016mdev->state.disk >= D_INCONSISTENT) {3017if (ddsf & DDSF_NO_RESYNC)3018dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");3019else3020resync_after_online_grow(mdev);3021} else3022set_bit(RESYNC_AFTER_NEG, &mdev->flags);3023}3024}30253026return true;3027}30283029static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3030{3031struct p_uuids *p = &mdev->data.rbuf.uuids;3032u64 *p_uuid;3033int i, updated_uuids = 0;30343035p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);30363037for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)3038p_uuid[i] = be64_to_cpu(p->uuid[i]);30393040kfree(mdev->p_uuid);3041mdev->p_uuid = p_uuid;30423043if (mdev->state.conn < C_CONNECTED &&3044mdev->state.disk < D_INCONSISTENT &&3045mdev->state.role == R_PRIMARY &&3046(mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {3047dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",3048(unsigned long long)mdev->ed_uuid);3049drbd_force_state(mdev, NS(conn, C_DISCONNECTING));3050return false;3051}30523053if (get_ldev(mdev)) {3054int skip_initial_sync =3055mdev->state.conn == C_CONNECTED &&3056mdev->agreed_pro_version >= 90 &&3057mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&3058(p_uuid[UI_FLAGS] & 8);3059if (skip_initial_sync) {3060dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");3061drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,3062"clear_n_write from receive_uuids",3063BM_LOCKED_TEST_ALLOWED);3064_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);3065_drbd_uuid_set(mdev, UI_BITMAP, 0);3066_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),3067CS_VERBOSE, NULL);3068drbd_md_sync(mdev);3069updated_uuids = 1;3070}3071put_ldev(mdev);3072} else if (mdev->state.disk < D_INCONSISTENT &&3073mdev->state.role == R_PRIMARY) {3074/* I am a diskless primary, the peer just created a new current UUID3075for me. */3076updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);3077}30783079/* Before we test for the disk state, we should wait until an eventually3080ongoing cluster wide state change is finished. That is important if3081we are primary and are detaching from our disk. We need to see the3082new disk state... 
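	   before deciding below whether to adopt the peer's current UUID as
	   our exposed data UUID (drbd_set_ed_uuid()).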
*/3083wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));3084if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)3085updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);30863087if (updated_uuids)3088drbd_print_uuids(mdev, "receiver updated UUIDs to");30893090return true;3091}30923093/**3094* convert_state() - Converts the peer's view of the cluster state to our point of view3095* @ps: The state as seen by the peer.3096*/3097static union drbd_state convert_state(union drbd_state ps)3098{3099union drbd_state ms;31003101static enum drbd_conns c_tab[] = {3102[C_CONNECTED] = C_CONNECTED,31033104[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,3105[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,3106[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */3107[C_VERIFY_S] = C_VERIFY_T,3108[C_MASK] = C_MASK,3109};31103111ms.i = ps.i;31123113ms.conn = c_tab[ps.conn];3114ms.peer = ps.role;3115ms.role = ps.peer;3116ms.pdsk = ps.disk;3117ms.disk = ps.pdsk;3118ms.peer_isp = (ps.aftr_isp | ps.user_isp);31193120return ms;3121}31223123static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3124{3125struct p_req_state *p = &mdev->data.rbuf.req_state;3126union drbd_state mask, val;3127enum drbd_state_rv rv;31283129mask.i = be32_to_cpu(p->mask);3130val.i = be32_to_cpu(p->val);31313132if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&3133test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {3134drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);3135return true;3136}31373138mask = convert_state(mask);3139val = convert_state(val);31403141rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);31423143drbd_send_sr_reply(mdev, rv);3144drbd_md_sync(mdev);31453146return true;3147}31483149static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3150{3151struct p_state *p = &mdev->data.rbuf.state;3152union drbd_state os, ns, peer_state;3153enum drbd_disk_state real_peer_disk;3154enum chg_state_flags cs_flags;3155int rv;31563157peer_state.i = be32_to_cpu(p->state);31583159real_peer_disk = peer_state.disk;3160if (peer_state.disk == D_NEGOTIATING) {3161real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;3162dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));3163}31643165spin_lock_irq(&mdev->req_lock);3166retry:3167os = ns = mdev->state;3168spin_unlock_irq(&mdev->req_lock);31693170/* peer says his disk is uptodate, while we think it is inconsistent,3171* and this happens while we think we have a sync going on. */3172if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&3173os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {3174/* If we are (becoming) SyncSource, but peer is still in sync3175* preparation, ignore its uptodate-ness to avoid flapping, it3176* will change to inconsistent once the peer reaches active3177* syncing states.3178* It may have changed syncer-paused flags, however, so we3179* cannot ignore this completely. */3180if (peer_state.conn > C_CONNECTED &&3181peer_state.conn < C_SYNC_SOURCE)3182real_peer_disk = D_INCONSISTENT;31833184/* if peer_state changes to connected at the same time,3185* it explicitly notifies us that it finished resync.3186* Maybe we should finish it up, too? 
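	 * We do finish it up below, if the bitmap says that nothing beyond
	 * the already failed blocks is left to sync.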
*/3187else if (os.conn >= C_SYNC_SOURCE &&3188peer_state.conn == C_CONNECTED) {3189if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)3190drbd_resync_finished(mdev);3191return true;3192}3193}31943195/* peer says his disk is inconsistent, while we think it is uptodate,3196* and this happens while the peer still thinks we have a sync going on,3197* but we think we are already done with the sync.3198* We ignore this to avoid flapping pdsk.3199* This should not happen, if the peer is a recent version of drbd. */3200if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&3201os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)3202real_peer_disk = D_UP_TO_DATE;32033204if (ns.conn == C_WF_REPORT_PARAMS)3205ns.conn = C_CONNECTED;32063207if (peer_state.conn == C_AHEAD)3208ns.conn = C_BEHIND;32093210if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&3211get_ldev_if_state(mdev, D_NEGOTIATING)) {3212int cr; /* consider resync */32133214/* if we established a new connection */3215cr = (os.conn < C_CONNECTED);3216/* if we had an established connection3217* and one of the nodes newly attaches a disk */3218cr |= (os.conn == C_CONNECTED &&3219(peer_state.disk == D_NEGOTIATING ||3220os.disk == D_NEGOTIATING));3221/* if we have both been inconsistent, and the peer has been3222* forced to be UpToDate with --overwrite-data */3223cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);3224/* if we had been plain connected, and the admin requested to3225* start a sync by "invalidate" or "invalidate-remote" */3226cr |= (os.conn == C_CONNECTED &&3227(peer_state.conn >= C_STARTING_SYNC_S &&3228peer_state.conn <= C_WF_BITMAP_T));32293230if (cr)3231ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);32323233put_ldev(mdev);3234if (ns.conn == C_MASK) {3235ns.conn = C_CONNECTED;3236if (mdev->state.disk == D_NEGOTIATING) {3237drbd_force_state(mdev, NS(disk, D_FAILED));3238} else if (peer_state.disk == D_NEGOTIATING) {3239dev_err(DEV, "Disk attach process on the peer node was aborted.\n");3240peer_state.disk = D_DISKLESS;3241real_peer_disk = D_DISKLESS;3242} else {3243if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))3244return false;3245D_ASSERT(os.conn == C_WF_REPORT_PARAMS);3246drbd_force_state(mdev, NS(conn, C_DISCONNECTING));3247return false;3248}3249}3250}32513252spin_lock_irq(&mdev->req_lock);3253if (mdev->state.i != os.i)3254goto retry;3255clear_bit(CONSIDER_RESYNC, &mdev->flags);3256ns.peer = peer_state.role;3257ns.pdsk = real_peer_disk;3258ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);3259if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)3260ns.disk = mdev->new_state_tmp.disk;3261cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);3262if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&3263test_bit(NEW_CUR_UUID, &mdev->flags)) {3264/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this3265for temporal network outages! 
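		   A peer that has rebooted no longer knows the requests we would
		   resend, so we abort the connect attempt below instead.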
*/3266spin_unlock_irq(&mdev->req_lock);3267dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");3268tl_clear(mdev);3269drbd_uuid_new_current(mdev);3270clear_bit(NEW_CUR_UUID, &mdev->flags);3271drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));3272return false;3273}3274rv = _drbd_set_state(mdev, ns, cs_flags, NULL);3275ns = mdev->state;3276spin_unlock_irq(&mdev->req_lock);32773278if (rv < SS_SUCCESS) {3279drbd_force_state(mdev, NS(conn, C_DISCONNECTING));3280return false;3281}32823283if (os.conn > C_WF_REPORT_PARAMS) {3284if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&3285peer_state.disk != D_NEGOTIATING ) {3286/* we want resync, peer has not yet decided to sync... */3287/* Nowadays only used when forcing a node into primary role and3288setting its disk to UpToDate with that */3289drbd_send_uuids(mdev);3290drbd_send_state(mdev);3291}3292}32933294mdev->net_conf->want_lose = 0;32953296drbd_md_sync(mdev); /* update connected indicator, la_size, ... */32973298return true;3299}33003301static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3302{3303struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;33043305wait_event(mdev->misc_wait,3306mdev->state.conn == C_WF_SYNC_UUID ||3307mdev->state.conn == C_BEHIND ||3308mdev->state.conn < C_CONNECTED ||3309mdev->state.disk < D_NEGOTIATING);33103311/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */33123313/* Here the _drbd_uuid_ functions are right, current should3314_not_ be rotated into the history */3315if (get_ldev_if_state(mdev, D_NEGOTIATING)) {3316_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));3317_drbd_uuid_set(mdev, UI_BITMAP, 0UL);33183319drbd_print_uuids(mdev, "updated sync uuid");3320drbd_start_resync(mdev, C_SYNC_TARGET);33213322put_ldev(mdev);3323} else3324dev_err(DEV, "Ignoring SyncUUID packet!\n");33253326return true;3327}33283329/**3330* receive_bitmap_plain3331*3332* Return 0 when done, 1 when another iteration is needed, and a negative error3333* code upon failure.3334*/3335static int3336receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,3337unsigned long *buffer, struct bm_xfer_ctx *c)3338{3339unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);3340unsigned want = num_words * sizeof(long);3341int err;33423343if (want != data_size) {3344dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);3345return -EIO;3346}3347if (want == 0)3348return 0;3349err = drbd_recv(mdev, buffer, want);3350if (err != want) {3351if (err >= 0)3352err = -EIO;3353return err;3354}33553356drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);33573358c->word_offset += num_words;3359c->bit_offset = c->word_offset * BITS_PER_LONG;3360if (c->bit_offset > c->bm_bits)3361c->bit_offset = c->bm_bits;33623363return 1;3364}33653366/**3367* recv_bm_rle_bits3368*3369* Return 0 when done, 1 when another iteration is needed, and a negative error3370* code upon failure.3371*/3372static int3373recv_bm_rle_bits(struct drbd_conf *mdev,3374struct p_compressed_bm *p,3375struct bm_xfer_ctx *c)3376{3377struct bitstream bs;3378u64 look_ahead;3379u64 rl;3380u64 tmp;3381unsigned long s = c->bit_offset;3382unsigned long e;3383int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));3384int toggle = DCBP_get_start(p);3385int have;3386int bits;33873388bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));33893390bits = bitstream_get_bits(&bs, &look_ahead, 64);3391if (bits < 0)3392return 
-EIO;33933394for (have = bits; have > 0; s += rl, toggle = !toggle) {3395bits = vli_decode_bits(&rl, look_ahead);3396if (bits <= 0)3397return -EIO;33983399if (toggle) {3400e = s + rl -1;3401if (e >= c->bm_bits) {3402dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);3403return -EIO;3404}3405_drbd_bm_set_bits(mdev, s, e);3406}34073408if (have < bits) {3409dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",3410have, bits, look_ahead,3411(unsigned int)(bs.cur.b - p->code),3412(unsigned int)bs.buf_len);3413return -EIO;3414}3415look_ahead >>= bits;3416have -= bits;34173418bits = bitstream_get_bits(&bs, &tmp, 64 - have);3419if (bits < 0)3420return -EIO;3421look_ahead |= tmp << have;3422have += bits;3423}34243425c->bit_offset = s;3426bm_xfer_ctx_bit_to_word_offset(c);34273428return (s != c->bm_bits);3429}34303431/**3432* decode_bitmap_c3433*3434* Return 0 when done, 1 when another iteration is needed, and a negative error3435* code upon failure.3436*/3437static int3438decode_bitmap_c(struct drbd_conf *mdev,3439struct p_compressed_bm *p,3440struct bm_xfer_ctx *c)3441{3442if (DCBP_get_code(p) == RLE_VLI_Bits)3443return recv_bm_rle_bits(mdev, p, c);34443445/* other variants had been implemented for evaluation,3446* but have been dropped as this one turned out to be "best"3447* during all our tests. */34483449dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);3450drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));3451return -EIO;3452}34533454void INFO_bm_xfer_stats(struct drbd_conf *mdev,3455const char *direction, struct bm_xfer_ctx *c)3456{3457/* what would it take to transfer it "plaintext" */3458unsigned plain = sizeof(struct p_header80) *3459((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)3460+ c->bm_words * sizeof(long);3461unsigned total = c->bytes[0] + c->bytes[1];3462unsigned r;34633464/* total can not be zero. but just in case: */3465if (total == 0)3466return;34673468/* don't report if not compressed */3469if (total >= plain)3470return;34713472/* total < plain. check for overflow, still */3473r = (total > UINT_MAX/1000) ? (total / (plain/1000))3474: (1000 * total / plain);34753476if (r > 1000)3477r = 1000;34783479r = 1000 - r;3480dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "3481"total %u; compression: %u.%u%%\n",3482direction,3483c->bytes[1], c->packets[1],3484c->bytes[0], c->packets[0],3485total, r/10, r % 10);3486}34873488/* Since we are processing the bitfield from lower addresses to higher,3489it does not matter if the process it in 32 bit chunks or 64 bit3490chunks as long as it is little endian. (Understand it as byte stream,3491beginning with the lowest byte...) If we would use big endian3492we would need to process it from the highest address to the lowest,3493in order to be agnostic to the 32 vs 64 bits issue.34943495returns 0 on failure, 1 if we successfully received it. */3496static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3497{3498struct bm_xfer_ctx c;3499void *buffer;3500int err;3501int ok = false;3502struct p_header80 *h = &mdev->data.rbuf.header.h80;35033504drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);3505/* you are supposed to send additional out-of-sync information3506* if you actually set bits during this phase */35073508/* maybe we should use some per thread scratch page,3509* and allocate that during initial device creation? 
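	 * For now we simply grab a single page with GFP_NOIO for the
	 * duration of the transfer and free it again below.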
*/3510buffer = (unsigned long *) __get_free_page(GFP_NOIO);3511if (!buffer) {3512dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);3513goto out;3514}35153516c = (struct bm_xfer_ctx) {3517.bm_bits = drbd_bm_bits(mdev),3518.bm_words = drbd_bm_words(mdev),3519};35203521for(;;) {3522if (cmd == P_BITMAP) {3523err = receive_bitmap_plain(mdev, data_size, buffer, &c);3524} else if (cmd == P_COMPRESSED_BITMAP) {3525/* MAYBE: sanity check that we speak proto >= 90,3526* and the feature is enabled! */3527struct p_compressed_bm *p;35283529if (data_size > BM_PACKET_PAYLOAD_BYTES) {3530dev_err(DEV, "ReportCBitmap packet too large\n");3531goto out;3532}3533/* use the page buff */3534p = buffer;3535memcpy(p, h, sizeof(*h));3536if (drbd_recv(mdev, p->head.payload, data_size) != data_size)3537goto out;3538if (data_size <= (sizeof(*p) - sizeof(p->head))) {3539dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);3540goto out;3541}3542err = decode_bitmap_c(mdev, p, &c);3543} else {3544dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);3545goto out;3546}35473548c.packets[cmd == P_BITMAP]++;3549c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;35503551if (err <= 0) {3552if (err < 0)3553goto out;3554break;3555}3556if (!drbd_recv_header(mdev, &cmd, &data_size))3557goto out;3558}35593560INFO_bm_xfer_stats(mdev, "receive", &c);35613562if (mdev->state.conn == C_WF_BITMAP_T) {3563enum drbd_state_rv rv;35643565ok = !drbd_send_bitmap(mdev);3566if (!ok)3567goto out;3568/* Omit CS_ORDERED with this state transition to avoid deadlocks. */3569rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);3570D_ASSERT(rv == SS_SUCCESS);3571} else if (mdev->state.conn != C_WF_BITMAP_S) {3572/* admin may have requested C_DISCONNECTING,3573* other threads may have noticed network errors */3574dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",3575drbd_conn_str(mdev->state.conn));3576}35773578ok = true;3579out:3580drbd_bm_unlock(mdev);3581if (ok && mdev->state.conn == C_WF_BITMAP_S)3582drbd_start_resync(mdev, C_SYNC_SOURCE);3583free_page((unsigned long) buffer);3584return ok;3585}35863587static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3588{3589/* TODO zero copy sink :) */3590static char sink[128];3591int size, want, r;35923593dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",3594cmd, data_size);35953596size = data_size;3597while (size > 0) {3598want = min_t(int, size, sizeof(sink));3599r = drbd_recv(mdev, sink, want);3600ERR_IF(r <= 0) break;3601size -= r;3602}3603return size == 0;3604}36053606static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3607{3608/* Make sure we've acked all the TCP data associated3609* with the data requests being unplugged */3610drbd_tcp_quickack(mdev->data.socket);36113612return true;3613}36143615static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)3616{3617struct p_block_desc *p = &mdev->data.rbuf.block_desc;36183619switch (mdev->state.conn) {3620case C_WF_SYNC_UUID:3621case C_WF_BITMAP_T:3622case C_BEHIND:3623break;3624default:3625dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",3626drbd_conn_str(mdev->state.conn));3627}36283629drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));36303631return true;3632}36333634typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets 
cmd, unsigned int to_receive);

struct data_cmd {
	int expect_payload;
	size_t pkt_size;
	drbd_cmd_handler_f function;
};

static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA] = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply },
	[P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier },
	[P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap },
	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap },
	[P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote },
	[P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam },
	[P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam },
	[P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE] = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	/* anything missing from this table is in
	 * the asender_tbl, see get_asender_cmd */
	[P_MAX_CMD] = { 0, 0, NULL },
};

/* All handler functions that expect a sub-header get that sub-header in
   mdev->data.rbuf.header.head.payload.

   Usually in mdev->data.rbuf.header.head the callback can find the usual
   p_header, but they may not rely on that.
Since there is also p_header95 !3675*/36763677static void drbdd(struct drbd_conf *mdev)3678{3679union p_header *header = &mdev->data.rbuf.header;3680unsigned int packet_size;3681enum drbd_packets cmd;3682size_t shs; /* sub header size */3683int rv;36843685while (get_t_state(&mdev->receiver) == Running) {3686drbd_thread_current_set_cpu(mdev);3687if (!drbd_recv_header(mdev, &cmd, &packet_size))3688goto err_out;36893690if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {3691dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);3692goto err_out;3693}36943695shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);3696if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {3697dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);3698goto err_out;3699}37003701if (shs) {3702rv = drbd_recv(mdev, &header->h80.payload, shs);3703if (unlikely(rv != shs)) {3704if (!signal_pending(current))3705dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);3706goto err_out;3707}3708}37093710rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);37113712if (unlikely(!rv)) {3713dev_err(DEV, "error receiving %s, l: %d!\n",3714cmdname(cmd), packet_size);3715goto err_out;3716}3717}37183719if (0) {3720err_out:3721drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));3722}3723/* If we leave here, we probably want to update at least the3724* "Connected" indicator on stable storage. Do so explicitly here. */3725drbd_md_sync(mdev);3726}37273728void drbd_flush_workqueue(struct drbd_conf *mdev)3729{3730struct drbd_wq_barrier barr;37313732barr.w.cb = w_prev_work_done;3733init_completion(&barr.done);3734drbd_queue_work(&mdev->data.work, &barr.w);3735wait_for_completion(&barr.done);3736}37373738void drbd_free_tl_hash(struct drbd_conf *mdev)3739{3740struct hlist_head *h;37413742spin_lock_irq(&mdev->req_lock);37433744if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {3745spin_unlock_irq(&mdev->req_lock);3746return;3747}3748/* paranoia code */3749for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)3750if (h->first)3751dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",3752(int)(h - mdev->ee_hash), h->first);3753kfree(mdev->ee_hash);3754mdev->ee_hash = NULL;3755mdev->ee_hash_s = 0;37563757/* paranoia code */3758for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)3759if (h->first)3760dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",3761(int)(h - mdev->tl_hash), h->first);3762kfree(mdev->tl_hash);3763mdev->tl_hash = NULL;3764mdev->tl_hash_s = 0;3765spin_unlock_irq(&mdev->req_lock);3766}37673768static void drbd_disconnect(struct drbd_conf *mdev)3769{3770enum drbd_fencing_p fp;3771union drbd_state os, ns;3772int rv = SS_UNKNOWN_ERROR;3773unsigned int i;37743775if (mdev->state.conn == C_STANDALONE)3776return;37773778/* asender does not clean up anything. it must not interfere, either */3779drbd_thread_stop(&mdev->asender);3780drbd_free_sock(mdev);37813782/* wait for current activity to cease. 
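	 * That is, drain the active_ee, sync_ee and read_ee lists under
	 * req_lock before we start tearing down connection state.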
*/3783spin_lock_irq(&mdev->req_lock);3784_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);3785_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);3786_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);3787spin_unlock_irq(&mdev->req_lock);37883789/* We do not have data structures that would allow us to3790* get the rs_pending_cnt down to 0 again.3791* * On C_SYNC_TARGET we do not have any data structures describing3792* the pending RSDataRequest's we have sent.3793* * On C_SYNC_SOURCE there is no data structure that tracks3794* the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.3795* And no, it is not the sum of the reference counts in the3796* resync_LRU. The resync_LRU tracks the whole operation including3797* the disk-IO, while the rs_pending_cnt only tracks the blocks3798* on the fly. */3799drbd_rs_cancel_all(mdev);3800mdev->rs_total = 0;3801mdev->rs_failed = 0;3802atomic_set(&mdev->rs_pending_cnt, 0);3803wake_up(&mdev->misc_wait);38043805del_timer(&mdev->request_timer);38063807/* make sure syncer is stopped and w_resume_next_sg queued */3808del_timer_sync(&mdev->resync_timer);3809resync_timer_fn((unsigned long)mdev);38103811/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,3812* w_make_resync_request etc. which may still be on the worker queue3813* to be "canceled" */3814drbd_flush_workqueue(mdev);38153816/* This also does reclaim_net_ee(). If we do this too early, we might3817* miss some resync ee and pages.*/3818drbd_process_done_ee(mdev);38193820kfree(mdev->p_uuid);3821mdev->p_uuid = NULL;38223823if (!is_susp(mdev->state))3824tl_clear(mdev);38253826dev_info(DEV, "Connection closed\n");38273828drbd_md_sync(mdev);38293830fp = FP_DONT_CARE;3831if (get_ldev(mdev)) {3832fp = mdev->ldev->dc.fencing;3833put_ldev(mdev);3834}38353836if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)3837drbd_try_outdate_peer_async(mdev);38383839spin_lock_irq(&mdev->req_lock);3840os = mdev->state;3841if (os.conn >= C_UNCONNECTED) {3842/* Do not restart in case we are C_DISCONNECTING */3843ns = os;3844ns.conn = C_UNCONNECTED;3845rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);3846}3847spin_unlock_irq(&mdev->req_lock);38483849if (os.conn == C_DISCONNECTING) {3850wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);38513852crypto_free_hash(mdev->cram_hmac_tfm);3853mdev->cram_hmac_tfm = NULL;38543855kfree(mdev->net_conf);3856mdev->net_conf = NULL;3857drbd_request_state(mdev, NS(conn, C_STANDALONE));3858}38593860/* serialize with bitmap writeout triggered by the state change,3861* if any. */3862wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));38633864/* tcp_close and release of sendpage pages can be deferred. 
I don't3865* want to use SO_LINGER, because apparently it can be deferred for3866* more than 20 seconds (longest time I checked).3867*3868* Actually we don't care for exactly when the network stack does its3869* put_page(), but release our reference on these pages right here.3870*/3871i = drbd_release_ee(mdev, &mdev->net_ee);3872if (i)3873dev_info(DEV, "net_ee not empty, killed %u entries\n", i);3874i = atomic_read(&mdev->pp_in_use_by_net);3875if (i)3876dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);3877i = atomic_read(&mdev->pp_in_use);3878if (i)3879dev_info(DEV, "pp_in_use = %d, expected 0\n", i);38803881D_ASSERT(list_empty(&mdev->read_ee));3882D_ASSERT(list_empty(&mdev->active_ee));3883D_ASSERT(list_empty(&mdev->sync_ee));3884D_ASSERT(list_empty(&mdev->done_ee));38853886/* ok, no more ee's on the fly, it is safe to reset the epoch_size */3887atomic_set(&mdev->current_epoch->epoch_size, 0);3888D_ASSERT(list_empty(&mdev->current_epoch->list));3889}38903891/*3892* We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version3893* we can agree on is stored in agreed_pro_version.3894*3895* feature flags and the reserved array should be enough room for future3896* enhancements of the handshake protocol, and possible plugins...3897*3898* for now, they are expected to be zero, but ignored.3899*/3900static int drbd_send_handshake(struct drbd_conf *mdev)3901{3902/* ASSERT current == mdev->receiver ... */3903struct p_handshake *p = &mdev->data.sbuf.handshake;3904int ok;39053906if (mutex_lock_interruptible(&mdev->data.mutex)) {3907dev_err(DEV, "interrupted during initial handshake\n");3908return 0; /* interrupted. not ok. */3909}39103911if (mdev->data.socket == NULL) {3912mutex_unlock(&mdev->data.mutex);3913return 0;3914}39153916memset(p, 0, sizeof(*p));3917p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);3918p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);3919ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,3920(struct p_header80 *)p, sizeof(*p), 0 );3921mutex_unlock(&mdev->data.mutex);3922return ok;3923}39243925/*3926* return values:3927* 1 yes, we have a valid connection3928* 0 oops, did not work out, please try again3929* -1 peer talks different language,3930* no point in trying again, please go standalone.3931*/3932static int drbd_do_handshake(struct drbd_conf *mdev)3933{3934/* ASSERT current == mdev->receiver ... 
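	 * i.e. the handshake is expected to run on the receiver thread,
	 * which reads directly into mdev->data.rbuf below.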
	struct p_handshake *p = &mdev->data.rbuf.handshake;
	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
	unsigned int length;
	enum drbd_packets cmd;
	int rv;

	rv = drbd_send_handshake(mdev);
	if (!rv)
		return 0;

	rv = drbd_recv_header(mdev, &cmd, &length);
	if (!rv)
		return 0;

	if (cmd != P_HAND_SHAKE) {
		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
			cmdname(cmd), cmd);
		return -1;
	}

	if (length != expect) {
		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
			expect, length);
		return -1;
	}

	rv = drbd_recv(mdev, &p->head.payload, expect);

	if (rv != expect) {
		if (!signal_pending(current))
			dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
		return 0;
	}

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	dev_info(DEV, "Handshake successful: "
		 "Agreed network protocol version %d\n", mdev->agreed_pro_version);

	return 1;

 incompat:
	dev_err(DEV, "incompatible DRBD dialects: "
		"I support %d-%d, peer supports %d-%d\n",
		PRO_VERSION_MIN, PRO_VERSION_MAX,
		p->protocol_min, p->protocol_max);
	return -1;
}

#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_conf *mdev)
{
	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

static int drbd_do_auth(struct drbd_conf *mdev)
{
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
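	/* Challenge-response outline: each side sends a random challenge
	 * (P_AUTH_CHALLENGE), computes the HMAC of the peer's challenge
	 * keyed with the shared secret (cram-hmac-alg), returns it as
	 * P_AUTH_RESPONSE, and compares the peer's digest against the
	 * locally computed one. */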
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
	unsigned int resp_size;
	struct hash_desc desc;
	enum drbd_packets cmd;
	unsigned int length;
	int rv;

	desc.tfm = mdev->cram_hmac_tfm;
	desc.flags = 0;

	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
				(u8 *)mdev->net_conf->shared_secret, key_len);
	if (rv) {
		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(mdev, &cmd, &length);
	if (!rv)
		goto fail;

	if (cmd != P_AUTH_CHALLENGE) {
		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			cmdname(cmd), cmd);
		rv = 0;
		goto fail;
	}

	if (length > CHALLENGE_LEN * 2) {
		dev_err(DEV, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(length, GFP_NOIO);
	if (peers_ch == NULL) {
		dev_err(DEV, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	rv = drbd_recv(mdev, peers_ch, length);

	if (rv != length) {
		if (!signal_pending(current))
			dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		dev_err(DEV, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, length);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
	if (!rv)
		goto fail;

	rv = drbd_recv_header(mdev, &cmd, &length);
	if (!rv)
		goto fail;

	if (cmd != P_AUTH_RESPONSE) {
		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
			cmdname(cmd), cmd);
		rv = 0;
		goto fail;
	}

	if (length != resp_size) {
		dev_err(DEV, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	rv = drbd_recv(mdev, response, resp_size);

	if (rv != resp_size) {
		if (!signal_pending(current))
			dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		dev_err(DEV, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
			 resp_size, mdev->net_conf->cram_hmac_alg);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
#endif

int drbdd_init(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	unsigned int minor = mdev_to_minor(mdev);
	int h;

	sprintf(current->comm, "drbd%d_receiver", minor);

	dev_info(DEV, "receiver (re)started\n");
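	/* drbd_connect() returns 1 once we have a valid connection, 0 for a
	 * retryable failure (sleep a second and try again), and -1 if the
	 * peer is incompatible, in which case the network config is dropped
	 * and we go to C_DISCONNECTING. */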

	do {
		h = drbd_connect(mdev);
		if (h == 0) {
			drbd_disconnect(mdev);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			dev_warn(DEV, "Discarding network configuration.\n");
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		}
	} while (h == 0);

	if (h > 0) {
		if (get_net_conf(mdev)) {
			drbdd(mdev);
			put_net_conf(mdev);
		}
	}

	drbd_disconnect(mdev);

	dev_info(DEV, "receiver terminated\n");
	return 0;
}

/* ********* acknowledge sender ******** */

static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_req_state_reply *p = (struct p_req_state_reply *)h;

	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
			drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&mdev->state_wait);

	return true;
}

static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
{
	return drbd_send_ping_ack(mdev);

}

static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
{
	/* restore idle timeout */
	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
		wake_up(&mdev->misc_wait);

	return true;
}

static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	D_ASSERT(mdev->agreed_pro_version >= 89);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, sector);
		drbd_set_in_sync(mdev, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
	dec_rs_pending(mdev);
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return true;
}

/* when we receive the ACK for a write request,
 * verify that we actually know about it */
static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
	u64 id, sector_t sector)
{
	struct hlist_head *slot = tl_hash_slot(mdev, sector);
	struct hlist_node *n;
	struct drbd_request *req;

	hlist_for_each_entry(req, n, slot, collision) {
		if ((unsigned long)req == (unsigned long)id) {
			if (req->sector != sector) {
				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
					"wrong sector (%llus versus %llus)\n", req,
					(unsigned long long)req->sector,
					(unsigned long long)sector);
				break;
			}
			return req;
		}
	}
	return NULL;
}

typedef struct drbd_request *(req_validator_fn)
	(struct drbd_conf *mdev, u64 id, sector_t sector);

static int validate_req_change_req_state(struct drbd_conf *mdev,
	u64 id, sector_t sector, req_validator_fn validator,
	const char *func, enum drbd_req_event what)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&mdev->req_lock);
	req = validator(mdev, id, sector);
	if (unlikely(!req)) {
		spin_unlock_irq(&mdev->req_lock);

		dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
			(void *)(unsigned long)id, (unsigned long long)sector);
		return false;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&mdev->req_lock);

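	/* complete the master bio only after req_lock has been dropped */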
	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}

static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (is_syncer_block_id(p->block_id)) {
		drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return true;
	}
	switch (be16_to_cpu(h->command)) {
	case P_RS_WRITE_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		what = write_acked_by_peer_and_sis;
		break;
	case P_WRITE_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		what = write_acked_by_peer;
		break;
	case P_RECV_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
		what = recv_acked_by_peer;
		break;
	case P_DISCARD_ACK:
		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		what = conflict_discarded_by_peer;
		break;
	default:
		D_ASSERT(0);
		return false;
	}

	return validate_req_change_req_state(mdev, p->block_id, sector,
		_ack_id_to_req, __func__, what);
}

static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	struct drbd_request *req;
	struct bio_and_error m;

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (is_syncer_block_id(p->block_id)) {
		dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return true;
	}

	spin_lock_irq(&mdev->req_lock);
	req = _ack_id_to_req(mdev, p->block_id, sector);
	if (!req) {
		spin_unlock_irq(&mdev->req_lock);
		if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
		    mdev->net_conf->wire_protocol == DRBD_PROT_B) {
			/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
			   The master bio might already be completed, therefore the
			   request is no longer in the collision hash.
			   => Do not try to validate block_id as request. */
			/* In Protocol B we might already have got a P_RECV_ACK
			   but then get a P_NEG_ACK afterwards. */
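			/* mark the block out of sync; a later resync repairs it */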
			drbd_set_out_of_sync(mdev, sector, size);
			return true;
		} else {
			dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
				(void *)(unsigned long)p->block_id, (unsigned long long)sector);
			return false;
		}
	}
	__req_mod(req, neg_acked, &m);
	spin_unlock_irq(&mdev->req_lock);

	if (m.bio)
		complete_master_bio(mdev, &m);
	return true;
}

static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
		(unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
		_ar_id_to_req, __func__, neg_acked);
}

static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = (struct p_block_ack *)h;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (be16_to_cpu(h->command)) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}

static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_barrier_ack *p = (struct p_barrier_ack *)h;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}

static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			drbd_queue_work_front(&mdev->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}

static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
{
	return true;
}

struct asender_cmd {
	size_t pkt_size;
	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
};

static struct asender_cmd *get_asender_cmd(int cmd)
{
	static struct asender_cmd asender_tbl[] = {
		/* anything missing from this table is in
		 * the drbd_cmd_handler (drbd_default_handler) table,
		 * see the beginning of drbdd() */
	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
	[P_MAX_CMD]	    = { 0, NULL },
	};
	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
		return NULL;
	return &asender_tbl[cmd];
}

int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
	struct asender_cmd *cmd = NULL;

	int rv, len;
	void *buf    = h;
	int received = 0;
	int expect   = sizeof(struct p_header80);
	int empty;
	int ping_timeout_active = 0;

	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);
		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
			mdev->meta.socket->sk->sk_rcvtimeo =
				mdev->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* conditionally cork;
		 * it may hurt latency if we cork without much to send */
		if (!mdev->net_conf->no_cork &&
			3 < atomic_read(&mdev->unacked_cnt))
			drbd_tcp_cork(mdev->meta.socket);
		while (1) {
			clear_bit(SIGNAL_ASENDER, &mdev->flags);
			flush_signals(current);
			if (!drbd_process_done_ee(mdev))
				goto reconnect;
			/* to avoid race with newly queued ACKs */
			set_bit(SIGNAL_ASENDER, &mdev->flags);
			spin_lock_irq(&mdev->req_lock);
			empty = list_empty(&mdev->done_ee);
			spin_unlock_irq(&mdev->req_lock);
			/* new ack may have been queued right here,
			 * but then there is also a signal pending,
			 * and we start over... */
			if (empty)
				break;
		}
		/* but unconditionally uncork unless disabled */
		if (!mdev->net_conf->no_cork)
			drbd_tcp_uncork(mdev->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(mdev, mdev->meta.socket,
				     buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &mdev->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			dev_err(DEV, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(mdev->last_received,
				jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				dev_err(DEV, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &mdev->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
					be32_to_cpu(h->magic),
					be16_to_cpu(h->command),
					be16_to_cpu(h->length));
				goto reconnect;
			}
			cmd = get_asender_cmd(be16_to_cpu(h->command));
			len = be16_to_cpu(h->length);
			if (unlikely(cmd == NULL)) {
				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
					be32_to_cpu(h->magic),
					be16_to_cpu(h->command),
					be16_to_cpu(h->length));
				goto disconnect;
			}
			expect = cmd->pkt_size;
			ERR_IF(len != expect-sizeof(struct p_header80))
				goto reconnect;
		}
		if (received == expect) {
			mdev->last_received = jiffies;
			D_ASSERT(cmd != NULL);
			if (!cmd->process(mdev, h))
				goto reconnect;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == get_asender_cmd(P_PING_ACK))
				ping_timeout_active = 0;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header80);
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
		drbd_md_sync(mdev);
	}
	if (0) {
disconnect:
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		drbd_md_sync(mdev);
	}
	clear_bit(SIGNAL_ASENDER, &mdev->flags);

	D_ASSERT(mdev->state.conn < C_CONNECTED);
	dev_info(DEV, "asender terminated\n");

	return 0;
}