Path: blob/master/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c

/*
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <[email protected]>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *ctxt,
                               u32 byte_count)
{
        struct page *page;
        u32 bc;
        int sge_no;

        /* Swap the page in the SGE with the page in argpages */
        page = ctxt->pages[0];
        put_page(rqstp->rq_pages[0]);
        rqstp->rq_pages[0] = page;

        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
        rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
        rqstp->rq_arg.len = byte_count;
        rqstp->rq_arg.buflen = byte_count;

        /* Compute bytes past head in the SGL */
        bc = byte_count - rqstp->rq_arg.head[0].iov_len;

        /* If data remains, store it in the pagelist */
        rqstp->rq_arg.page_len = bc;
        rqstp->rq_arg.page_base = 0;
        rqstp->rq_arg.pages = &rqstp->rq_pages[1];
        sge_no = 1;
        while (bc && sge_no < ctxt->count) {
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
                bc -= min(bc, ctxt->sge[sge_no].length);
                rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
                sge_no++;
        }
        rqstp->rq_respages = &rqstp->rq_pages[sge_no];

        /* We should never run out of SGE because the limit is defined to
         * support the max allowed RPC data length
         */
        BUG_ON(bc && (sge_no == ctxt->count));
        BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
               != byte_count);
        BUG_ON(rqstp->rq_arg.len != byte_count);

        /* If not all pages were used from the SGL, free the remaining ones */
        bc = sge_no;
        while (sge_no < ctxt->count) {
                page = ctxt->pages[sge_no++];
                put_page(page);
        }
        ctxt->count = bc;

        /* Set up tail */
        rqstp->rq_arg.tail[0].iov_base = NULL;
        rqstp->rq_arg.tail[0].iov_len = 0;
}
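
/*
 * Worked example for rdma_build_arg_xdr(), assuming PAGE_SIZE == 4096 and
 * full-page receive SGEs (the usual svcrdma recv buffer layout): a
 * 6000-byte RDMA_RECV gives head[0].iov_len = min(6000, 4096) = 4096,
 * leaving bc = 1904 bytes that land in rq_pages[1] as the pagelist; any
 * receive pages beyond the last SGE actually used are released and
 * ctxt->count is trimmed to match.
 */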

/* Encode a read-chunk-list as an array of IB SGE
 *
 * Assumptions:
 * - chunk[0]->position points to pages[0] at an offset of 0
 * - pages[] is not physically or virtually contiguous and consists of
 *   PAGE_SIZE elements.
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 *
 */
static int map_read_chunks(struct svcxprt_rdma *xprt,
                           struct svc_rqst *rqstp,
                           struct svc_rdma_op_ctxt *head,
                           struct rpcrdma_msg *rmsgp,
                           struct svc_rdma_req_map *rpl_map,
                           struct svc_rdma_req_map *chl_map,
                           int ch_count,
                           int byte_count)
{
        int sge_no;
        int sge_bytes;
        int page_off;
        int page_no;
        int ch_bytes;
        int ch_no;
        struct rpcrdma_read_chunk *ch;

        sge_no = 0;
        page_no = 0;
        page_off = 0;
        ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
        ch_no = 0;
        ch_bytes = ch->rc_target.rs_length;
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = &head->pages[head->count];
        head->hdr_count = head->count; /* save count of hdr pages */
        head->arg.page_base = 0;
        head->arg.page_len = ch_bytes;
        head->arg.len = rqstp->rq_arg.len + ch_bytes;
        head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
        head->count++;
        chl_map->ch[0].start = 0;
        while (byte_count) {
                rpl_map->sge[sge_no].iov_base =
                        page_address(rqstp->rq_arg.pages[page_no]) + page_off;
                sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
                rpl_map->sge[sge_no].iov_len = sge_bytes;
                /*
                 * Don't bump head->count here because the same page
                 * may be used by multiple SGE.
                 */
                head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
                rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];

                byte_count -= sge_bytes;
                ch_bytes -= sge_bytes;
                sge_no++;
                /*
                 * If all bytes for this chunk have been mapped to an
                 * SGE, move to the next SGE
                 */
                if (ch_bytes == 0) {
                        chl_map->ch[ch_no].count =
                                sge_no - chl_map->ch[ch_no].start;
                        ch_no++;
                        ch++;
                        chl_map->ch[ch_no].start = sge_no;
                        ch_bytes = ch->rc_target.rs_length;
                        /* If bytes remaining account for next chunk */
                        if (byte_count) {
                                head->arg.page_len += ch_bytes;
                                head->arg.len += ch_bytes;
                                head->arg.buflen += ch_bytes;
                        }
                }
                /*
                 * If this SGE consumed all of the page, move to the
                 * next page
                 */
                if ((sge_bytes + page_off) == PAGE_SIZE) {
                        page_no++;
                        page_off = 0;
                        /*
                         * If there are still bytes left to map, bump
                         * the page count
                         */
                        if (byte_count)
                                head->count++;
                } else
                        page_off += sge_bytes;
        }
        BUG_ON(byte_count != 0);
        return sge_no;
}
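
/*
 * Illustration of the mapping above, assuming PAGE_SIZE == 4096: a single
 * 10000-byte read chunk starting at page offset 0 becomes three kvec
 * entries of 4096, 4096 and 1808 bytes (sge_bytes = min(PAGE_SIZE -
 * page_off, ch_bytes)), so chl_map->ch[0] ends up with start = 0 and
 * count = 3, and head->count is bumped once for each fully consumed page
 * that still has bytes left to map.
 */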

/* Map a read-chunk-list to an XDR and fast register the page-list.
 *
 * Assumptions:
 * - chunk[0] position points to pages[0] at an offset of 0
 * - pages[] will be made physically contiguous by creating a one-off memory
 *   region using the fastreg verb.
 * - byte_count is # of bytes in read-chunk-list
 * - ch_count is # of chunks in read-chunk-list
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 */
static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
                                struct svc_rqst *rqstp,
                                struct svc_rdma_op_ctxt *head,
                                struct rpcrdma_msg *rmsgp,
                                struct svc_rdma_req_map *rpl_map,
                                struct svc_rdma_req_map *chl_map,
                                int ch_count,
                                int byte_count)
{
        int page_no;
        int ch_no;
        u32 offset;
        struct rpcrdma_read_chunk *ch;
        struct svc_rdma_fastreg_mr *frmr;
        int ret = 0;

        frmr = svc_rdma_get_frmr(xprt);
        if (IS_ERR(frmr))
                return -ENOMEM;

        head->frmr = frmr;
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = &head->pages[head->count];
        head->hdr_count = head->count; /* save count of hdr pages */
        head->arg.page_base = 0;
        head->arg.page_len = byte_count;
        head->arg.len = rqstp->rq_arg.len + byte_count;
        head->arg.buflen = rqstp->rq_arg.buflen + byte_count;

        /* Fast register the page list */
        frmr->kva = page_address(rqstp->rq_arg.pages[0]);
        frmr->direction = DMA_FROM_DEVICE;
        frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
        frmr->map_len = byte_count;
        frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
        for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
                frmr->page_list->page_list[page_no] =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        rqstp->rq_arg.pages[page_no], 0,
                                        PAGE_SIZE, DMA_FROM_DEVICE);
                if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                         frmr->page_list->page_list[page_no]))
                        goto fatal_err;
                atomic_inc(&xprt->sc_dma_used);
                head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
        }
        head->count += page_no;

        /* rq_respages points one past arg pages */
        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];

        /* Create the reply and chunk maps */
        offset = 0;
        ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
        for (ch_no = 0; ch_no < ch_count; ch_no++) {
                rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
                rpl_map->sge[ch_no].iov_len = ch->rc_target.rs_length;
                chl_map->ch[ch_no].count = 1;
                chl_map->ch[ch_no].start = ch_no;
                offset += ch->rc_target.rs_length;
                ch++;
        }

        ret = svc_rdma_fastreg(xprt, frmr);
        if (ret)
                goto fatal_err;

        return ch_no;

 fatal_err:
        printk("svcrdma: error fast registering xdr for xprt %p", xprt);
        svc_rdma_put_frmr(xprt, frmr);
        return -EIO;
}
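
/*
 * In the fast-register path above, the entire read-chunk-list is mapped as
 * one virtually contiguous region starting at frmr->kva, so each chunk
 * needs exactly one kvec (chl_map->ch[ch_no].count == 1) and the RDMA_READ
 * WRs built later can reference the FRMR lkey instead of per-page DMA
 * mappings.
 */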

static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
                             struct svc_rdma_op_ctxt *ctxt,
                             struct svc_rdma_fastreg_mr *frmr,
                             struct kvec *vec,
                             u64 *sgl_offset,
                             int count)
{
        int i;
        unsigned long off;

        ctxt->count = count;
        ctxt->direction = DMA_FROM_DEVICE;
        for (i = 0; i < count; i++) {
                ctxt->sge[i].length = 0; /* in case map fails */
                if (!frmr) {
                        BUG_ON(0 == virt_to_page(vec[i].iov_base));
                        off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
                        ctxt->sge[i].addr =
                                ib_dma_map_page(xprt->sc_cm_id->device,
                                                virt_to_page(vec[i].iov_base),
                                                off,
                                                vec[i].iov_len,
                                                DMA_FROM_DEVICE);
                        if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                                 ctxt->sge[i].addr))
                                return -EINVAL;
                        ctxt->sge[i].lkey = xprt->sc_dma_lkey;
                        atomic_inc(&xprt->sc_dma_used);
                } else {
                        ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
                        ctxt->sge[i].lkey = frmr->mr->lkey;
                }
                ctxt->sge[i].length = vec[i].iov_len;
                *sgl_offset = *sgl_offset + vec[i].iov_len;
        }
        return 0;
}

static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
        if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
             RDMA_TRANSPORT_IWARP) &&
            sge_count > 1)
                return 1;
        else
                return min_t(int, sge_count, xprt->sc_max_sge);
}
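
/*
 * rdma_read_max_sge() caps the number of SGEs used for a single RDMA_READ:
 * iWARP transports are limited to one SGE per READ here, while IB uses the
 * smaller of the chunk's SGE count and the device's sc_max_sge. When a
 * chunk needs more SGEs than one WR can carry, rdma_read_xdr() below keeps
 * re-issuing READs (the next_sge label) until the chunk's count is used up.
 */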

/*
 * Use RDMA_READ to read data from the advertised client buffer into the
 * XDR stream starting at rq_arg.head[0].iov_base.
 * Each chunk in the array
 * contains the following fields:
 * discrim      - '1', This isn't used for data placement
 * position     - The xdr stream offset (the same for every chunk)
 * handle       - RMR for client memory region
 * length       - data transfer length
 * offset       - 64 bit tagged offset in remote memory region
 *
 * On our side, we need to read into a pagelist. The first page immediately
 * follows the RPC header.
 *
 * This function returns:
 * 0 - No error and no read-list found.
 *
 * 1 - Successful read-list processing. The data is not yet in
 * the pagelist and therefore the RPC request must be deferred. The
 * I/O completion will enqueue the transport again and
 * svc_rdma_recvfrom will complete the request.
 *
 * <0 - Error processing/posting read-list.
 *
 * NOTE: The ctxt must not be touched after the last WR has been posted
 * because the I/O completion processing may occur on another
 * processor and free / modify the context. Do not touch it!
 */
static int rdma_read_xdr(struct svcxprt_rdma *xprt,
                         struct rpcrdma_msg *rmsgp,
                         struct svc_rqst *rqstp,
                         struct svc_rdma_op_ctxt *hdr_ctxt)
{
        struct ib_send_wr read_wr;
        struct ib_send_wr inv_wr;
        int err = 0;
        int ch_no;
        int ch_count;
        int byte_count;
        int sge_count;
        u64 sgl_offset;
        struct rpcrdma_read_chunk *ch;
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct svc_rdma_req_map *rpl_map;
        struct svc_rdma_req_map *chl_map;

        /* If no read list is present, return 0 */
        ch = svc_rdma_get_read_chunk(rmsgp);
        if (!ch)
                return 0;

        svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
        if (ch_count > RPCSVC_MAXPAGES)
                return -EINVAL;

        /* Allocate temporary reply and chunk maps */
        rpl_map = svc_rdma_get_req_map();
        chl_map = svc_rdma_get_req_map();

        if (!xprt->sc_frmr_pg_list_len)
                sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
                                            rpl_map, chl_map, ch_count,
                                            byte_count);
        else
                sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
                                                 rpl_map, chl_map, ch_count,
                                                 byte_count);
        if (sge_count < 0) {
                err = -EIO;
                goto out;
        }

        sgl_offset = 0;
        ch_no = 0;

        for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
             ch->rc_discrim != 0; ch++, ch_no++) {
 next_sge:
                ctxt = svc_rdma_get_context(xprt);
                ctxt->direction = DMA_FROM_DEVICE;
                ctxt->frmr = hdr_ctxt->frmr;
                ctxt->read_hdr = NULL;
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
                clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);

                /* Prepare READ WR */
                memset(&read_wr, 0, sizeof read_wr);
                read_wr.wr_id = (unsigned long)ctxt;
                read_wr.opcode = IB_WR_RDMA_READ;
                ctxt->wr_op = read_wr.opcode;
                read_wr.send_flags = IB_SEND_SIGNALED;
                read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
                read_wr.wr.rdma.remote_addr =
                        get_unaligned(&(ch->rc_target.rs_offset)) +
                        sgl_offset;
                read_wr.sg_list = ctxt->sge;
                read_wr.num_sge =
                        rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
                err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
                                        &rpl_map->sge[chl_map->ch[ch_no].start],
                                        &sgl_offset,
                                        read_wr.num_sge);
                if (err) {
                        svc_rdma_unmap_dma(ctxt);
                        svc_rdma_put_context(ctxt, 0);
                        goto out;
                }
                if (((ch+1)->rc_discrim == 0) &&
                    (read_wr.num_sge == chl_map->ch[ch_no].count)) {
                        /*
                         * Mark the last RDMA_READ with a bit to
                         * indicate all RPC data has been fetched from
                         * the client and the RPC needs to be enqueued.
                         */
                        set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
                        if (hdr_ctxt->frmr) {
                                set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
                                /*
                                 * Invalidate the local MR used to map the data
                                 * sink.
                                 */
                                if (xprt->sc_dev_caps &
                                    SVCRDMA_DEVCAP_READ_W_INV) {
                                        read_wr.opcode =
                                                IB_WR_RDMA_READ_WITH_INV;
                                        ctxt->wr_op = read_wr.opcode;
                                        read_wr.ex.invalidate_rkey =
                                                ctxt->frmr->mr->lkey;
                                } else {
                                        /* Prepare INVALIDATE WR */
                                        memset(&inv_wr, 0, sizeof inv_wr);
                                        inv_wr.opcode = IB_WR_LOCAL_INV;
                                        inv_wr.send_flags = IB_SEND_SIGNALED;
                                        inv_wr.ex.invalidate_rkey =
                                                hdr_ctxt->frmr->mr->lkey;
                                        read_wr.next = &inv_wr;
                                }
                        }
                        ctxt->read_hdr = hdr_ctxt;
                }
                /* Post the read */
                err = svc_rdma_send(xprt, &read_wr);
                if (err) {
                        printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
                               err);
                        set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                        svc_rdma_unmap_dma(ctxt);
                        svc_rdma_put_context(ctxt, 0);
                        goto out;
                }
                atomic_inc(&rdma_stat_read);

                if (read_wr.num_sge < chl_map->ch[ch_no].count) {
                        chl_map->ch[ch_no].count -= read_wr.num_sge;
                        chl_map->ch[ch_no].start += read_wr.num_sge;
                        goto next_sge;
                }
                sgl_offset = 0;
                err = 1;
        }

 out:
        svc_rdma_put_req_map(rpl_map);
        svc_rdma_put_req_map(chl_map);

        /* Detach arg pages. svc_recv will replenish them */
        for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
                rqstp->rq_pages[ch_no] = NULL;

        /*
         * Detach res pages. svc_release must see a resused count of
         * zero or it will attempt to put them.
         */
        while (rqstp->rq_resused)
                rqstp->rq_respages[--rqstp->rq_resused] = NULL;

        return err;
}
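
/*
 * Hand-off after the last READ, as implied by the flags set above: the
 * final WR carries RDMACTXT_F_LAST_CTXT (and RDMACTXT_F_FAST_UNREG in the
 * FRMR case) and its ctxt->read_hdr points back at hdr_ctxt, so the send
 * completion path can queue hdr_ctxt on sc_read_complete_q;
 * svc_rdma_recvfrom() below then finishes the deferred request via
 * rdma_read_complete().
 */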

static int rdma_read_complete(struct svc_rqst *rqstp,
                              struct svc_rdma_op_ctxt *head)
{
        int page_no;
        int ret;

        BUG_ON(!head);

        /* Copy RPC pages */
        for (page_no = 0; page_no < head->count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->pages[page_no];
        }
        /* Point rq_arg.pages past header */
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
        rqstp->rq_arg.page_len = head->arg.page_len;
        rqstp->rq_arg.page_base = head->arg.page_base;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
        rqstp->rq_resused = 0;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->arg.head[0];
        rqstp->rq_arg.tail[0] = head->arg.tail[0];
        rqstp->rq_arg.len = head->arg.len;
        rqstp->rq_arg.buflen = head->arg.buflen;

        /* Free the context */
        svc_rdma_put_context(head, 0);

        /* XXX: What should this be? */
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);

        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);

        return ret;
}
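
/*
 * Return-value contract for the recvfrom method below, as the generic
 * svc_recv() caller sees it: a positive return is the length of the
 * assembled RPC argument; 0 is returned on the deferred-read,
 * receive-queue-empty and transport-close paths, where there is nothing
 * for the caller to process yet.
 */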

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct rpcrdma_msg *rmsgp;
        int ret = 0;
        int len;

        dprintk("svcrdma: rqstp=%p\n", rqstp);

        spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
                ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
        }
        if (ctxt) {
                spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
                return rdma_read_complete(rqstp, ctxt);
        }

        if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
        } else {
                atomic_inc(&rdma_stat_rq_starve);
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                ctxt = NULL;
        }
        spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!ctxt) {
                /* This is the EAGAIN path. The svc_recv routine will
                 * return -EAGAIN, the nfsd thread will go to call into
                 * svc_recv again and we shouldn't be on the active
                 * transport list
                 */
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                        goto close_out;

                BUG_ON(ret);
                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
                ctxt, rdma_xprt, rqstp, ctxt->wc_status);
        BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
        atomic_inc(&rdma_stat_recv);

        /* Build up the XDR from the receive buffers. */
        rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

        /* Decode the RDMA header. */
        len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
        rqstp->rq_xprt_hlen = len;

        /* If the request is invalid, reply with an error */
        if (len < 0) {
                if (len == -ENOSYS)
                        svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
                goto close_out;
        }

        /* Read read-list data. */
        ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
        if (ret > 0) {
                /* read-list posted, defer until data received from client. */
                goto defer;
        }
        if (ret < 0) {
                /* Post of read-list failed, free context. */
                svc_rdma_put_context(ctxt, 1);
                return 0;
        }

        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        svc_rdma_put_context(ctxt, 0);
 out:
        dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len,
                rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return ret;

 close_out:
        if (ctxt)
                svc_rdma_put_context(ctxt, 1);
        dprintk("svcrdma: transport %p is closing\n", xprt);
        /*
         * Set the close bit and enqueue it. svc_recv will see the
         * close bit and call svc_xprt_delete
         */
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 defer:
        return 0;
}