Path: blob/master/net/sunrpc/xprtrdma/svc_rdma_transport.c
/*
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <[email protected]>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);

static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
static DEFINE_SPINLOCK(dto_lock);
static LIST_HEAD(dto_xprt_q);

static struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
};
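
/*
 * Illustrative note (not part of the original file): svc_rdma_class is
 * what plugs this transport into the generic svc_xprt layer. A minimal
 * sketch of how the module setup code in svc_rdma.c is expected to
 * register and unregister it, assuming the standard
 * svc_reg_xprt_class()/svc_unreg_xprt_class() helpers:
 *
 *	int svc_rdma_init(void)
 *	{
 *		...
 *		svc_reg_xprt_class(&svc_rdma_class);
 *		return 0;
 *	}
 *
 *	void svc_rdma_cleanup(void)
 *	{
 *		svc_unreg_xprt_class(&svc_rdma_class);
 *		...
 *	}
 */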

/* WR context cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_ctxt_cachep;

/* Workqueue created in svc_rdma.c */
extern struct workqueue_struct *svc_rdma_wq;

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt;

	while (1) {
		ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, GFP_KERNEL);
		if (ctxt)
			break;
		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
	}
	ctxt->xprt = xprt;
	INIT_LIST_HEAD(&ctxt->dto_q);
	ctxt->count = 0;
	ctxt->frmr = NULL;
	atomic_inc(&xprt->sc_ctxt_used);
	return ctxt;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;
	for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
		/*
		 * Unmap the DMA addr in the SGE if the lkey matches
		 * the sc_dma_lkey, otherwise, ignore it since it is
		 * an FRMR lkey and will be unmapped later when the
		 * last WR that uses it completes.
		 */
		if (ctxt->sge[i].lkey == xprt->sc_dma_lkey) {
			atomic_dec(&xprt->sc_dma_used);
			ib_dma_unmap_page(xprt->sc_cm_id->device,
					  ctxt->sge[i].addr,
					  ctxt->sge[i].length,
					  ctxt->direction);
		}
	}
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt;
	int i;

	BUG_ON(!ctxt);
	xprt = ctxt->xprt;
	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	kmem_cache_free(svc_rdma_ctxt_cachep, ctxt);
	atomic_dec(&xprt->sc_ctxt_used);
}

/* Temporary NFS request map cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_map_cachep;

/*
 * Temporary NFS req mappings are shared across all transport
 * instances. These are short lived and should be bounded by the number
 * of concurrent server threads * depth of the SQ.
 */
struct svc_rdma_req_map *svc_rdma_get_req_map(void)
{
	struct svc_rdma_req_map *map;
	while (1) {
		map = kmem_cache_alloc(svc_rdma_map_cachep, GFP_KERNEL);
		if (map)
			break;
		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
	}
	map->count = 0;
	map->frmr = NULL;
	return map;
}

void svc_rdma_put_req_map(struct svc_rdma_req_map *map)
{
	kmem_cache_free(svc_rdma_map_cachep, map);
}

/* ib_cq event handler */
static void cq_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;
	dprintk("svcrdma: received CQ event id=%d, context=%p\n",
		event->event, context);
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
}

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %d received for QP=%p\n",
			event->event, event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
			"closing transport\n",
			event->event, event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/*
 * Data Transfer Operation Tasklet
 *
 * Walks a list of transports with I/O pending, removing entries as
 * they are added to the server's I/O pending list. Two bits indicate
 * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
 * spinlock that serializes access to the transport list with the RQ
 * and SQ interrupt handlers.
 */
static void dto_tasklet_func(unsigned long data)
{
	struct svcxprt_rdma *xprt;
	unsigned long flags;

	spin_lock_irqsave(&dto_lock, flags);
	while (!list_empty(&dto_xprt_q)) {
		xprt = list_entry(dto_xprt_q.next,
				  struct svcxprt_rdma, sc_dto_q);
		list_del_init(&xprt->sc_dto_q);
		spin_unlock_irqrestore(&dto_lock, flags);

		rq_cq_reap(xprt);
		sq_cq_reap(xprt);

		svc_xprt_put(&xprt->sc_xprt);
		spin_lock_irqsave(&dto_lock, flags);
	}
	spin_unlock_irqrestore(&dto_lock, flags);
}

/*
 * Receive Queue Completion Handler
 *
 * Since an RQ completion handler is called on interrupt context, we
 * need to defer the handling of the I/O to a tasklet
 */
static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/* Guard against unconditional flush call for destroyed QP */
	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
		return;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an SQ
	 * completion.
	 */
	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}

/*
 * rq_cq_reap - Process the RQ CQ.
 *
 * Take all completing WC off the CQE and enqueue the associated DTO
 * context on the dto_q for the transport.
 *
 * Note that caller must hold a transport reference.
 */
static void rq_cq_reap(struct svcxprt_rdma *xprt)
{
	int ret;
	struct ib_wc wc;
	struct svc_rdma_op_ctxt *ctxt = NULL;

	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
		return;

	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
	atomic_inc(&rdma_stat_rq_poll);

	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
		ctxt->wc_status = wc.status;
		ctxt->byte_len = wc.byte_len;
		svc_rdma_unmap_dma(ctxt);
		if (wc.status != IB_WC_SUCCESS) {
			/* Close the transport */
			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_rdma_put_context(ctxt, 1);
			svc_xprt_put(&xprt->sc_xprt);
			continue;
		}
		spin_lock_bh(&xprt->sc_rq_dto_lock);
		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
		spin_unlock_bh(&xprt->sc_rq_dto_lock);
		svc_xprt_put(&xprt->sc_xprt);
	}

	if (ctxt)
		atomic_inc(&rdma_stat_rq_prod);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	/*
	 * If data arrived before established event,
	 * don't enqueue. This defers RPC I/O until the
	 * RDMA connection is complete.
	 */
	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		svc_xprt_enqueue(&xprt->sc_xprt);
}

/*
 * Process a completion context
 */
static void process_context(struct svcxprt_rdma *xprt,
			    struct svc_rdma_op_ctxt *ctxt)
{
	svc_rdma_unmap_dma(ctxt);

	switch (ctxt->wr_op) {
	case IB_WR_SEND:
		if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
			svc_rdma_put_frmr(xprt, ctxt->frmr);
		svc_rdma_put_context(ctxt, 1);
		break;

	case IB_WR_RDMA_WRITE:
		svc_rdma_put_context(ctxt, 0);
		break;

	case IB_WR_RDMA_READ:
	case IB_WR_RDMA_READ_WITH_INV:
		if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
			struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
			BUG_ON(!read_hdr);
			if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
				svc_rdma_put_frmr(xprt, ctxt->frmr);
			spin_lock_bh(&xprt->sc_rq_dto_lock);
			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
			list_add_tail(&read_hdr->dto_q,
				      &xprt->sc_read_complete_q);
			spin_unlock_bh(&xprt->sc_rq_dto_lock);
			svc_xprt_enqueue(&xprt->sc_xprt);
		}
		svc_rdma_put_context(ctxt, 0);
		break;

	default:
		printk(KERN_ERR "svcrdma: unexpected completion type, "
		       "opcode=%d\n",
		       ctxt->wr_op);
		break;
	}
}

/*
 * Send Queue Completion Handler - potentially called on interrupt context.
 *
 * Note that caller must hold a transport reference.
 */
static void sq_cq_reap(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct ib_wc wc;
	struct ib_cq *cq = xprt->sc_sq_cq;
	int ret;

	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
		return;

	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
	atomic_inc(&rdma_stat_sq_poll);
	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
		if (wc.status != IB_WC_SUCCESS)
			/* Close the transport */
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);

		/* Decrement used SQ WR count */
		atomic_dec(&xprt->sc_sq_count);
		wake_up(&xprt->sc_send_wait);

		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
		if (ctxt)
			process_context(xprt, ctxt);

		svc_xprt_put(&xprt->sc_xprt);
	}

	if (ctxt)
		atomic_inc(&rdma_stat_sq_prod);
}

static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/* Guard against unconditional flush call for destroyed QP */
	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
		return;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an RQ
	 * completion.
	 */
	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}
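
/*
 * Editorial summary of the completion path above (not part of the
 * original file): rq_comp_handler() and sq_comp_handler() run in
 * interrupt context; they set RDMAXPRT_RQ_PENDING/RDMAXPRT_SQ_PENDING,
 * add the transport to dto_xprt_q under dto_lock, and schedule
 * dto_tasklet. dto_tasklet_func() then drains dto_xprt_q and calls
 * rq_cq_reap()/sq_cq_reap(), which poll the CQs and hand completed
 * contexts back to the svc_xprt layer.
 */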

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_frmr_q_lock);

	cma_xprt->sc_ord = svcrdma_ord;

	cma_xprt->sc_max_req_size = svcrdma_max_req_size;
	cma_xprt->sc_max_requests = svcrdma_max_requests;
	cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
	atomic_set(&cma_xprt->sc_sq_count, 0);
	atomic_set(&cma_xprt->sc_ctxt_used, 0);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

struct page *svc_rdma_get_page(void)
{
	struct page *page;

	while ((page = alloc_page(GFP_KERNEL)) == NULL) {
		/* If we can't get memory, wait a bit and try again */
		printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 "
		       "jiffies.\n");
		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
	}
	return page;
}

int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		BUG_ON(sge_no >= xprt->sc_max_sge);
		page = svc_rdma_get_page();
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		atomic_inc(&xprt->sc_dma_used);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_id = (u64)(unsigned long)ctxt;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

 err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked, it
 * will call the recvfrom method on the listen xprt which will accept the new
 * connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = client_ird;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	/*
	 * Can't use svc_xprt_received here because we are not on a
	 * rqstp thread
	 */
	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events will
 * either be incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event=%d\n", cma_id, cma_id->context, event->event);
		handle_connect_req(cma_id,
				   event->param.conn.initiator_depth);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event=%d\n", cma_id, event->event);
		break;
	}

	return ret;
}

static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event=%d\n", cma_id, xprt, event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event=%d\n", cma_id, event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if (sa->sa_family != AF_INET) {
		dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP,
				   IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

 err1:
	rdma_destroy_id(listen_id);
 err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
{
	struct ib_mr *mr;
	struct ib_fast_reg_page_list *pl;
	struct svc_rdma_fastreg_mr *frmr;

	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
	if (!frmr)
		goto err;

	mr = ib_alloc_fast_reg_mr(xprt->sc_pd, RPCSVC_MAXPAGES);
	if (IS_ERR(mr))
		goto err_free_frmr;

	pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
					 RPCSVC_MAXPAGES);
	if (IS_ERR(pl))
		goto err_free_mr;

	frmr->mr = mr;
	frmr->page_list = pl;
	INIT_LIST_HEAD(&frmr->frmr_list);
	return frmr;

 err_free_mr:
	ib_dereg_mr(mr);
 err_free_frmr:
	kfree(frmr);
 err:
	return ERR_PTR(-ENOMEM);
}

static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_fastreg_mr *frmr;

	while (!list_empty(&xprt->sc_frmr_q)) {
		frmr = list_entry(xprt->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		ib_dereg_mr(frmr->mr);
		ib_free_fast_reg_page_list(frmr->page_list);
		kfree(frmr);
	}
}

struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_fastreg_mr *frmr = NULL;

	spin_lock_bh(&rdma->sc_frmr_q_lock);
	if (!list_empty(&rdma->sc_frmr_q)) {
		frmr = list_entry(rdma->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		frmr->map_len = 0;
		frmr->page_list_len = 0;
	}
	spin_unlock_bh(&rdma->sc_frmr_q_lock);
	if (frmr)
		return frmr;

	return rdma_alloc_frmr(rdma);
}

static void frmr_unmap_dma(struct svcxprt_rdma *xprt,
			   struct svc_rdma_fastreg_mr *frmr)
{
	int page_no;
	for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
		dma_addr_t addr = frmr->page_list->page_list[page_no];
		if (ib_dma_mapping_error(frmr->mr->device, addr))
			continue;
		atomic_dec(&xprt->sc_dma_used);
		ib_dma_unmap_page(frmr->mr->device, addr, PAGE_SIZE,
				  frmr->direction);
	}
}

void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
		       struct svc_rdma_fastreg_mr *frmr)
{
	if (frmr) {
		frmr_unmap_dma(rdma, frmr);
		spin_lock_bh(&rdma->sc_frmr_q_lock);
		BUG_ON(!list_empty(&frmr->frmr_list));
		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
		spin_unlock_bh(&rdma->sc_frmr_q_lock);
	}
}
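
/*
 * Illustrative sketch (not part of the original file) of the FRMR
 * lifecycle implemented by the helpers above together with
 * svc_rdma_fastreg() further below:
 *
 *	struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
 *
 *	// caller fills in frmr->kva, frmr->direction, frmr->access_flags,
 *	// frmr->page_list->page_list[], frmr->page_list_len and
 *	// frmr->map_len, then posts the registration WR:
 *	ret = svc_rdma_fastreg(xprt, frmr);
 *
 *	// once the last WR using the region has completed:
 *	svc_rdma_put_frmr(xprt, frmr);
 */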

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct ib_qp_init_attr qp_attr;
	struct ib_device_attr devattr;
	int uninitialized_var(dma_mr_acc);
	int need_dma_mr;
	int ret;
	int i;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
	if (ret) {
		dprintk("svcrdma: could not query device attributes on "
			"device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
		goto errout;
	}

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)devattr.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
				       (size_t)svcrdma_max_requests);
	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, devattr.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
					 sq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 newxprt->sc_sq_depth,
					 0);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
					 rq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 newxprt->sc_max_requests,
					 0);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
		"    cm_id->device=%p, sc_pd->device=%p\n"
		"    cap.max_send_wr = %d\n"
		"    cap.max_recv_wr = %d\n"
		"    cap.max_send_sge = %d\n"
		"    cap.max_recv_sge = %d\n",
		newxprt->sc_cm_id, newxprt->sc_pd,
		newxprt->sc_cm_id->device, newxprt->sc_pd->device,
		qp_attr.cap.max_send_wr,
		qp_attr.cap.max_recv_wr,
		qp_attr.cap.max_send_sge,
		qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		/*
		 * XXX: This is a hack. We need a xx_request_qp interface
		 * that will adjust the qp_attr's with a best-effort
		 * number
		 */
		qp_attr.cap.max_send_sge -= 2;
		qp_attr.cap.max_recv_sge -= 2;
		ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
				     &qp_attr);
		if (ret) {
			dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
			goto errout;
		}
		newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
		newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
		newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
		newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	/*
	 * Use the most secure set of MR resources based on the
	 * transport type and available memory management features in
	 * the device. Here's the table implemented below:
	 *
	 *		Fast	Global	DMA	Remote WR
	 *		Reg	LKEY	MR	Access
	 *		Sup'd	Sup'd	Needed	Needed
	 *
	 * IWARP	N	N	Y	Y
	 *		N	Y	Y	Y
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * IB		N	N	Y	N
	 *		N	Y	N	-
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * NB:	iWARP requires remote write access for the data sink
	 *	of an RDMA_READ. IB does not.
	 */
	if (devattr.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
		newxprt->sc_frmr_pg_list_len =
			devattr.max_fast_reg_page_list_len;
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
	}

	/*
	 * Determine if a DMA MR is required and if so, what privs are required
	 */
	switch (rdma_node_get_transport(newxprt->sc_cm_id->device->node_type)) {
	case RDMA_TRANSPORT_IWARP:
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
		if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
			need_dma_mr = 1;
			dma_mr_acc =
				(IB_ACCESS_LOCAL_WRITE |
				 IB_ACCESS_REMOTE_WRITE);
		} else if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
			need_dma_mr = 1;
			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
		} else
			need_dma_mr = 0;
		break;
	case RDMA_TRANSPORT_IB:
		if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
			need_dma_mr = 1;
			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
		} else
			need_dma_mr = 0;
		break;
	default:
		goto errout;
	}

	/* Create the DMA MR if needed, otherwise, use the DMA LKEY */
	if (need_dma_mr) {
		/* Register all of physical memory */
		newxprt->sc_phys_mr =
			ib_get_dma_mr(newxprt->sc_pd, dma_mr_acc);
		if (IS_ERR(newxprt->sc_phys_mr)) {
			dprintk("svcrdma: Failed to create DMA MR ret=%d\n",
				ret);
			goto errout;
		}
		newxprt->sc_dma_lkey = newxprt->sc_phys_mr->lkey;
	} else
		newxprt->sc_dma_lkey =
			newxprt->sc_cm_id->device->local_dma_lkey;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/*
	 * Arm the CQs for the SQ and RQ before accepting so we can't
	 * miss the first message
	 */
	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted with the following "
		"attributes:\n"
		"    local_ip     : %pI4\n"
		"    local_port   : %d\n"
		"    remote_ip    : %pI4\n"
		"    remote_port  : %d\n"
		"    max_sge      : %d\n"
		"    sq_depth     : %d\n"
		"    max_requests : %d\n"
		"    ord          : %d\n",
		newxprt,
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.src_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.src_addr)->sin_port),
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.dst_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.dst_addr)->sin_port),
		newxprt->sc_max_sge,
		newxprt->sc_sq_depth,
		newxprt->sc_max_requests,
		newxprt->sc_ord);

	return &newxprt->sc_xprt;

 errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum, one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);

	/* We should only be called from kref_put */
	BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);

	/*
	 * Destroy queued, but not processed read completions. Note
	 * that this cleanup has to be done before destroying the
	 * cm_id because the device ptr is needed to unmap the dma in
	 * svc_rdma_put_context.
	 */
	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Destroy queued, but not processed recv completions */
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
	WARN_ON(atomic_read(&rdma->sc_dma_used) != 0);

	/* De-allocate fastreg mr */
	rdma_dealloc_frmr_q(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_destroy_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_destroy_cq(rdma->sc_rq_cq);

	if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
		ib_dereg_mr(rdma->sc_phys_mr);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are fewer SQ WR available than required to send a
	 * simple response, return false.
	 */
	if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
		return 0;

	/*
	 * ...or there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

/*
 * Attempt to register the kvec representing the RPC memory with the
 * device.
 *
 * Returns:
 *  0	: The FAST_REG work request to register the kvec was
 *	  successfully posted.
 *  <0	: An error was encountered attempting to post the request.
 */
int svc_rdma_fastreg(struct svcxprt_rdma *xprt,
		     struct svc_rdma_fastreg_mr *frmr)
{
	struct ib_send_wr fastreg_wr;
	u8 key;

	/* Bump the key */
	key = (u8)(frmr->mr->lkey & 0x000000FF);
	ib_update_fast_reg_key(frmr->mr, ++key);

	/* Prepare FASTREG WR */
	memset(&fastreg_wr, 0, sizeof fastreg_wr);
	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
	fastreg_wr.send_flags = IB_SEND_SIGNALED;
	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
	fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
	fastreg_wr.wr.fast_reg.length = frmr->map_len;
	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
	return svc_rdma_send(xprt, &fastreg_wr);
}

int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		spin_lock_bh(&xprt->sc_lock);
		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
			spin_unlock_bh(&xprt->sc_lock);
			atomic_inc(&rdma_stat_sq_starve);

			/* See if we can opportunistically reap SQ WR to make room */
			sq_cq_reap(xprt);

			/* Wait until SQ WR available if SQ still full */
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_count) <
				   xprt->sc_sq_depth);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		atomic_add(wr_count, &xprt->sc_sq_count);
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			atomic_sub(wr_count, &xprt->sc_sq_count);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d, "
				"sc_sq_count=%d, sc_sq_depth=%d\n",
				ret, atomic_read(&xprt->sc_sq_count),
				xprt->sc_sq_depth);
		}
		spin_unlock_bh(&xprt->sc_lock);
		if (ret)
			wake_up(&xprt->sc_send_wait);
		break;
	}
	return ret;
}

void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
			 enum rpcrdma_errcode err)
{
	struct ib_send_wr err_wr;
	struct page *p;
	struct svc_rdma_op_ctxt *ctxt;
	u32 *va;
	int length;
	int ret;

	p = svc_rdma_get_page();
	va = page_address(p);

	/* XDR encode error */
	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

	ctxt = svc_rdma_get_context(xprt);
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->count = 1;
	ctxt->pages[0] = p;

	/* Prepare SGE for local address */
	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
					    p, 0, length, DMA_FROM_DEVICE);
	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
		put_page(p);
		svc_rdma_put_context(ctxt, 1);
		return;
	}
	atomic_inc(&xprt->sc_dma_used);
	ctxt->sge[0].lkey = xprt->sc_dma_lkey;
	ctxt->sge[0].length = length;

	/* Prepare SEND WR */
	memset(&err_wr, 0, sizeof err_wr);
	ctxt->wr_op = IB_WR_SEND;
	err_wr.wr_id = (unsigned long)ctxt;
	err_wr.sg_list = ctxt->sge;
	err_wr.num_sge = 1;
	err_wr.opcode = IB_WR_SEND;
	err_wr.send_flags = IB_SEND_SIGNALED;

	/* Post It */
	ret = svc_rdma_send(xprt, &err_wr);
	if (ret) {
		dprintk("svcrdma: Error %d posting send for protocol error\n",
			ret);
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
	}
}
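
/*
 * Illustrative sketch (not part of the original file): an RPC service
 * reaches svc_rdma_create() above by creating a transport of this class
 * by its xcl_name. This assumes the generic svc_create_xprt() helper
 * exported by the svc_xprt layer:
 *
 *	err = svc_create_xprt(serv, "rdma", net, AF_INET, port,
 *			      SVC_SOCK_ANONYMOUS);
 *	if (err < 0)
 *		// no RDMA listener could be created
 */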