GitHub Repository: awilliam/linux-vfio
Path: blob/master/net/rds/ib_send.c
/*
 * Copyright (c) 2006 Oracle. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/in.h>
#include <linux/device.h>
#include <linux/dmapool.h>

#include "rds.h"
#include "ib.h"

static char *rds_ib_wc_status_strings[] = {
#define RDS_IB_WC_STATUS_STR(foo) \
                [IB_WC_##foo] = __stringify(IB_WC_##foo)
        RDS_IB_WC_STATUS_STR(SUCCESS),
        RDS_IB_WC_STATUS_STR(LOC_LEN_ERR),
        RDS_IB_WC_STATUS_STR(LOC_QP_OP_ERR),
        RDS_IB_WC_STATUS_STR(LOC_EEC_OP_ERR),
        RDS_IB_WC_STATUS_STR(LOC_PROT_ERR),
        RDS_IB_WC_STATUS_STR(WR_FLUSH_ERR),
        RDS_IB_WC_STATUS_STR(MW_BIND_ERR),
        RDS_IB_WC_STATUS_STR(BAD_RESP_ERR),
        RDS_IB_WC_STATUS_STR(LOC_ACCESS_ERR),
        RDS_IB_WC_STATUS_STR(REM_INV_REQ_ERR),
        RDS_IB_WC_STATUS_STR(REM_ACCESS_ERR),
        RDS_IB_WC_STATUS_STR(REM_OP_ERR),
        RDS_IB_WC_STATUS_STR(RETRY_EXC_ERR),
        RDS_IB_WC_STATUS_STR(RNR_RETRY_EXC_ERR),
        RDS_IB_WC_STATUS_STR(LOC_RDD_VIOL_ERR),
        RDS_IB_WC_STATUS_STR(REM_INV_RD_REQ_ERR),
        RDS_IB_WC_STATUS_STR(REM_ABORT_ERR),
        RDS_IB_WC_STATUS_STR(INV_EECN_ERR),
        RDS_IB_WC_STATUS_STR(INV_EEC_STATE_ERR),
        RDS_IB_WC_STATUS_STR(FATAL_ERR),
        RDS_IB_WC_STATUS_STR(RESP_TIMEOUT_ERR),
        RDS_IB_WC_STATUS_STR(GENERAL_ERR),
#undef RDS_IB_WC_STATUS_STR
};
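/*
 * Each RDS_IB_WC_STATUS_STR(foo) entry above combines a designated
 * initializer with __stringify(), so for example
 * RDS_IB_WC_STATUS_STR(SUCCESS) expands to
 *
 *      [IB_WC_SUCCESS] = "IB_WC_SUCCESS"
 *
 * which keeps the table indexed directly by enum ib_wc_status without
 * spelling each name twice.
 */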

char *rds_ib_wc_status_str(enum ib_wc_status status)
{
        return rds_str_array(rds_ib_wc_status_strings,
                             ARRAY_SIZE(rds_ib_wc_status_strings), status);
}

/*
 * Convert IB-specific error message to RDS error message and call core
 * completion handler.
 */
static void rds_ib_send_complete(struct rds_message *rm,
                                 int wc_status,
                                 void (*complete)(struct rds_message *rm, int status))
{
        int notify_status;

        switch (wc_status) {
        case IB_WC_WR_FLUSH_ERR:
                return;

        case IB_WC_SUCCESS:
                notify_status = RDS_RDMA_SUCCESS;
                break;

        case IB_WC_REM_ACCESS_ERR:
                notify_status = RDS_RDMA_REMOTE_ERROR;
                break;

        default:
                notify_status = RDS_RDMA_OTHER_ERROR;
                break;
        }
        complete(rm, notify_status);
}

static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
                                   struct rm_data_op *op,
                                   int wc_status)
{
        if (op->op_nents)
                ib_dma_unmap_sg(ic->i_cm_id->device,
                                op->op_sg, op->op_nents,
                                DMA_TO_DEVICE);
}

static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
                                   struct rm_rdma_op *op,
                                   int wc_status)
{
        if (op->op_mapped) {
                ib_dma_unmap_sg(ic->i_cm_id->device,
                                op->op_sg, op->op_nents,
                                op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
                op->op_mapped = 0;
        }

        /* If the user asked for a completion notification on this
         * message, we can implement three different semantics:
         *   1. Notify when we received the ACK on the RDS message
         *      that was queued with the RDMA. This provides reliable
         *      notification of RDMA status at the expense of a one-way
         *      packet delay.
         *   2. Notify when the IB stack gives us the completion event for
         *      the RDMA operation.
         *   3. Notify when the IB stack gives us the completion event for
         *      the accompanying RDS messages.
         * Here, we implement approach #3. To implement approach #2,
         * we would need to take an event for the rdma WR. To implement #1,
         * don't call rds_rdma_send_complete at all, and fall back to the notify
         * handling in the ACK processing code.
         *
         * Note: There's no need to explicitly sync any RDMA buffers using
         * ib_dma_sync_sg_for_cpu - the completion for the RDMA
         * operation itself unmapped the RDMA buffers, which takes care
         * of synching.
         */
        rds_ib_send_complete(container_of(op, struct rds_message, rdma),
                             wc_status, rds_rdma_send_complete);

        if (op->op_write)
                rds_stats_add(s_send_rdma_bytes, op->op_bytes);
        else
                rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
}

static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
                                     struct rm_atomic_op *op,
                                     int wc_status)
{
        /* unmap atomic recvbuf */
        if (op->op_mapped) {
                ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
                                DMA_FROM_DEVICE);
                op->op_mapped = 0;
        }

        rds_ib_send_complete(container_of(op, struct rds_message, atomic),
                             wc_status, rds_atomic_send_complete);

        if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
                rds_ib_stats_inc(s_ib_atomic_cswp);
        else
                rds_ib_stats_inc(s_ib_atomic_fadd);
}

/*
 * Unmap the resources associated with a struct send_work.
 *
 * Returns the rm for no good reason other than it is unobtainable
 * other than by switching on wr.opcode, currently, and the caller,
 * the event handler, needs it.
 */
static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
                                                struct rds_ib_send_work *send,
                                                int wc_status)
{
        struct rds_message *rm = NULL;

        /* In the error case, wc.opcode sometimes contains garbage */
        switch (send->s_wr.opcode) {
        case IB_WR_SEND:
                if (send->s_op) {
                        rm = container_of(send->s_op, struct rds_message, data);
                        rds_ib_send_unmap_data(ic, send->s_op, wc_status);
                }
                break;
        case IB_WR_RDMA_WRITE:
        case IB_WR_RDMA_READ:
                if (send->s_op) {
                        rm = container_of(send->s_op, struct rds_message, rdma);
                        rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
                }
                break;
        case IB_WR_ATOMIC_FETCH_AND_ADD:
        case IB_WR_ATOMIC_CMP_AND_SWP:
                if (send->s_op) {
                        rm = container_of(send->s_op, struct rds_message, atomic);
                        rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
                }
                break;
        default:
                if (printk_ratelimit())
                        printk(KERN_NOTICE
                               "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
                               __func__, send->s_wr.opcode);
                break;
        }

        send->s_wr.opcode = 0xdead;

        return rm;
}

void rds_ib_send_init_ring(struct rds_ib_connection *ic)
{
        struct rds_ib_send_work *send;
        u32 i;

        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
                struct ib_sge *sge;

                send->s_op = NULL;

                send->s_wr.wr_id = i;
                send->s_wr.sg_list = send->s_sge;
                send->s_wr.ex.imm_data = 0;

                sge = &send->s_sge[0];
                sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
                sge->length = sizeof(struct rds_header);
                sge->lkey = ic->i_mr->lkey;

                send->s_sge[1].lkey = ic->i_mr->lkey;
        }
}

void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
{
        struct rds_ib_send_work *send;
        u32 i;

        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
                if (send->s_op && send->s_wr.opcode != 0xdead)
                        rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
        }
}

/*
 * The only fast path caller always has a non-zero nr, so we don't
 * bother testing nr before performing the atomic sub.
 */
static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
{
        if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
            waitqueue_active(&rds_ib_ring_empty_wait))
                wake_up(&rds_ib_ring_empty_wait);
        BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
}

/*
 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 * operations performed in the send path. As the sender allocs and potentially
 * unallocs the next free entry in the ring it doesn't alter which is
 * the next to be freed, which is what this is concerned with.
 */
void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
{
        struct rds_connection *conn = context;
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_message *rm = NULL;
        struct ib_wc wc;
        struct rds_ib_send_work *send;
        u32 completed;
        u32 oldest;
        u32 i = 0;
        int ret;
        int nr_sig = 0;

        rdsdebug("cq %p conn %p\n", cq, conn);
        rds_ib_stats_inc(s_ib_tx_cq_call);
        ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        if (ret)
                rdsdebug("ib_req_notify_cq send failed: %d\n", ret);

        while (ib_poll_cq(cq, 1, &wc) > 0) {
                rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
                         (unsigned long long)wc.wr_id, wc.status,
                         rds_ib_wc_status_str(wc.status), wc.byte_len,
                         be32_to_cpu(wc.ex.imm_data));
                rds_ib_stats_inc(s_ib_tx_cq_event);

                if (wc.wr_id == RDS_IB_ACK_WR_ID) {
                        if (ic->i_ack_queued + HZ/2 < jiffies)
                                rds_ib_stats_inc(s_ib_tx_stalled);
                        rds_ib_ack_send_complete(ic);
                        continue;
                }

                oldest = rds_ib_ring_oldest(&ic->i_send_ring);

                completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);

                for (i = 0; i < completed; i++) {
                        send = &ic->i_sends[oldest];
                        if (send->s_wr.send_flags & IB_SEND_SIGNALED)
                                nr_sig++;

                        rm = rds_ib_send_unmap_op(ic, send, wc.status);

                        if (send->s_queued + HZ/2 < jiffies)
                                rds_ib_stats_inc(s_ib_tx_stalled);

                        if (send->s_op) {
                                if (send->s_op == rm->m_final_op) {
                                        /* If anyone waited for this message to get flushed out, wake
                                         * them up now */
                                        rds_message_unmapped(rm);
                                }
                                rds_message_put(rm);
                                send->s_op = NULL;
                        }

                        oldest = (oldest + 1) % ic->i_send_ring.w_nr;
                }

                rds_ib_ring_free(&ic->i_send_ring, completed);
                rds_ib_sub_signaled(ic, nr_sig);
                nr_sig = 0;

                if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
                    test_bit(0, &conn->c_map_queued))
                        queue_delayed_work(rds_wq, &conn->c_send_w, 0);

                /* We expect errors as the qp is drained during shutdown */
                if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
                        rds_ib_conn_error(conn, "send completion on %pI4 had status "
                                          "%u (%s), disconnecting and reconnecting\n",
                                          &conn->c_faddr, wc.status,
                                          rds_ib_wc_status_str(wc.status));
                }
        }
}

/*
 * This is the main function for allocating credits when sending
 * messages.
 *
 * Conceptually, we have two counters:
 *  - send credits: this tells us how many WRs we're allowed
 *    to submit without overrunning the receiver's queue. For
 *    each SEND WR we post, we decrement this by one.
 *
 *  - posted credits: this tells us how many WRs we recently
 *    posted to the receive queue. This value is transferred
 *    to the peer as a "credit update" in an RDS header field.
 *    Every time we transmit credits to the peer, we subtract
 *    the amount of transferred credits from this counter.
 *
 * It is essential that we avoid situations where both sides have
 * exhausted their send credits, and are unable to send new credits
 * to the peer. We achieve this by requiring that we send at least
 * one credit update to the peer before exhausting our credits.
 * When new credits arrive, we subtract one credit that is withheld
 * until we've posted new buffers and are ready to transmit these
 * credits (see rds_ib_send_add_credits below).
 *
 * The RDS send code is essentially single-threaded; rds_send_xmit
 * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
 * However, the ACK sending code is independent and can race with
 * message SENDs.
 *
 * In the send path, we need to update the counters for send credits
 * and the counter of posted buffers atomically - when we use the
 * last available credit, we cannot allow another thread to race us
 * and grab the posted credits counter. Hence, we have to use a
 * spinlock to protect the credit counter, or use atomics.
 *
 * Spinlocks shared between the send and the receive path are bad,
 * because they create unnecessary delays. An early implementation
 * using a spinlock showed a 5% degradation in throughput at some
 * loads.
 *
 * This implementation avoids spinlocks completely, putting both
 * counters into a single atomic, and updating that atomic using
 * atomic_add (in the receive path, when receiving fresh credits),
 * and using atomic_cmpxchg when updating the two counters.
 */
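/*
 * A worked example of that scheme (the IB_*_CREDITS() helpers are defined
 * in ib.h; this sketch assumes their usual layout, with send credits in
 * the low 16 bits of ic->i_credits and posted credits in the high 16 bits):
 *
 *      oldval = (5 << 16) | 10    ten send credits, five freshly posted buffers
 *
 * Grabbing 3 send credits while advertising all 5 posted buffers gives
 *
 *      newval = oldval - IB_SET_SEND_CREDITS(3) - IB_SET_POST_CREDITS(5)
 *             = (0 << 16) | 7
 *
 * and atomic_cmpxchg(&ic->i_credits, oldval, newval) either commits both
 * counters in one shot or fails because the receive path added credits in
 * the meantime, in which case the code below simply retries with the
 * fresh value.
 */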
int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
                             u32 wanted, u32 *adv_credits, int need_posted, int max_posted)
{
        unsigned int avail, posted, got = 0, advertise;
        long oldval, newval;

        *adv_credits = 0;
        if (!ic->i_flowctl)
                return wanted;

try_again:
        advertise = 0;
        oldval = newval = atomic_read(&ic->i_credits);
        posted = IB_GET_POST_CREDITS(oldval);
        avail = IB_GET_SEND_CREDITS(oldval);

        rdsdebug("rds_ib_send_grab_credits(%u): credits=%u posted=%u\n",
                 wanted, avail, posted);

        /* The last credit must be used to send a credit update. */
        if (avail && !posted)
                avail--;

        if (avail < wanted) {
                struct rds_connection *conn = ic->i_cm_id->context;

                /* Oops, there aren't that many credits left! */
                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
                got = avail;
        } else {
                /* Sometimes you get what you want, lalala. */
                got = wanted;
        }
        newval -= IB_SET_SEND_CREDITS(got);

        /*
         * If need_posted is non-zero, then the caller wants
         * the posted regardless of whether any send credits are
         * available.
         */
        if (posted && (got || need_posted)) {
                advertise = min_t(unsigned int, posted, max_posted);
                newval -= IB_SET_POST_CREDITS(advertise);
        }

        /* Finally bill everything */
        if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
                goto try_again;

        *adv_credits = advertise;
        return got;
}

void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
{
        struct rds_ib_connection *ic = conn->c_transport_data;

        if (credits == 0)
                return;

        rdsdebug("rds_ib_send_add_credits(%u): current=%u%s\n",
                 credits,
                 IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
                 test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");

        atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
        if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
                queue_delayed_work(rds_wq, &conn->c_send_w, 0);

        WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);

        rds_ib_stats_inc(s_ib_rx_credit_updates);
}

void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
{
        struct rds_ib_connection *ic = conn->c_transport_data;

        if (posted == 0)
                return;

        atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);

        /* Decide whether to send an update to the peer now.
         * If we would send a credit update for every single buffer we
         * post, we would end up with an ACK storm (ACK arrives,
         * consumes buffer, we refill the ring, send ACK to remote
         * advertising the newly posted buffer... ad inf)
         *
         * Performance pretty much depends on how often we send
         * credit updates - too frequent updates mean lots of ACKs.
         * Too infrequent updates, and the peer will run out of
         * credits and has to throttle.
         * For the time being, 16 seems to be a good compromise.
         */
        if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}

static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
                                             struct rds_ib_send_work *send,
                                             bool notify)
{
        /*
         * We want to delay signaling completions just enough to get
         * the batching benefits but not so much that we create dead time
         * on the wire.
         */
        if (ic->i_unsignaled_wrs-- == 0 || notify) {
                ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
                send->s_wr.send_flags |= IB_SEND_SIGNALED;
                return 1;
        }
        return 0;
}
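/*
 * With the post-decrement test above, roughly one work request in every
 * (rds_ib_sysctl_max_unsig_wrs + 1) gets IB_SEND_SIGNALED (plus any WR for
 * which the caller passes notify), so the HCA raises about one send
 * completion per batch rather than one per WR.
 */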

/*
 * This can be called multiple times for a given message. The first time
 * we see a message we map its scatterlist into the IB device so that
 * we can provide that mapped address to the IB scatter gather entries
 * in the IB work requests. We translate the scatterlist into a series
 * of work requests that fragment the message. These work requests complete
 * in order so we pass ownership of the message to the completion handler
 * once we send the final fragment.
 *
 * The RDS core uses the c_send_lock to only enter this function once
 * per connection. This makes sure that the tx ring alloc/unalloc pairs
 * don't get out of sync and confuse the ring.
 */
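/*
 * Concretely: with RDS_FRAG_SIZE of 4096 bytes (its usual value), an
 * 8200-byte message is carved into ceil(8200, RDS_FRAG_SIZE) = 3 work
 * requests. Each WR carries its own copy of the rds_header in s_sge[0]
 * and up to one fragment of payload in s_sge[1]; only the WR holding the
 * final fragment takes ownership of the message (prev->s_op below), so
 * the completion handler drops the reference exactly once.
 */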
int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                unsigned int hdr_off, unsigned int sg, unsigned int off)
{
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct ib_device *dev = ic->i_cm_id->device;
        struct rds_ib_send_work *send = NULL;
        struct rds_ib_send_work *first;
        struct rds_ib_send_work *prev;
        struct ib_send_wr *failed_wr;
        struct scatterlist *scat;
        u32 pos;
        u32 i;
        u32 work_alloc;
        u32 credit_alloc = 0;
        u32 posted;
        u32 adv_credits = 0;
        int send_flags = 0;
        int bytes_sent = 0;
        int ret;
        int flow_controlled = 0;
        int nr_sig = 0;

        BUG_ON(off % RDS_FRAG_SIZE);
        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));

        /* Do not send cong updates to IB loopback */
        if (conn->c_loopback
            && rm->m_inc.i_hdr.h_flags & RDS_FLAG_CONG_BITMAP) {
                rds_cong_map_updated(conn->c_fcong, ~(u64) 0);
                scat = &rm->data.op_sg[sg];
                ret = sizeof(struct rds_header) + RDS_CONG_MAP_BYTES;
                ret = min_t(int, ret, scat->length - conn->c_xmit_data_off);
                return ret;
        }

        /* FIXME we may overallocate here */
        if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
                i = 1;
        else
                i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);

        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
        if (work_alloc == 0) {
                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
                rds_ib_stats_inc(s_ib_tx_ring_full);
                ret = -ENOMEM;
                goto out;
        }

        if (ic->i_flowctl) {
                credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
                adv_credits += posted;
                if (credit_alloc < work_alloc) {
                        rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
                        work_alloc = credit_alloc;
                        flow_controlled = 1;
                }
                if (work_alloc == 0) {
                        set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
                        rds_ib_stats_inc(s_ib_tx_throttle);
                        ret = -ENOMEM;
                        goto out;
                }
        }

        /* map the message the first time we see it */
        if (!ic->i_data_op) {
                if (rm->data.op_nents) {
                        rm->data.op_count = ib_dma_map_sg(dev,
                                                          rm->data.op_sg,
                                                          rm->data.op_nents,
                                                          DMA_TO_DEVICE);
                        rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count);
                        if (rm->data.op_count == 0) {
                                rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
                                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
                                ret = -ENOMEM; /* XXX ? */
                                goto out;
                        }
                } else {
                        rm->data.op_count = 0;
                }

                rds_message_addref(rm);
                ic->i_data_op = &rm->data;

                /* Finalize the header */
                if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
                if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;

                /* If it has a RDMA op, tell the peer we did it. This is
                 * used by the peer to release use-once RDMA MRs. */
                if (rm->rdma.op_active) {
                        struct rds_ext_header_rdma ext_hdr;

                        ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
                        rds_message_add_extension(&rm->m_inc.i_hdr,
                                                  RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
                }
                if (rm->m_rdma_cookie) {
                        rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
                                                            rds_rdma_cookie_key(rm->m_rdma_cookie),
                                                            rds_rdma_cookie_offset(rm->m_rdma_cookie));
                }

                /* Note - rds_ib_piggyb_ack clears the ACK_REQUIRED bit, so
                 * we should not do this unless we have a chance of at least
                 * sticking the header into the send ring. Which is why we
                 * should call rds_ib_ring_alloc first. */
                rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_ib_piggyb_ack(ic));
                rds_message_make_checksum(&rm->m_inc.i_hdr);

                /*
                 * Update adv_credits since we reset the ACK_REQUIRED bit.
                 */
                if (ic->i_flowctl) {
                        rds_ib_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits);
                        adv_credits += posted;
                        BUG_ON(adv_credits > 255);
                }
        }

        /* Sometimes you want to put a fence between an RDMA
         * READ and the following SEND.
         * We could either do this all the time
         * or when requested by the user. Right now, we let
         * the application choose.
         */
        if (rm->rdma.op_active && rm->rdma.op_fence)
                send_flags = IB_SEND_FENCE;

        /* Each frag gets a header. Msgs may be 0 bytes */
        send = &ic->i_sends[pos];
        first = send;
        prev = NULL;
        scat = &ic->i_data_op->op_sg[sg];
        i = 0;
        do {
                unsigned int len = 0;

                /* Set up the header */
                send->s_wr.send_flags = send_flags;
                send->s_wr.opcode = IB_WR_SEND;
                send->s_wr.num_sge = 1;
                send->s_wr.next = NULL;
                send->s_queued = jiffies;
                send->s_op = NULL;

                send->s_sge[0].addr = ic->i_send_hdrs_dma
                        + (pos * sizeof(struct rds_header));
                send->s_sge[0].length = sizeof(struct rds_header);

                memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));

                /* Set up the data, if present */
                if (i < work_alloc
                    && scat != &rm->data.op_sg[rm->data.op_count]) {
                        len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
                        send->s_wr.num_sge = 2;

                        send->s_sge[1].addr = ib_sg_dma_address(dev, scat) + off;
                        send->s_sge[1].length = len;

                        bytes_sent += len;
                        off += len;
                        if (off == ib_sg_dma_len(dev, scat)) {
                                scat++;
                                off = 0;
                        }
                }

                rds_ib_set_wr_signal_state(ic, send, 0);

                /*
                 * Always signal the last one if we're stopping due to flow control.
                 */
                if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;

                if (send->s_wr.send_flags & IB_SEND_SIGNALED)
                        nr_sig++;

                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);

                if (ic->i_flowctl && adv_credits) {
                        struct rds_header *hdr = &ic->i_send_hdrs[pos];

                        /* add credit and redo the header checksum */
                        hdr->h_credit = adv_credits;
                        rds_message_make_checksum(hdr);
                        adv_credits = 0;
                        rds_ib_stats_inc(s_ib_tx_credit_updates);
                }

                if (prev)
                        prev->s_wr.next = &send->s_wr;
                prev = send;

                pos = (pos + 1) % ic->i_send_ring.w_nr;
                send = &ic->i_sends[pos];
                i++;

        } while (i < work_alloc
                 && scat != &rm->data.op_sg[rm->data.op_count]);

        /* Account the RDS header in the number of bytes we sent, but just once.
         * The caller has no concept of fragmentation. */
        if (hdr_off == 0)
                bytes_sent += sizeof(struct rds_header);

        /* if we finished the message then send completion owns it */
        if (scat == &rm->data.op_sg[rm->data.op_count]) {
                prev->s_op = ic->i_data_op;
                prev->s_wr.send_flags |= IB_SEND_SOLICITED;
                ic->i_data_op = NULL;
        }

        /* Put back wrs & credits we didn't use */
        if (i < work_alloc) {
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
                work_alloc = i;
        }
        if (ic->i_flowctl && i < credit_alloc)
                rds_ib_send_add_credits(conn, credit_alloc - i);

        if (nr_sig)
                atomic_add(nr_sig, &ic->i_signaled_sends);

        /* XXX need to worry about failed_wr and partial sends. */
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
                 first, &first->s_wr, ret, failed_wr);
        BUG_ON(failed_wr != &first->s_wr);
        if (ret) {
                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
                rds_ib_sub_signaled(ic, nr_sig);
                if (prev->s_op) {
                        ic->i_data_op = prev->s_op;
                        prev->s_op = NULL;
                }

                rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
                goto out;
        }

        ret = bytes_sent;
out:
        BUG_ON(adv_credits);
        return ret;
}

/*
 * Issue atomic operation.
 * A simplified version of the rdma case, we always map 1 SG, and
 * only 8 bytes, for the return value from the atomic operation.
 */
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
{
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_ib_send_work *send = NULL;
        struct ib_send_wr *failed_wr;
        struct rds_ib_device *rds_ibdev;
        u32 pos;
        u32 work_alloc;
        int ret;
        int nr_sig = 0;

        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);

        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
        if (work_alloc != 1) {
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
                rds_ib_stats_inc(s_ib_tx_ring_full);
                ret = -ENOMEM;
                goto out;
        }

        /* address of send request in ring */
        send = &ic->i_sends[pos];
        send->s_queued = jiffies;

        if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
                send->s_wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
                send->s_wr.wr.atomic.compare_add = op->op_m_cswp.compare;
                send->s_wr.wr.atomic.swap = op->op_m_cswp.swap;
                send->s_wr.wr.atomic.compare_add_mask = op->op_m_cswp.compare_mask;
                send->s_wr.wr.atomic.swap_mask = op->op_m_cswp.swap_mask;
        } else { /* FADD */
                send->s_wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
                send->s_wr.wr.atomic.compare_add = op->op_m_fadd.add;
                send->s_wr.wr.atomic.swap = 0;
                send->s_wr.wr.atomic.compare_add_mask = op->op_m_fadd.nocarry_mask;
                send->s_wr.wr.atomic.swap_mask = 0;
        }
        nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
        send->s_wr.num_sge = 1;
        send->s_wr.next = NULL;
        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
        send->s_wr.wr.atomic.rkey = op->op_rkey;
        send->s_op = op;
        rds_message_addref(container_of(send->s_op, struct rds_message, atomic));

        /* map 8 byte retval buffer to the device */
        ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
        rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
        if (ret != 1) {
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
                rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
                ret = -ENOMEM; /* XXX ? */
                goto out;
        }

        /* Convert our struct scatterlist to struct ib_sge */
        send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg);
        send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg);
        send->s_sge[0].lkey = ic->i_mr->lkey;

        rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
                 send->s_sge[0].addr, send->s_sge[0].length);

        if (nr_sig)
                atomic_add(nr_sig, &ic->i_signaled_sends);

        failed_wr = &send->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
        rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
                 send, &send->s_wr, ret, failed_wr);
        BUG_ON(failed_wr != &send->s_wr);
        if (ret) {
                printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
                rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }

        if (unlikely(failed_wr != &send->s_wr)) {
                printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
                BUG_ON(failed_wr != &send->s_wr);
        }

out:
        return ret;
}

int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
{
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_ib_send_work *send = NULL;
        struct rds_ib_send_work *first;
        struct rds_ib_send_work *prev;
        struct ib_send_wr *failed_wr;
        struct scatterlist *scat;
        unsigned long len;
        u64 remote_addr = op->op_remote_addr;
        u32 max_sge = ic->rds_ibdev->max_sge;
        u32 pos;
        u32 work_alloc;
        u32 i;
        u32 j;
        int sent;
        int ret;
        int num_sge;
        int nr_sig = 0;

        /* map the op the first time we see it */
        if (!op->op_mapped) {
                op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
                                             op->op_sg, op->op_nents, (op->op_write) ?
                                             DMA_TO_DEVICE : DMA_FROM_DEVICE);
                rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
                if (op->op_count == 0) {
                        rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
                        ret = -ENOMEM; /* XXX ? */
                        goto out;
                }

                op->op_mapped = 1;
        }

        /*
         * Instead of knowing how to return a partial rdma read/write we insist that there
         * be enough work requests to send the entire message.
         */
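        /*
         * For example, an op_count of 10 scatterlist entries with a max_sge
         * of 4 needs ceil(10, 4) = 3 work requests; the loop below fills
         * them with 4, 4 and 2 SGEs respectively, all targeting consecutive
         * ranges of remote_addr.
         */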
        i = ceil(op->op_count, max_sge);

        work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
        if (work_alloc != i) {
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
                rds_ib_stats_inc(s_ib_tx_ring_full);
                ret = -ENOMEM;
                goto out;
        }

        send = &ic->i_sends[pos];
        first = send;
        prev = NULL;
        scat = &op->op_sg[0];
        sent = 0;
        num_sge = op->op_count;

        for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
                send->s_wr.send_flags = 0;
                send->s_queued = jiffies;
                send->s_op = NULL;

                nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);

                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
                send->s_wr.wr.rdma.rkey = op->op_rkey;

                if (num_sge > max_sge) {
                        send->s_wr.num_sge = max_sge;
                        num_sge -= max_sge;
                } else {
                        send->s_wr.num_sge = num_sge;
                }

                send->s_wr.next = NULL;

                if (prev)
                        prev->s_wr.next = &send->s_wr;

                for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {
                        len = ib_sg_dma_len(ic->i_cm_id->device, scat);
                        send->s_sge[j].addr =
                                ib_sg_dma_address(ic->i_cm_id->device, scat);
                        send->s_sge[j].length = len;
                        send->s_sge[j].lkey = ic->i_mr->lkey;

                        sent += len;
                        rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);

                        remote_addr += len;
                        scat++;
                }

                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);

                prev = send;
                if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
                        send = ic->i_sends;
        }

        /* give a reference to the last op */
        if (scat == &op->op_sg[op->op_count]) {
                prev->s_op = op;
                rds_message_addref(container_of(op, struct rds_message, rdma));
        }

        if (i < work_alloc) {
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
                work_alloc = i;
        }

        if (nr_sig)
                atomic_add(nr_sig, &ic->i_signaled_sends);

        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
                 first, &first->s_wr, ret, failed_wr);
        BUG_ON(failed_wr != &first->s_wr);
        if (ret) {
                printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
                rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }

        if (unlikely(failed_wr != &first->s_wr)) {
                printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
                BUG_ON(failed_wr != &first->s_wr);
        }

out:
        return ret;
}

void rds_ib_xmit_complete(struct rds_connection *conn)
{
        struct rds_ib_connection *ic = conn->c_transport_data;

        /* We may have a pending ACK or window update we were unable
         * to send previously (due to flow control). Try again. */
        rds_ib_attempt_ack(ic);
}