GitHub Repository: awilliam/linux-vfio
Path: blob/master/net/sunrpc/xprtrdma/rpc_rdma.c
/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include "xprt_rdma.h"

#include <linux/highmem.h>

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif

enum rpcrdma_chunktype {
        rpcrdma_noch = 0,
        rpcrdma_readch,
        rpcrdma_areadch,
        rpcrdma_writech,
        rpcrdma_replych
};

#ifdef RPC_DEBUG
static const char transfertypes[][12] = {
        "pure inline",  /* no chunks */
        " read chunk",  /* some argument via rdma read */
        "*read chunk",  /* entire request via rdma read */
        "write chunk",  /* some result via rdma write */
        "reply chunk"   /* entire reply via rdma write */
};
#endif

/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Note, this routine is never called if the connection's memory
 * registration strategy is 0 (bounce buffers).
 */

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
        enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
{
        int len, n = 0, p;
        int page_base;
        struct page **ppages;

        if (pos == 0 && xdrbuf->head[0].iov_len) {
                seg[n].mr_page = NULL;
                seg[n].mr_offset = xdrbuf->head[0].iov_base;
                seg[n].mr_len = xdrbuf->head[0].iov_len;
                ++n;
        }

        len = xdrbuf->page_len;
        ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
        page_base = xdrbuf->page_base & ~PAGE_MASK;
        p = 0;
        while (len && n < nsegs) {
                seg[n].mr_page = ppages[p];
                seg[n].mr_offset = (void *)(unsigned long) page_base;
                seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
                BUG_ON(seg[n].mr_len > PAGE_SIZE);
                len -= seg[n].mr_len;
                ++n;
                ++p;
                page_base = 0;  /* page offset only applies to first page */
        }

        /* Message overflows the seg array */
        if (len && n == nsegs)
                return 0;

        if (xdrbuf->tail[0].iov_len) {
                /* the rpcrdma protocol allows us to omit any trailing
                 * xdr pad bytes, saving the server an RDMA operation. */
                if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
                        return n;
                if (n == nsegs)
                        /* Tail remains, but we're out of segments */
                        return 0;
                seg[n].mr_page = NULL;
                seg[n].mr_offset = xdrbuf->tail[0].iov_base;
                seg[n].mr_len = xdrbuf->tail[0].iov_len;
                ++n;
        }

        return n;
}

/*
 * Create read/write chunk lists, and reply chunks, for RDMA
 *
 * Assume check against THRESHOLD has been done, and chunks are required.
 * Assume only encoding one list entry for read|write chunks. The NFSv3
 * protocol is simple enough to allow this as it only has a single "bulk
 * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
 * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
 *
 * When used for a single reply chunk (which is a special write
 * chunk used for the entire reply, rather than just the data), it
 * is used primarily for READDIR and READLINK which would otherwise
 * be severely size-limited by a small rdma inline read max. The server
 * response will come back as an RDMA Write, followed by a message
 * of type RDMA_NOMSG carrying the xid and length. As a result, reply
 * chunks do not provide data alignment, however they do not require
 * "fixup" (moving the response to the upper layer buffer) either.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 */
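
/*
 * Illustrative example (not part of the original source, follows the
 * encoding key above): an argument registered as two segments at the
 * same XDR position P is encoded as a read chunk list of two entries,
 * each a discriminator 1, the shared position P, and that segment's
 * HLOO triple, terminated by a 0 discriminator:
 *
 *    1 | P | handle0 | len0 | offset0 | 1 | P | handle1 | len1 | offset1 | 0
 *
 * The same two segments as a write or reply chunk form a single counted
 * array, 1 | 2 | HLOO | HLOO, with a write chunk list additionally
 * closed by a trailing 0.
 */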

static unsigned int
rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
                struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
{
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
        int nsegs, nchunks = 0;
        unsigned int pos;
        struct rpcrdma_mr_seg *seg = req->rl_segments;
        struct rpcrdma_read_chunk *cur_rchunk = NULL;
        struct rpcrdma_write_array *warray = NULL;
        struct rpcrdma_write_chunk *cur_wchunk = NULL;
        __be32 *iptr = headerp->rm_body.rm_chunks;

        if (type == rpcrdma_readch || type == rpcrdma_areadch) {
                /* a read chunk - server will RDMA Read our memory */
                cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
        } else {
                /* a write or reply chunk - server will RDMA Write our memory */
                *iptr++ = xdr_zero;     /* encode a NULL read chunk list */
                if (type == rpcrdma_replych)
                        *iptr++ = xdr_zero;     /* a NULL write chunk list */
                warray = (struct rpcrdma_write_array *) iptr;
                cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
        }

        if (type == rpcrdma_replych || type == rpcrdma_areadch)
                pos = 0;
        else
                pos = target->head[0].iov_len;

        nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
        if (nsegs == 0)
                return 0;

        do {
                /* bind/register the memory, then build chunk from result. */
                int n = rpcrdma_register_external(seg, nsegs,
                                cur_wchunk != NULL, r_xprt);
                if (n <= 0)
                        goto out;
                if (cur_rchunk) {       /* read */
                        cur_rchunk->rc_discrim = xdr_one;
                        /* all read chunks have the same "position" */
                        cur_rchunk->rc_position = htonl(pos);
                        cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
                        cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
                        xdr_encode_hyper(
                                (__be32 *)&cur_rchunk->rc_target.rs_offset,
                                seg->mr_base);
                        dprintk("RPC: %s: read chunk "
                                "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
                                seg->mr_len, (unsigned long long)seg->mr_base,
                                seg->mr_rkey, pos, n < nsegs ? "more" : "last");
                        cur_rchunk++;
                        r_xprt->rx_stats.read_chunk_count++;
                } else {                /* write/reply */
                        cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
                        cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
                        xdr_encode_hyper(
                                (__be32 *)&cur_wchunk->wc_target.rs_offset,
                                seg->mr_base);
                        dprintk("RPC: %s: %s chunk "
                                "elem %d@0x%llx:0x%x (%s)\n", __func__,
                                (type == rpcrdma_replych) ? "reply" : "write",
                                seg->mr_len, (unsigned long long)seg->mr_base,
                                seg->mr_rkey, n < nsegs ? "more" : "last");
                        cur_wchunk++;
                        if (type == rpcrdma_replych)
                                r_xprt->rx_stats.reply_chunk_count++;
                        else
                                r_xprt->rx_stats.write_chunk_count++;
                        r_xprt->rx_stats.total_rdma_request += seg->mr_len;
                }
                nchunks++;
                seg += n;
                nsegs -= n;
        } while (nsegs);

        /* success. all failures return above */
        req->rl_nchunks = nchunks;

        BUG_ON(nchunks == 0);
        BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
               && (nchunks > 3));

        /*
         * finish off header. If write, marshal discrim and nchunks.
         */
        if (cur_rchunk) {
                iptr = (__be32 *) cur_rchunk;
                *iptr++ = xdr_zero;     /* finish the read chunk list */
                *iptr++ = xdr_zero;     /* encode a NULL write chunk list */
                *iptr++ = xdr_zero;     /* encode a NULL reply chunk */
        } else {
                warray->wc_discrim = xdr_one;
                warray->wc_nchunks = htonl(nchunks);
                iptr = (__be32 *) cur_wchunk;
                if (type == rpcrdma_writech) {
                        *iptr++ = xdr_zero;     /* finish the write chunk list */
                        *iptr++ = xdr_zero;     /* encode a NULL reply chunk */
                }
        }

        /*
         * Return header size.
         */
        return (unsigned char *)iptr - (unsigned char *)headerp;

out:
        for (pos = 0; nchunks--;)
                pos += rpcrdma_deregister_external(
                                &req->rl_segments[pos], r_xprt, NULL);
        return 0;
}

/*
 * Copy write data inline.
 * This function is used for "small" requests. Data which is passed
 * to RPC via iovecs (or page list) is copied directly into the
 * pre-registered memory buffer for this request. For small amounts
 * of data, this is efficient. The cutoff value is tunable.
 */
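
/*
 * Illustrative note (not part of the original source): after the pullup,
 * rq_svec[0] covers the RPC header followed by the copied page and tail
 * bytes, so the request can go out entirely inline with no chunks. The
 * return value is the amount of padding to insert, or zero when padding
 * does not apply to this request.
 */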
static int
rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
{
        int i, npages, curlen;
        int copy_len;
        unsigned char *srcp, *destp;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
        int page_base;
        struct page **ppages;

        destp = rqst->rq_svec[0].iov_base;
        curlen = rqst->rq_svec[0].iov_len;
        destp += curlen;
        /*
         * Do optional padding where it makes sense. Alignment of write
         * payload can help the server, if our setting is accurate.
         */
        pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
        if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
                pad = 0;        /* don't pad this request */

        dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
                __func__, pad, destp, rqst->rq_slen, curlen);

        copy_len = rqst->rq_snd_buf.page_len;

        if (rqst->rq_snd_buf.tail[0].iov_len) {
                curlen = rqst->rq_snd_buf.tail[0].iov_len;
                if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
                        memmove(destp + copy_len,
                                rqst->rq_snd_buf.tail[0].iov_base, curlen);
                        r_xprt->rx_stats.pullup_copy_count += curlen;
                }
                dprintk("RPC: %s: tail destp 0x%p len %d\n",
                        __func__, destp + copy_len, curlen);
                rqst->rq_svec[0].iov_len += curlen;
        }
        r_xprt->rx_stats.pullup_copy_count += copy_len;

        page_base = rqst->rq_snd_buf.page_base;
        ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
        page_base &= ~PAGE_MASK;
        npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
        for (i = 0; copy_len && i < npages; i++) {
                curlen = PAGE_SIZE - page_base;
                if (curlen > copy_len)
                        curlen = copy_len;
                dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
                        __func__, i, destp, copy_len, curlen);
                srcp = kmap_atomic(ppages[i], KM_SKB_SUNRPC_DATA);
                memcpy(destp, srcp+page_base, curlen);
                kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
                rqst->rq_svec[0].iov_len += curlen;
                destp += curlen;
                copy_len -= curlen;
                page_base = 0;
        }
        /* header now contains entire send message */
        return pad;
}

/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Uses multiple RDMA IOVs for a request:
 *  [0] -- RPC RDMA header, which uses memory from the *start* of the
 *         preregistered buffer that already holds the RPC data in
 *         its middle.
 *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
 *  [2] -- optional padding.
 *  [3] -- if padded, header only in [1] and data here.
 */
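
/*
 * Illustrative example (not part of the original source): a small request
 * pulled up inline with no padding posts only two IOVs -- send_iov[0]
 * covering the RPC/RDMA header (hdrlen bytes at the start of the
 * preregistered buffer) and send_iov[1] covering the marshaled RPC message
 * (rpclen bytes). When padding applies, the message type becomes RDMA_MSGP
 * and IOVs [2] (the zeroed pad buffer) and [3] (the write data following
 * the RPC header) are added, giving rl_niovs == 4.
 */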

int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        char *base;
        size_t hdrlen, rpclen, padlen;
        enum rpcrdma_chunktype rtype, wtype;
        struct rpcrdma_msg *headerp;

        /*
         * rpclen gets amount of data in first buffer, which is the
         * pre-registered buffer.
         */
        base = rqst->rq_svec[0].iov_base;
        rpclen = rqst->rq_svec[0].iov_len;

        /* build RDMA header in private area at front */
        headerp = (struct rpcrdma_msg *) req->rl_base;
        /* don't htonl XID, it's already done in request */
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = xdr_one;
        headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
        headerp->rm_type = htonl(RDMA_MSG);

        /*
         * Chunks needed for results?
         *
         * o If the expected result is under the inline threshold, all ops
         *   return as inline (but see later).
         * o Large non-read ops return as a single reply chunk.
         * o Large read ops return data as write chunk(s), header as inline.
         *
         * Note: the NFS code sending down multiple result segments implies
         * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
         */

        /*
         * This code can handle read chunks, write chunks OR reply
         * chunks -- only one type. If the request is too big to fit
         * inline, then we will choose read chunks. If the request is
         * a READ, then use write chunks to separate the file data
         * into pages; otherwise use reply chunks.
         */
        if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
                wtype = rpcrdma_noch;
        else if (rqst->rq_rcv_buf.page_len == 0)
                wtype = rpcrdma_replych;
        else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
                wtype = rpcrdma_writech;
        else
                wtype = rpcrdma_replych;

        /*
         * Chunks needed for arguments?
         *
         * o If the total request is under the inline threshold, all ops
         *   are sent as inline.
         * o Large non-write ops are sent with the entire message as a
         *   single read chunk (protocol 0-position special case).
         * o Large write ops transmit data as read chunk(s), header as
         *   inline.
         *
         * Note: the NFS code sending down multiple argument segments
         * implies the op is a write.
         * TBD check NFSv4 setacl
         */
        if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
                rtype = rpcrdma_noch;
        else if (rqst->rq_snd_buf.page_len == 0)
                rtype = rpcrdma_areadch;
        else
                rtype = rpcrdma_readch;

        /* The following simplification is not true forever */
        if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
                wtype = rpcrdma_noch;
        BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);

        if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
            (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
                /* forced to "pure inline"? */
                dprintk("RPC: %s: too much data (%d/%d) for inline\n",
                        __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
                return -1;
        }

        hdrlen = 28;    /*sizeof *headerp;*/
        padlen = 0;

        /*
         * Pull up any extra send data into the preregistered buffer.
         * When padding is in use and applies to the transfer, insert
         * it and change the message type.
         */
        if (rtype == rpcrdma_noch) {

                padlen = rpcrdma_inline_pullup(rqst,
                                RPCRDMA_INLINE_PAD_VALUE(rqst));

                if (padlen) {
                        headerp->rm_type = htonl(RDMA_MSGP);
                        headerp->rm_body.rm_padded.rm_align =
                                htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
                        headerp->rm_body.rm_padded.rm_thresh =
                                htonl(RPCRDMA_INLINE_PAD_THRESH);
                        headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
                        headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
                        headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
                        hdrlen += 2 * sizeof(u32);      /* extra words in padhdr */
                        BUG_ON(wtype != rpcrdma_noch);

                } else {
                        headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
                        headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
                        headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
                        /* new length after pullup */
                        rpclen = rqst->rq_svec[0].iov_len;
                        /*
                         * Currently we try to not actually use read inline.
                         * Reply chunks have the desirable property that
                         * they land, packed, directly in the target buffers
                         * without headers, so they require no fixup. The
                         * additional RDMA Write op sends the same amount
                         * of data, streams on-the-wire and adds no overhead
                         * on receive. Therefore, we request a reply chunk
                         * for non-writes wherever feasible and efficient.
                         */
                        if (wtype == rpcrdma_noch &&
                            r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
                                wtype = rpcrdma_replych;
                }
        }

        /*
         * Marshal chunks. This routine will return the header length
         * consumed by marshaling.
         */
        if (rtype != rpcrdma_noch) {
                hdrlen = rpcrdma_create_chunks(rqst,
                                &rqst->rq_snd_buf, headerp, rtype);
                wtype = rtype;  /* simplify dprintk */

        } else if (wtype != rpcrdma_noch) {
                hdrlen = rpcrdma_create_chunks(rqst,
                                &rqst->rq_rcv_buf, headerp, wtype);
        }

        if (hdrlen == 0)
                return -1;

        dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
                " headerp 0x%p base 0x%p lkey 0x%x\n",
                __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
                headerp, base, req->rl_iov.lkey);

        /*
         * initialize send_iov's - normally only two: rdma chunk header and
         * single preregistered RPC header buffer, but if padding is present,
         * then use a preregistered (and zeroed) pad buffer between the RPC
         * header and any write data. In all non-rdma cases, any following
         * data has been copied into the RPC header buffer.
         */
        req->rl_send_iov[0].addr = req->rl_iov.addr;
        req->rl_send_iov[0].length = hdrlen;
        req->rl_send_iov[0].lkey = req->rl_iov.lkey;

        req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
        req->rl_send_iov[1].length = rpclen;
        req->rl_send_iov[1].lkey = req->rl_iov.lkey;

        req->rl_niovs = 2;

        if (padlen) {
                struct rpcrdma_ep *ep = &r_xprt->rx_ep;

                req->rl_send_iov[2].addr = ep->rep_pad.addr;
                req->rl_send_iov[2].length = padlen;
                req->rl_send_iov[2].lkey = ep->rep_pad.lkey;

                req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
                req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
                req->rl_send_iov[3].lkey = req->rl_iov.lkey;

                req->rl_niovs = 4;
        }

        return 0;
}

/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
 */
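
/*
 * Illustrative example (not part of the original source): for a write
 * chunk list received as 1 - 2 - HLOO - HLOO - 0, the array count is 2,
 * the returned length is the sum of the two rs_length fields, and *iptrp
 * is advanced past the terminating zero so the caller can check for a
 * following reply chunk.
 */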
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
{
        unsigned int i, total_len;
        struct rpcrdma_write_chunk *cur_wchunk;

        i = ntohl(**iptrp);     /* get array count */
        if (i > max)
                return -1;
        cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
        total_len = 0;
        while (i--) {
                struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
                ifdebug(FACILITY) {
                        u64 off;
                        xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
                        dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
                                __func__,
                                ntohl(seg->rs_length),
                                (unsigned long long)off,
                                ntohl(seg->rs_handle));
                }
                total_len += ntohl(seg->rs_length);
                ++cur_wchunk;
        }
        /* check and adjust for properly terminated write chunk */
        if (wrchunk) {
                __be32 *w = (__be32 *) cur_wchunk;
                if (*w++ != xdr_zero)
                        return -1;
                cur_wchunk = (struct rpcrdma_write_chunk *) w;
        }
        if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
                return -1;

        *iptrp = (__be32 *) cur_wchunk;
        return total_len;
}

/*
 * Scatter inline received data back into provided iov's.
 */
static void
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
        int i, npages, curlen, olen;
        char *destp;
        struct page **ppages;
        int page_base;

        curlen = rqst->rq_rcv_buf.head[0].iov_len;
        if (curlen > copy_len) {        /* write chunk header fixup */
                curlen = copy_len;
                rqst->rq_rcv_buf.head[0].iov_len = curlen;
        }

        dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
                __func__, srcp, copy_len, curlen);

        /* Shift pointer for first receive segment only */
        rqst->rq_rcv_buf.head[0].iov_base = srcp;
        srcp += curlen;
        copy_len -= curlen;

        olen = copy_len;
        i = 0;
        rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
        page_base = rqst->rq_rcv_buf.page_base;
        ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
        page_base &= ~PAGE_MASK;

        if (copy_len && rqst->rq_rcv_buf.page_len) {
                npages = PAGE_ALIGN(page_base +
                        rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
                for (; i < npages; i++) {
                        curlen = PAGE_SIZE - page_base;
                        if (curlen > copy_len)
                                curlen = copy_len;
                        dprintk("RPC: %s: page %d"
                                " srcp 0x%p len %d curlen %d\n",
                                __func__, i, srcp, copy_len, curlen);
                        destp = kmap_atomic(ppages[i], KM_SKB_SUNRPC_DATA);
                        memcpy(destp + page_base, srcp, curlen);
                        flush_dcache_page(ppages[i]);
                        kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
                        srcp += curlen;
                        copy_len -= curlen;
                        if (copy_len == 0)
                                break;
                        page_base = 0;
                }
                rqst->rq_rcv_buf.page_len = olen - copy_len;
        } else
                rqst->rq_rcv_buf.page_len = 0;

        if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
                curlen = copy_len;
                if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
                        curlen = rqst->rq_rcv_buf.tail[0].iov_len;
                if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
                        memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
                dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
                        __func__, srcp, copy_len, curlen);
                rqst->rq_rcv_buf.tail[0].iov_len = curlen;
                copy_len -= curlen; ++i;
        } else
                rqst->rq_rcv_buf.tail[0].iov_len = 0;

        if (pad) {
                /* implicit padding on terminal chunk */
                unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
                while (pad--)
                        p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
        }

        if (copy_len)
                dprintk("RPC: %s: %d bytes in"
                        " %d extra segments (%d lost)\n",
                        __func__, olen, i, copy_len);

        /* TBD avoid a warning from call_decode() */
        rqst->rq_private_buf = rqst->rq_rcv_buf;
}

/*
 * This function is called when an async event is posted to
 * the connection which changes the connection state. All it
 * does at this point is mark the connection up/down, the rpc
 * timers do the rest.
 */
void
rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
        struct rpc_xprt *xprt = ep->rep_xprt;

        spin_lock_bh(&xprt->transport_lock);
        if (++xprt->connect_cookie == 0)        /* maintain a reserved value */
                ++xprt->connect_cookie;
        if (ep->rep_connected > 0) {
                if (!xprt_test_and_set_connected(xprt))
                        xprt_wake_pending_tasks(xprt, 0);
        } else {
                if (xprt_test_and_clear_connected(xprt))
                        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        }
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * This function is called when memory window unbind which we are waiting
 * for completes. Just use rr_func (zeroed by upcall) to signal completion.
 */
static void
rpcrdma_unbind_func(struct rpcrdma_rep *rep)
{
        wake_up(&rep->rr_unbind);
}

/*
 * Called as a tasklet to do req/reply match and complete a request
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
        struct rpcrdma_msg *headerp;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct rpc_xprt *xprt = rep->rr_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        __be32 *iptr;
        int i, rdmalen, status;

        /* Check status. If bad, signal disconnect and return rep to pool */
        if (rep->rr_len == ~0U) {
                rpcrdma_recv_buffer_put(rep);
                if (r_xprt->rx_ep.rep_connected == 1) {
                        r_xprt->rx_ep.rep_connected = -EIO;
                        rpcrdma_conn_func(&r_xprt->rx_ep);
                }
                return;
        }
        if (rep->rr_len < 28) {
                dprintk("RPC: %s: short/invalid reply\n", __func__);
                goto repost;
        }
        headerp = (struct rpcrdma_msg *) rep->rr_base;
        if (headerp->rm_vers != xdr_one) {
                dprintk("RPC: %s: invalid version %d\n",
                        __func__, ntohl(headerp->rm_vers));
                goto repost;
        }

        /* Get XID and try for a match. */
        spin_lock(&xprt->transport_lock);
        rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
        if (rqst == NULL) {
                spin_unlock(&xprt->transport_lock);
                dprintk("RPC: %s: reply 0x%p failed "
                        "to match any request xid 0x%08x len %d\n",
                        __func__, rep, headerp->rm_xid, rep->rr_len);
repost:
                r_xprt->rx_stats.bad_reply_count++;
                rep->rr_func = rpcrdma_reply_handler;
                if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
                        rpcrdma_recv_buffer_put(rep);

                return;
        }

        /* get request object */
        req = rpcr_to_rdmar(rqst);

        dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
                " RPC request 0x%p xid 0x%08x\n",
                __func__, rep, req, rqst, headerp->rm_xid);

        BUG_ON(!req || req->rl_reply);

        /* from here on, the reply is no longer an orphan */
        req->rl_reply = rep;

        /* check for expected message types */
        /* The order of some of these tests is important. */
        switch (headerp->rm_type) {
        case htonl(RDMA_MSG):
                /* never expect read chunks */
                /* never expect reply chunks (two ways to check) */
                /* never expect write chunks without having offered RDMA */
                if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
                    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
                     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
                    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
                     req->rl_nchunks == 0))
                        goto badheader;
                if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
                        /* count any expected write chunks in read reply */
                        /* start at write chunk array count */
                        iptr = &headerp->rm_body.rm_chunks[2];
                        rdmalen = rpcrdma_count_chunks(rep,
                                        req->rl_nchunks, 1, &iptr);
                        /* check for validity, and no reply chunk after */
                        if (rdmalen < 0 || *iptr++ != xdr_zero)
                                goto badheader;
                        rep->rr_len -=
                                ((unsigned char *)iptr - (unsigned char *)headerp);
                        status = rep->rr_len + rdmalen;
                        r_xprt->rx_stats.total_rdma_reply += rdmalen;
                        /* special case - last chunk may omit padding */
                        if (rdmalen &= 3) {
                                rdmalen = 4 - rdmalen;
                                status += rdmalen;
                        }
                } else {
                        /* else ordinary inline */
                        rdmalen = 0;
                        iptr = (__be32 *)((unsigned char *)headerp + 28);
                        rep->rr_len -= 28;      /*sizeof *headerp;*/
                        status = rep->rr_len;
                }
                /* Fix up the rpc results for upper layer */
                rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
                break;

        case htonl(RDMA_NOMSG):
                /* never expect read or write chunks, always reply chunks */
                if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
                    headerp->rm_body.rm_chunks[1] != xdr_zero ||
                    headerp->rm_body.rm_chunks[2] != xdr_one ||
                    req->rl_nchunks == 0)
                        goto badheader;
                iptr = (__be32 *)((unsigned char *)headerp + 28);
                rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
                if (rdmalen < 0)
                        goto badheader;
                r_xprt->rx_stats.total_rdma_reply += rdmalen;
                /* Reply chunk buffer already is the reply vector - no fixup. */
                status = rdmalen;
                break;

badheader:
        default:
                dprintk("%s: invalid rpcrdma reply header (type %d):"
                        " chunks[012] == %d %d %d"
                        " expected chunks <= %d\n",
                        __func__, ntohl(headerp->rm_type),
                        headerp->rm_body.rm_chunks[0],
                        headerp->rm_body.rm_chunks[1],
                        headerp->rm_body.rm_chunks[2],
                        req->rl_nchunks);
                status = -EIO;
                r_xprt->rx_stats.bad_reply_count++;
                break;
        }

        /* If using mw bind, start the deregister process now. */
        /* (Note: if mr_free(), cannot perform it here, in tasklet context) */
        if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
        case RPCRDMA_MEMWINDOWS:
                for (i = 0; req->rl_nchunks-- > 1;)
                        i += rpcrdma_deregister_external(
                                &req->rl_segments[i], r_xprt, NULL);
                /* Optionally wait (not here) for unbinds to complete */
                rep->rr_func = rpcrdma_unbind_func;
                (void) rpcrdma_deregister_external(&req->rl_segments[i],
                                r_xprt, rep);
                break;
        case RPCRDMA_MEMWINDOWS_ASYNC:
                for (i = 0; req->rl_nchunks--;)
                        i += rpcrdma_deregister_external(&req->rl_segments[i],
                                r_xprt, NULL);
                break;
        default:
                break;
        }

        dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
                __func__, xprt, rqst, status);
        xprt_complete_rqst(rqst->rq_task, status);
        spin_unlock(&xprt->transport_lock);
}