Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
awilliam
GitHub Repository: awilliam/linux-vfio
Path: blob/master/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
15111 views
1
/*
2
* Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
3
*
4
* This software is available to you under a choice of one of two
5
* licenses. You may choose to be licensed under the terms of the GNU
6
* General Public License (GPL) Version 2, available from the file
7
* COPYING in the main directory of this source tree, or the BSD-type
8
* license below:
9
*
10
* Redistribution and use in source and binary forms, with or without
11
* modification, are permitted provided that the following conditions
12
* are met:
13
*
14
* Redistributions of source code must retain the above copyright
15
* notice, this list of conditions and the following disclaimer.
16
*
17
* Redistributions in binary form must reproduce the above
18
* copyright notice, this list of conditions and the following
19
* disclaimer in the documentation and/or other materials provided
20
* with the distribution.
21
*
22
* Neither the name of the Network Appliance, Inc. nor the names of
23
* its contributors may be used to endorse or promote products
24
* derived from this software without specific prior written
25
* permission.
26
*
27
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38
*
39
* Author: Tom Tucker <[email protected]>
40
*/
41
42
#include <linux/sunrpc/debug.h>
43
#include <linux/sunrpc/rpc_rdma.h>
44
#include <linux/spinlock.h>
45
#include <asm/unaligned.h>
46
#include <rdma/ib_verbs.h>
47
#include <rdma/rdma_cm.h>
48
#include <linux/sunrpc/svc_rdma.h>
49
50
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
51
52
/*
53
* Replace the pages in the rq_argpages array with the pages from the SGE in
54
* the RDMA_RECV completion. The SGL should contain full pages up until the
55
* last one.
56
*/
57
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
58
struct svc_rdma_op_ctxt *ctxt,
59
u32 byte_count)
60
{
61
struct page *page;
62
u32 bc;
63
int sge_no;
64
65
/* Swap the page in the SGE with the page in argpages */
66
page = ctxt->pages[0];
67
put_page(rqstp->rq_pages[0]);
68
rqstp->rq_pages[0] = page;
69
70
/* Set up the XDR head */
71
rqstp->rq_arg.head[0].iov_base = page_address(page);
72
rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
73
rqstp->rq_arg.len = byte_count;
74
rqstp->rq_arg.buflen = byte_count;
75
76
/* Compute bytes past head in the SGL */
77
bc = byte_count - rqstp->rq_arg.head[0].iov_len;
78
79
/* If data remains, store it in the pagelist */
80
rqstp->rq_arg.page_len = bc;
81
rqstp->rq_arg.page_base = 0;
82
rqstp->rq_arg.pages = &rqstp->rq_pages[1];
83
sge_no = 1;
84
while (bc && sge_no < ctxt->count) {
85
page = ctxt->pages[sge_no];
86
put_page(rqstp->rq_pages[sge_no]);
87
rqstp->rq_pages[sge_no] = page;
88
bc -= min(bc, ctxt->sge[sge_no].length);
89
rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
90
sge_no++;
91
}
92
rqstp->rq_respages = &rqstp->rq_pages[sge_no];
93
94
/* We should never run out of SGE because the limit is defined to
95
* support the max allowed RPC data length
96
*/
97
BUG_ON(bc && (sge_no == ctxt->count));
98
BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
99
!= byte_count);
100
BUG_ON(rqstp->rq_arg.len != byte_count);
101
102
/* If not all pages were used from the SGL, free the remaining ones */
103
bc = sge_no;
104
while (sge_no < ctxt->count) {
105
page = ctxt->pages[sge_no++];
106
put_page(page);
107
}
108
ctxt->count = bc;
109
110
/* Set up tail */
111
rqstp->rq_arg.tail[0].iov_base = NULL;
112
rqstp->rq_arg.tail[0].iov_len = 0;
113
}
114
115
/* Encode a read-chunk-list as an array of IB SGE
116
*
117
* Assumptions:
118
* - chunk[0]->position points to pages[0] at an offset of 0
119
* - pages[] is not physically or virtually contiguous and consists of
120
* PAGE_SIZE elements.
121
*
122
* Output:
123
* - sge array pointing into pages[] array.
124
* - chunk_sge array specifying sge index and count for each
125
* chunk in the read list
126
*
127
*/
128
static int map_read_chunks(struct svcxprt_rdma *xprt,
129
struct svc_rqst *rqstp,
130
struct svc_rdma_op_ctxt *head,
131
struct rpcrdma_msg *rmsgp,
132
struct svc_rdma_req_map *rpl_map,
133
struct svc_rdma_req_map *chl_map,
134
int ch_count,
135
int byte_count)
136
{
137
int sge_no;
138
int sge_bytes;
139
int page_off;
140
int page_no;
141
int ch_bytes;
142
int ch_no;
143
struct rpcrdma_read_chunk *ch;
144
145
sge_no = 0;
146
page_no = 0;
147
page_off = 0;
148
ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
149
ch_no = 0;
150
ch_bytes = ch->rc_target.rs_length;
151
head->arg.head[0] = rqstp->rq_arg.head[0];
152
head->arg.tail[0] = rqstp->rq_arg.tail[0];
153
head->arg.pages = &head->pages[head->count];
154
head->hdr_count = head->count; /* save count of hdr pages */
155
head->arg.page_base = 0;
156
head->arg.page_len = ch_bytes;
157
head->arg.len = rqstp->rq_arg.len + ch_bytes;
158
head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
159
head->count++;
160
chl_map->ch[0].start = 0;
161
while (byte_count) {
162
rpl_map->sge[sge_no].iov_base =
163
page_address(rqstp->rq_arg.pages[page_no]) + page_off;
164
sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
165
rpl_map->sge[sge_no].iov_len = sge_bytes;
166
/*
167
* Don't bump head->count here because the same page
168
* may be used by multiple SGE.
169
*/
170
head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
171
rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
172
173
byte_count -= sge_bytes;
174
ch_bytes -= sge_bytes;
175
sge_no++;
176
/*
177
* If all bytes for this chunk have been mapped to an
178
* SGE, move to the next SGE
179
*/
180
if (ch_bytes == 0) {
181
chl_map->ch[ch_no].count =
182
sge_no - chl_map->ch[ch_no].start;
183
ch_no++;
184
ch++;
185
chl_map->ch[ch_no].start = sge_no;
186
ch_bytes = ch->rc_target.rs_length;
187
/* If bytes remaining account for next chunk */
188
if (byte_count) {
189
head->arg.page_len += ch_bytes;
190
head->arg.len += ch_bytes;
191
head->arg.buflen += ch_bytes;
192
}
193
}
194
/*
195
* If this SGE consumed all of the page, move to the
196
* next page
197
*/
198
if ((sge_bytes + page_off) == PAGE_SIZE) {
199
page_no++;
200
page_off = 0;
201
/*
202
* If there are still bytes left to map, bump
203
* the page count
204
*/
205
if (byte_count)
206
head->count++;
207
} else
208
page_off += sge_bytes;
209
}
210
BUG_ON(byte_count != 0);
211
return sge_no;
212
}
213
214
/* Map a read-chunk-list to an XDR and fast register the page-list.
215
*
216
* Assumptions:
217
* - chunk[0] position points to pages[0] at an offset of 0
218
* - pages[] will be made physically contiguous by creating a one-off memory
219
* region using the fastreg verb.
220
* - byte_count is # of bytes in read-chunk-list
221
* - ch_count is # of chunks in read-chunk-list
222
*
223
* Output:
224
* - sge array pointing into pages[] array.
225
* - chunk_sge array specifying sge index and count for each
226
* chunk in the read list
227
*/
228
static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
229
struct svc_rqst *rqstp,
230
struct svc_rdma_op_ctxt *head,
231
struct rpcrdma_msg *rmsgp,
232
struct svc_rdma_req_map *rpl_map,
233
struct svc_rdma_req_map *chl_map,
234
int ch_count,
235
int byte_count)
236
{
237
int page_no;
238
int ch_no;
239
u32 offset;
240
struct rpcrdma_read_chunk *ch;
241
struct svc_rdma_fastreg_mr *frmr;
242
int ret = 0;
243
244
frmr = svc_rdma_get_frmr(xprt);
245
if (IS_ERR(frmr))
246
return -ENOMEM;
247
248
head->frmr = frmr;
249
head->arg.head[0] = rqstp->rq_arg.head[0];
250
head->arg.tail[0] = rqstp->rq_arg.tail[0];
251
head->arg.pages = &head->pages[head->count];
252
head->hdr_count = head->count; /* save count of hdr pages */
253
head->arg.page_base = 0;
254
head->arg.page_len = byte_count;
255
head->arg.len = rqstp->rq_arg.len + byte_count;
256
head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
257
258
/* Fast register the page list */
259
frmr->kva = page_address(rqstp->rq_arg.pages[0]);
260
frmr->direction = DMA_FROM_DEVICE;
261
frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
262
frmr->map_len = byte_count;
263
frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
264
for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
265
frmr->page_list->page_list[page_no] =
266
ib_dma_map_page(xprt->sc_cm_id->device,
267
rqstp->rq_arg.pages[page_no], 0,
268
PAGE_SIZE, DMA_FROM_DEVICE);
269
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
270
frmr->page_list->page_list[page_no]))
271
goto fatal_err;
272
atomic_inc(&xprt->sc_dma_used);
273
head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
274
}
275
head->count += page_no;
276
277
/* rq_respages points one past arg pages */
278
rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
279
280
/* Create the reply and chunk maps */
281
offset = 0;
282
ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
283
for (ch_no = 0; ch_no < ch_count; ch_no++) {
284
rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
285
rpl_map->sge[ch_no].iov_len = ch->rc_target.rs_length;
286
chl_map->ch[ch_no].count = 1;
287
chl_map->ch[ch_no].start = ch_no;
288
offset += ch->rc_target.rs_length;
289
ch++;
290
}
291
292
ret = svc_rdma_fastreg(xprt, frmr);
293
if (ret)
294
goto fatal_err;
295
296
return ch_no;
297
298
fatal_err:
299
printk("svcrdma: error fast registering xdr for xprt %p", xprt);
300
svc_rdma_put_frmr(xprt, frmr);
301
return -EIO;
302
}
303
304
static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
305
struct svc_rdma_op_ctxt *ctxt,
306
struct svc_rdma_fastreg_mr *frmr,
307
struct kvec *vec,
308
u64 *sgl_offset,
309
int count)
310
{
311
int i;
312
unsigned long off;
313
314
ctxt->count = count;
315
ctxt->direction = DMA_FROM_DEVICE;
316
for (i = 0; i < count; i++) {
317
ctxt->sge[i].length = 0; /* in case map fails */
318
if (!frmr) {
319
BUG_ON(0 == virt_to_page(vec[i].iov_base));
320
off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
321
ctxt->sge[i].addr =
322
ib_dma_map_page(xprt->sc_cm_id->device,
323
virt_to_page(vec[i].iov_base),
324
off,
325
vec[i].iov_len,
326
DMA_FROM_DEVICE);
327
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
328
ctxt->sge[i].addr))
329
return -EINVAL;
330
ctxt->sge[i].lkey = xprt->sc_dma_lkey;
331
atomic_inc(&xprt->sc_dma_used);
332
} else {
333
ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
334
ctxt->sge[i].lkey = frmr->mr->lkey;
335
}
336
ctxt->sge[i].length = vec[i].iov_len;
337
*sgl_offset = *sgl_offset + vec[i].iov_len;
338
}
339
return 0;
340
}
341
342
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
343
{
344
if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
345
RDMA_TRANSPORT_IWARP) &&
346
sge_count > 1)
347
return 1;
348
else
349
return min_t(int, sge_count, xprt->sc_max_sge);
350
}
351
352
/*
353
* Use RDMA_READ to read data from the advertised client buffer into the
354
* XDR stream starting at rq_arg.head[0].iov_base.
355
* Each chunk in the array
356
* contains the following fields:
357
* discrim - '1', This isn't used for data placement
358
* position - The xdr stream offset (the same for every chunk)
359
* handle - RMR for client memory region
360
* length - data transfer length
361
* offset - 64 bit tagged offset in remote memory region
362
*
363
* On our side, we need to read into a pagelist. The first page immediately
364
* follows the RPC header.
365
*
366
* This function returns:
367
* 0 - No error and no read-list found.
368
*
369
* 1 - Successful read-list processing. The data is not yet in
370
* the pagelist and therefore the RPC request must be deferred. The
371
* I/O completion will enqueue the transport again and
372
* svc_rdma_recvfrom will complete the request.
373
*
374
* <0 - Error processing/posting read-list.
375
*
376
* NOTE: The ctxt must not be touched after the last WR has been posted
377
* because the I/O completion processing may occur on another
378
* processor and free / modify the context. Ne touche pas!
379
*/
380
static int rdma_read_xdr(struct svcxprt_rdma *xprt,
381
struct rpcrdma_msg *rmsgp,
382
struct svc_rqst *rqstp,
383
struct svc_rdma_op_ctxt *hdr_ctxt)
384
{
385
struct ib_send_wr read_wr;
386
struct ib_send_wr inv_wr;
387
int err = 0;
388
int ch_no;
389
int ch_count;
390
int byte_count;
391
int sge_count;
392
u64 sgl_offset;
393
struct rpcrdma_read_chunk *ch;
394
struct svc_rdma_op_ctxt *ctxt = NULL;
395
struct svc_rdma_req_map *rpl_map;
396
struct svc_rdma_req_map *chl_map;
397
398
/* If no read list is present, return 0 */
399
ch = svc_rdma_get_read_chunk(rmsgp);
400
if (!ch)
401
return 0;
402
403
svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
404
if (ch_count > RPCSVC_MAXPAGES)
405
return -EINVAL;
406
407
/* Allocate temporary reply and chunk maps */
408
rpl_map = svc_rdma_get_req_map();
409
chl_map = svc_rdma_get_req_map();
410
411
if (!xprt->sc_frmr_pg_list_len)
412
sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
413
rpl_map, chl_map, ch_count,
414
byte_count);
415
else
416
sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
417
rpl_map, chl_map, ch_count,
418
byte_count);
419
if (sge_count < 0) {
420
err = -EIO;
421
goto out;
422
}
423
424
sgl_offset = 0;
425
ch_no = 0;
426
427
for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
428
ch->rc_discrim != 0; ch++, ch_no++) {
429
next_sge:
430
ctxt = svc_rdma_get_context(xprt);
431
ctxt->direction = DMA_FROM_DEVICE;
432
ctxt->frmr = hdr_ctxt->frmr;
433
ctxt->read_hdr = NULL;
434
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
435
clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
436
437
/* Prepare READ WR */
438
memset(&read_wr, 0, sizeof read_wr);
439
read_wr.wr_id = (unsigned long)ctxt;
440
read_wr.opcode = IB_WR_RDMA_READ;
441
ctxt->wr_op = read_wr.opcode;
442
read_wr.send_flags = IB_SEND_SIGNALED;
443
read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
444
read_wr.wr.rdma.remote_addr =
445
get_unaligned(&(ch->rc_target.rs_offset)) +
446
sgl_offset;
447
read_wr.sg_list = ctxt->sge;
448
read_wr.num_sge =
449
rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
450
err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
451
&rpl_map->sge[chl_map->ch[ch_no].start],
452
&sgl_offset,
453
read_wr.num_sge);
454
if (err) {
455
svc_rdma_unmap_dma(ctxt);
456
svc_rdma_put_context(ctxt, 0);
457
goto out;
458
}
459
if (((ch+1)->rc_discrim == 0) &&
460
(read_wr.num_sge == chl_map->ch[ch_no].count)) {
461
/*
462
* Mark the last RDMA_READ with a bit to
463
* indicate all RPC data has been fetched from
464
* the client and the RPC needs to be enqueued.
465
*/
466
set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
467
if (hdr_ctxt->frmr) {
468
set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
469
/*
470
* Invalidate the local MR used to map the data
471
* sink.
472
*/
473
if (xprt->sc_dev_caps &
474
SVCRDMA_DEVCAP_READ_W_INV) {
475
read_wr.opcode =
476
IB_WR_RDMA_READ_WITH_INV;
477
ctxt->wr_op = read_wr.opcode;
478
read_wr.ex.invalidate_rkey =
479
ctxt->frmr->mr->lkey;
480
} else {
481
/* Prepare INVALIDATE WR */
482
memset(&inv_wr, 0, sizeof inv_wr);
483
inv_wr.opcode = IB_WR_LOCAL_INV;
484
inv_wr.send_flags = IB_SEND_SIGNALED;
485
inv_wr.ex.invalidate_rkey =
486
hdr_ctxt->frmr->mr->lkey;
487
read_wr.next = &inv_wr;
488
}
489
}
490
ctxt->read_hdr = hdr_ctxt;
491
}
492
/* Post the read */
493
err = svc_rdma_send(xprt, &read_wr);
494
if (err) {
495
printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
496
err);
497
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
498
svc_rdma_unmap_dma(ctxt);
499
svc_rdma_put_context(ctxt, 0);
500
goto out;
501
}
502
atomic_inc(&rdma_stat_read);
503
504
if (read_wr.num_sge < chl_map->ch[ch_no].count) {
505
chl_map->ch[ch_no].count -= read_wr.num_sge;
506
chl_map->ch[ch_no].start += read_wr.num_sge;
507
goto next_sge;
508
}
509
sgl_offset = 0;
510
err = 1;
511
}
512
513
out:
514
svc_rdma_put_req_map(rpl_map);
515
svc_rdma_put_req_map(chl_map);
516
517
/* Detach arg pages. svc_recv will replenish them */
518
for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
519
rqstp->rq_pages[ch_no] = NULL;
520
521
/*
522
* Detach res pages. svc_release must see a resused count of
523
* zero or it will attempt to put them.
524
*/
525
while (rqstp->rq_resused)
526
rqstp->rq_respages[--rqstp->rq_resused] = NULL;
527
528
return err;
529
}
530
531
static int rdma_read_complete(struct svc_rqst *rqstp,
532
struct svc_rdma_op_ctxt *head)
533
{
534
int page_no;
535
int ret;
536
537
BUG_ON(!head);
538
539
/* Copy RPC pages */
540
for (page_no = 0; page_no < head->count; page_no++) {
541
put_page(rqstp->rq_pages[page_no]);
542
rqstp->rq_pages[page_no] = head->pages[page_no];
543
}
544
/* Point rq_arg.pages past header */
545
rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
546
rqstp->rq_arg.page_len = head->arg.page_len;
547
rqstp->rq_arg.page_base = head->arg.page_base;
548
549
/* rq_respages starts after the last arg page */
550
rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
551
rqstp->rq_resused = 0;
552
553
/* Rebuild rq_arg head and tail. */
554
rqstp->rq_arg.head[0] = head->arg.head[0];
555
rqstp->rq_arg.tail[0] = head->arg.tail[0];
556
rqstp->rq_arg.len = head->arg.len;
557
rqstp->rq_arg.buflen = head->arg.buflen;
558
559
/* Free the context */
560
svc_rdma_put_context(head, 0);
561
562
/* XXX: What should this be? */
563
rqstp->rq_prot = IPPROTO_MAX;
564
svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
565
566
ret = rqstp->rq_arg.head[0].iov_len
567
+ rqstp->rq_arg.page_len
568
+ rqstp->rq_arg.tail[0].iov_len;
569
dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
570
"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
571
ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
572
rqstp->rq_arg.head[0].iov_len);
573
574
return ret;
575
}
576
577
/*
578
* Set up the rqstp thread context to point to the RQ buffer. If
579
* necessary, pull additional data from the client with an RDMA_READ
580
* request.
581
*/
582
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
583
{
584
struct svc_xprt *xprt = rqstp->rq_xprt;
585
struct svcxprt_rdma *rdma_xprt =
586
container_of(xprt, struct svcxprt_rdma, sc_xprt);
587
struct svc_rdma_op_ctxt *ctxt = NULL;
588
struct rpcrdma_msg *rmsgp;
589
int ret = 0;
590
int len;
591
592
dprintk("svcrdma: rqstp=%p\n", rqstp);
593
594
spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
595
if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
596
ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
597
struct svc_rdma_op_ctxt,
598
dto_q);
599
list_del_init(&ctxt->dto_q);
600
}
601
if (ctxt) {
602
spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
603
return rdma_read_complete(rqstp, ctxt);
604
}
605
606
if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
607
ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
608
struct svc_rdma_op_ctxt,
609
dto_q);
610
list_del_init(&ctxt->dto_q);
611
} else {
612
atomic_inc(&rdma_stat_rq_starve);
613
clear_bit(XPT_DATA, &xprt->xpt_flags);
614
ctxt = NULL;
615
}
616
spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
617
if (!ctxt) {
618
/* This is the EAGAIN path. The svc_recv routine will
619
* return -EAGAIN, the nfsd thread will go to call into
620
* svc_recv again and we shouldn't be on the active
621
* transport list
622
*/
623
if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
624
goto close_out;
625
626
BUG_ON(ret);
627
goto out;
628
}
629
dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
630
ctxt, rdma_xprt, rqstp, ctxt->wc_status);
631
BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
632
atomic_inc(&rdma_stat_recv);
633
634
/* Build up the XDR from the receive buffers. */
635
rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
636
637
/* Decode the RDMA header. */
638
len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
639
rqstp->rq_xprt_hlen = len;
640
641
/* If the request is invalid, reply with an error */
642
if (len < 0) {
643
if (len == -ENOSYS)
644
svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
645
goto close_out;
646
}
647
648
/* Read read-list data. */
649
ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
650
if (ret > 0) {
651
/* read-list posted, defer until data received from client. */
652
goto defer;
653
}
654
if (ret < 0) {
655
/* Post of read-list failed, free context. */
656
svc_rdma_put_context(ctxt, 1);
657
return 0;
658
}
659
660
ret = rqstp->rq_arg.head[0].iov_len
661
+ rqstp->rq_arg.page_len
662
+ rqstp->rq_arg.tail[0].iov_len;
663
svc_rdma_put_context(ctxt, 0);
664
out:
665
dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
666
"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
667
ret, rqstp->rq_arg.len,
668
rqstp->rq_arg.head[0].iov_base,
669
rqstp->rq_arg.head[0].iov_len);
670
rqstp->rq_prot = IPPROTO_MAX;
671
svc_xprt_copy_addrs(rqstp, xprt);
672
return ret;
673
674
close_out:
675
if (ctxt)
676
svc_rdma_put_context(ctxt, 1);
677
dprintk("svcrdma: transport %p is closing\n", xprt);
678
/*
679
* Set the close bit and enqueue it. svc_recv will see the
680
* close bit and call svc_xprt_delete
681
*/
682
set_bit(XPT_CLOSE, &xprt->xpt_flags);
683
defer:
684
return 0;
685
}
686
687