GitHub Repository: awilliam/linux-vfio
Path: blob/master/net/sunrpc/xprtrdma/transport.c
/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */

#include <linux/module.h>
#include <linux/init.h>
#include <linux/slab.h>
#include <linux/seq_file.h>

#include "xprt_rdma.h"

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif

MODULE_LICENSE("Dual BSD/GPL");

MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
MODULE_AUTHOR("Network Appliance, Inc.");

/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_inline_write_padding;
static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
int xprt_rdma_pad_optimize = 0;

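/*
 * When RPC_DEBUG is enabled, the tunables above are also exported as
 * sysctls (registered from xprt_rdma_init() below), so they can be
 * inspected and adjusted at runtime. For example (assuming the sysctl
 * table has been registered):
 *
 *   $ cat /proc/sys/sunrpc/rdma_slot_table_entries
 *   $ echo 64 > /proc/sys/sunrpc/rdma_slot_table_entries
 */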
#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

static struct ctl_table_header *sunrpc_table_header;

static ctl_table xr_tunables_table[] = {
        {
                .procname = "rdma_slot_table_entries",
                .data = &xprt_rdma_slot_table_entries,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &min_slot_table_size,
                .extra2 = &max_slot_table_size
        },
        {
                .procname = "rdma_max_inline_read",
                .data = &xprt_rdma_max_inline_read,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec,
        },
        {
                .procname = "rdma_max_inline_write",
                .data = &xprt_rdma_max_inline_write,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec,
        },
        {
                .procname = "rdma_inline_write_padding",
                .data = &xprt_rdma_inline_write_padding,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &zero,
                .extra2 = &max_padding,
        },
        {
                .procname = "rdma_memreg_strategy",
                .data = &xprt_rdma_memreg_strategy,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &min_memreg,
                .extra2 = &max_memreg,
        },
        {
                .procname = "rdma_pad_optimize",
                .data = &xprt_rdma_pad_optimize,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec,
        },
        { },
};

static ctl_table sunrpc_table[] = {
        {
                .procname = "sunrpc",
                .mode = 0555,
                .child = xr_tunables_table
        },
        { },
};

#endif

static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */

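/*
 * Fill in the printable address strings the generic RPC client uses for
 * display and debug output: presentation address, port, protocol name,
 * hex forms, and the "rdma" netid.
 */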
static void
xprt_rdma_format_addresses(struct rpc_xprt *xprt)
{
        struct sockaddr *sap = (struct sockaddr *)
                                        &rpcx_to_rdmad(xprt).addr;
        struct sockaddr_in *sin = (struct sockaddr_in *)sap;
        char buf[64];

        (void)rpc_ntop(sap, buf, sizeof(buf));
        xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

        xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";

        snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

        /* netid */
        xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
}

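/*
 * Free the strings allocated by xprt_rdma_format_addresses(). The PROTO
 * and NETID entries point at static strings and are deliberately skipped.
 */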
static void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
        unsigned int i;

        for (i = 0; i < RPC_DISPLAY_MAX; i++)
                switch (i) {
                case RPC_DISPLAY_PROTO:
                case RPC_DISPLAY_NETID:
                        continue;
                default:
                        kfree(xprt->address_strings[i]);
                }
}

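/*
 * Connect worker, run from the transport's delayed work item. It performs
 * the potentially blocking RDMA connect outside the RPC task's context;
 * on failure it wakes the pending tasks with the error, and it always
 * clears the transport's "connecting" flag before returning.
 */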
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
        struct rpcrdma_xprt *r_xprt =
                container_of(work, struct rpcrdma_xprt, rdma_connect.work);
        struct rpc_xprt *xprt = &r_xprt->xprt;
        int rc = 0;

        if (!xprt->shutdown) {
                xprt_clear_connected(xprt);

                dprintk("RPC: %s: %sconnect\n", __func__,
                        r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
                rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
                if (rc)
                        goto out;
        }
        goto out_clear;

out:
        xprt_wake_pending_tasks(xprt, rc);

out_clear:
        dprintk("RPC: %s: exit\n", __func__);
        xprt_clear_connecting(xprt);
}

/*
 * xprt_rdma_destroy
 *
 * Destroy the xprt.
 * Free all memory associated with the object, including its own.
 * NOTE: none of the *destroy methods free memory for their top-level
 * objects, even though they may have allocated it (they do free
 * private memory). It's up to the caller to handle it. In this
 * case (RDMA transport), all structure memory is inlined with the
 * struct rpcrdma_xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        int rc;

        dprintk("RPC: %s: called\n", __func__);

        cancel_delayed_work_sync(&r_xprt->rdma_connect);

        xprt_clear_connected(xprt);

        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
        rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
        if (rc)
                dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n",
                        __func__, rc);
        rpcrdma_ia_close(&r_xprt->rx_ia);

        xprt_rdma_free_addresses(xprt);

        xprt_free(xprt);

        dprintk("RPC: %s: returning\n", __func__);

        module_put(THIS_MODULE);
}

static const struct rpc_timeout xprt_rdma_default_timeout = {
        .to_initval = 60 * HZ,
        .to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
        struct rpcrdma_create_data_internal cdata;
        struct rpc_xprt *xprt;
        struct rpcrdma_xprt *new_xprt;
        struct rpcrdma_ep *new_ep;
        struct sockaddr_in *sin;
        int rc;

        if (args->addrlen > sizeof(xprt->addr)) {
                dprintk("RPC: %s: address too large\n", __func__);
                return ERR_PTR(-EBADF);
        }

        xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
                        xprt_rdma_slot_table_entries);
        if (xprt == NULL) {
                dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
                        __func__);
                return ERR_PTR(-ENOMEM);
        }

        /* 60 second timeout, no retries */
        xprt->timeout = &xprt_rdma_default_timeout;
        xprt->bind_timeout = (60U * HZ);
        xprt->reestablish_timeout = (5U * HZ);
        xprt->idle_timeout = (5U * 60 * HZ);

        xprt->resvport = 0;     /* privileged port not needed */
        xprt->tsh_size = 0;     /* RPC-RDMA handles framing */
        xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
        xprt->ops = &xprt_rdma_procs;

        /*
         * Set up RDMA-specific connect data.
         */

        /* Put server RDMA address in local cdata */
        memcpy(&cdata.addr, args->dstaddr, args->addrlen);

        /* Ensure xprt->addr holds valid server TCP (not RDMA)
         * address, for any side protocols which peek at it */
        xprt->prot = IPPROTO_TCP;
        xprt->addrlen = args->addrlen;
        memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);

        sin = (struct sockaddr_in *)&cdata.addr;
        if (ntohs(sin->sin_port) != 0)
                xprt_set_bound(xprt);

        dprintk("RPC: %s: %pI4:%u\n",
                __func__, &sin->sin_addr.s_addr, ntohs(sin->sin_port));

        /* Set max requests */
        cdata.max_requests = xprt->max_reqs;

        /* Set some length limits */
        cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
        cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */

        cdata.inline_wsize = xprt_rdma_max_inline_write;
        if (cdata.inline_wsize > cdata.wsize)
                cdata.inline_wsize = cdata.wsize;

        cdata.inline_rsize = xprt_rdma_max_inline_read;
        if (cdata.inline_rsize > cdata.rsize)
                cdata.inline_rsize = cdata.rsize;

        cdata.padding = xprt_rdma_inline_write_padding;

        /*
         * Create new transport instance, which includes initialized
         *  o ia
         *  o endpoint
         *  o buffers
         */

        new_xprt = rpcx_to_rdmax(xprt);

        rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
                                xprt_rdma_memreg_strategy);
        if (rc)
                goto out1;

        /*
         * initialize and create ep
         */
        new_xprt->rx_data = cdata;
        new_ep = &new_xprt->rx_ep;
        new_ep->rep_remote_addr = cdata.addr;

        rc = rpcrdma_ep_create(&new_xprt->rx_ep,
                                &new_xprt->rx_ia, &new_xprt->rx_data);
        if (rc)
                goto out2;

        /*
         * Allocate pre-registered send and receive buffers for headers and
         * any inline data. Also specify any padding which will be provided
         * from a preregistered zero buffer.
         */
        rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
                                &new_xprt->rx_data);
        if (rc)
                goto out3;

        /*
         * Register a callback for connection events. This is necessary because
         * connection loss notification is async. We also catch connection loss
         * when reaping receives.
         */
        INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
        new_ep->rep_func = rpcrdma_conn_func;
        new_ep->rep_xprt = xprt;

        xprt_rdma_format_addresses(xprt);

        if (!try_module_get(THIS_MODULE))
                goto out4;

        return xprt;

out4:
        xprt_rdma_free_addresses(xprt);
        rc = -EINVAL;
out3:
        (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
out2:
        rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
        xprt_free(xprt);
        return ERR_PTR(rc);
}

/*
 * Close a connection, during shutdown or timeout/reconnect
 */
static void
xprt_rdma_close(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

        dprintk("RPC: %s: closing\n", __func__);
        if (r_xprt->rx_ep.rep_connected > 0)
                xprt->reestablish_timeout = 0;
        xprt_disconnect_done(xprt);
        (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
}

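/*
 * Record a new destination port in both the generic transport address and
 * the RDMA connect address, typically after an rpcbind query has resolved
 * the remote port.
 */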
static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
        struct sockaddr_in *sap;

        sap = (struct sockaddr_in *)&xprt->addr;
        sap->sin_port = htons(port);
        sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
        sap->sin_port = htons(port);
        dprintk("RPC: %s: %u\n", __func__, port);
}

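/*
 * Schedule a connect or reconnect. A reconnect is delayed by the current
 * reestablish timeout, which then backs off exponentially between 5 and
 * 30 seconds; an initial connect is scheduled immediately, and synchronous
 * tasks wait here for it to complete.
 */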
static void
xprt_rdma_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

        if (r_xprt->rx_ep.rep_connected != 0) {
                /* Reconnect */
                schedule_delayed_work(&r_xprt->rdma_connect,
                        xprt->reestablish_timeout);
                xprt->reestablish_timeout <<= 1;
                if (xprt->reestablish_timeout > (30 * HZ))
                        xprt->reestablish_timeout = (30 * HZ);
                else if (xprt->reestablish_timeout < (5 * HZ))
                        xprt->reestablish_timeout = (5 * HZ);
        } else {
                schedule_delayed_work(&r_xprt->rdma_connect, 0);
                if (!RPC_IS_ASYNC(task))
                        flush_delayed_work(&r_xprt->rdma_connect);
        }
}

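/*
 * Scale the RPC congestion window to the number of RDMA credits currently
 * available (rb_credits) before falling through to the generic
 * congestion-controlled reserve path.
 */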
static int
xprt_rdma_reserve_xprt(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        int credits = atomic_read(&r_xprt->rx_buf.rb_credits);

        /* == RPC_CWNDSCALE @ init, but *after* setup */
        if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
                r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
                dprintk("RPC: %s: cwndscale %lu\n", __func__,
                        r_xprt->rx_buf.rb_cwndscale);
                BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
        }
        xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
        return xprt_reserve_xprt_cong(task);
}

/*
 * The RDMA allocate/free functions need the task structure as a place
 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
 * sequence. For this reason, the recv buffers are attached to send
 * buffers for portions of the RPC. Note that the RPC layer allocates
 * both send and receive buffers in the same call. We may register
 * the receive buffer portion when using reply chunks.
 */
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpcrdma_req *req, *nreq;

        req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
        BUG_ON(NULL == req);

        if (size > req->rl_size) {
                dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
                        "prog %d vers %d proc %d\n",
                        __func__, size, req->rl_size,
                        task->tk_client->cl_prog, task->tk_client->cl_vers,
                        task->tk_msg.rpc_proc->p_proc);
                /*
                 * Outgoing length shortage. Our inline write max must have
                 * been configured to perform direct i/o.
                 *
                 * This is therefore a large metadata operation, and the
                 * allocate call was made on the maximum possible message,
                 * e.g. containing long filename(s) or symlink data. In
                 * fact, while these metadata operations *might* carry
                 * large outgoing payloads, they rarely *do*. However, we
                 * have to commit to the request here, so reallocate and
                 * register it now. The data path will never require this
                 * reallocation.
                 *
                 * If the allocation or registration fails, the RPC framework
                 * will (doggedly) retry.
                 */
                if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
                                RPCRDMA_BOUNCEBUFFERS) {
                        /* forced to "pure inline" */
                        dprintk("RPC: %s: too much data (%zd) for inline "
                                "(r/w max %d/%d)\n", __func__, size,
                                rpcx_to_rdmad(xprt).inline_rsize,
                                rpcx_to_rdmad(xprt).inline_wsize);
                        size = req->rl_size;
                        rpc_exit(task, -EIO);   /* fail the operation */
                        rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
                        goto out;
                }
                if (task->tk_flags & RPC_TASK_SWAPPER)
                        nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
                else
                        nreq = kmalloc(sizeof *req + size, GFP_NOFS);
                if (nreq == NULL)
                        goto outfail;

                if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
                                nreq->rl_base, size + sizeof(struct rpcrdma_req)
                                - offsetof(struct rpcrdma_req, rl_base),
                                &nreq->rl_handle, &nreq->rl_iov)) {
                        kfree(nreq);
                        goto outfail;
                }
                rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
                nreq->rl_size = size;
                nreq->rl_niovs = 0;
                nreq->rl_nchunks = 0;
                nreq->rl_buffer = (struct rpcrdma_buffer *)req;
                nreq->rl_reply = req->rl_reply;
                memcpy(nreq->rl_segments,
                        req->rl_segments, sizeof nreq->rl_segments);
                /* flag the swap with an unused field */
                nreq->rl_iov.length = 0;
                req->rl_reply = NULL;
                req = nreq;
        }
        dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
out:
        req->rl_connect_cookie = 0;     /* our reserved value */
        return req->rl_xdr_buf;

outfail:
        rpcrdma_buffer_put(req);
        rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
        return NULL;
}

/*
 * This function returns all RDMA resources to the pool.
 */
static void
xprt_rdma_free(void *buffer)
{
        struct rpcrdma_req *req;
        struct rpcrdma_xprt *r_xprt;
        struct rpcrdma_rep *rep;
        int i;

        if (buffer == NULL)
                return;

        req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
        if (req->rl_iov.length == 0) {  /* see allocate above */
                r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
                                      struct rpcrdma_xprt, rx_buf);
        } else
                r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
        rep = req->rl_reply;

        dprintk("RPC: %s: called on 0x%p%s\n",
                __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");

        /*
         * Finish the deregistration. When using mw bind, this was
         * begun in rpcrdma_reply_handler(). In all other modes, we
         * do it here, in thread context. The process is considered
         * complete when the rr_func vector becomes NULL - this
         * was put in place during rpcrdma_reply_handler() - the wait
         * call below will not block if the dereg is "done". If
         * interrupted, our framework will clean up.
         */
        for (i = 0; req->rl_nchunks;) {
                --req->rl_nchunks;
                i += rpcrdma_deregister_external(
                        &req->rl_segments[i], r_xprt, NULL);
        }

        if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
                rep->rr_func = NULL;    /* abandon the callback */
                req->rl_reply = NULL;
        }

        if (req->rl_iov.length == 0) {  /* see allocate above */
                struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
                oreq->rl_reply = req->rl_reply;
                (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
                                                   req->rl_handle,
                                                   &req->rl_iov);
                kfree(req);
                req = oreq;
        }

        /* Put back request+reply buffers */
        rpcrdma_buffer_put(req);
}

/*
 * send_request invokes the meat of RPC RDMA. It must do the following:
 *  1.  Marshal the RPC request into an RPC RDMA request, which means
 *      putting a header in front of data, and creating IOVs for RDMA
 *      from those in the request.
 *  2.  In marshaling, detect opportunities for RDMA, and use them.
 *  3.  Post a recv message to set up asynch completion, then send
 *      the request (rpcrdma_ep_post).
 *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
 */

static int
xprt_rdma_send_request(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

        /* marshal the send itself */
        if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
                r_xprt->rx_stats.failed_marshal_count++;
                dprintk("RPC: %s: rpcrdma_marshal_req failed\n",
                        __func__);
                return -EIO;
        }

        if (req->rl_reply == NULL)      /* e.g. reconnection */
                rpcrdma_recv_buffer_get(req);

        if (req->rl_reply) {
                req->rl_reply->rr_func = rpcrdma_reply_handler;
                /* this need only be done once, but... */
                req->rl_reply->rr_xprt = xprt;
        }

        /* Must suppress retransmit to maintain credits */
        if (req->rl_connect_cookie == xprt->connect_cookie)
                goto drop_connection;
        req->rl_connect_cookie = xprt->connect_cookie;

        if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
                goto drop_connection;

        rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
        rqst->rq_bytes_sent = 0;
        return 0;

drop_connection:
        xprt_disconnect_done(xprt);
        return -ENOTCONN;       /* implies disconnect */
}

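/*
 * Emit transport statistics: the generic xprt counters first, then the
 * RDMA-specific chunk, marshaling and registration counters. This is the
 * "xprt: rdma ..." line reported through the rpc_iostats machinery
 * (seen, for example, in /proc/self/mountstats for NFS mounts).
 */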
static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        long idle_time = 0;

        if (xprt_connected(xprt))
                idle_time = (long)(jiffies - xprt->last_used) / HZ;

        seq_printf(seq,
                "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
                "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",

                0,      /* need a local port? */
                xprt->stat.bind_count,
                xprt->stat.connect_count,
                xprt->stat.connect_time,
                idle_time,
                xprt->stat.sends,
                xprt->stat.recvs,
                xprt->stat.bad_xids,
                xprt->stat.req_u,
                xprt->stat.bklog_u,

                r_xprt->rx_stats.read_chunk_count,
                r_xprt->rx_stats.write_chunk_count,
                r_xprt->rx_stats.reply_chunk_count,
                r_xprt->rx_stats.total_rdma_request,
                r_xprt->rx_stats.total_rdma_reply,
                r_xprt->rx_stats.pullup_copy_count,
                r_xprt->rx_stats.fixup_copy_count,
                r_xprt->rx_stats.hardway_register_count,
                r_xprt->rx_stats.failed_marshal_count,
                r_xprt->rx_stats.bad_reply_count);
}

/*
 * Plumbing for rpc transport switch and kernel module
 */

static struct rpc_xprt_ops xprt_rdma_procs = {
        .reserve_xprt = xprt_rdma_reserve_xprt,
        .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
        .release_request = xprt_release_rqst_cong, /* ditto */
        .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
        .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
        .set_port = xprt_rdma_set_port,
        .connect = xprt_rdma_connect,
        .buf_alloc = xprt_rdma_allocate,
        .buf_free = xprt_rdma_free,
        .send_request = xprt_rdma_send_request,
        .close = xprt_rdma_close,
        .destroy = xprt_rdma_destroy,
        .print_stats = xprt_rdma_print_stats
};

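/*
 * Transport class descriptor. Registering it lets RPC consumers create
 * transports of type XPRT_TRANSPORT_RDMA, e.g. an NFS mount requested
 * with the "rdma" transport (commonly "-o rdma,port=20049").
 */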
static struct xprt_class xprt_rdma = {
        .list = LIST_HEAD_INIT(xprt_rdma.list),
        .name = "rdma",
        .owner = THIS_MODULE,
        .ident = XPRT_TRANSPORT_RDMA,
        .setup = xprt_setup_rdma,
};

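/*
 * Module load/unload: register the transport class (and, under RPC_DEBUG,
 * the sysctl table) on init, and tear both down again on exit.
 */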
static void __exit xprt_rdma_cleanup(void)
{
        int rc;

        dprintk(KERN_INFO "RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#ifdef RPC_DEBUG
        if (sunrpc_table_header) {
                unregister_sysctl_table(sunrpc_table_header);
                sunrpc_table_header = NULL;
        }
#endif
        rc = xprt_unregister_transport(&xprt_rdma);
        if (rc)
                dprintk("RPC: %s: xprt_unregister returned %i\n",
                        __func__, rc);
}

static int __init xprt_rdma_init(void)
{
        int rc;

        rc = xprt_register_transport(&xprt_rdma);

        if (rc)
                return rc;

        dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");

        dprintk(KERN_INFO "Defaults:\n");
        dprintk(KERN_INFO "\tSlots %d\n"
                "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
                xprt_rdma_slot_table_entries,
                xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
        dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
                xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);

#ifdef RPC_DEBUG
        if (!sunrpc_table_header)
                sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
        return 0;
}

module_init(xprt_rdma_init);
module_exit(xprt_rdma_cleanup);