GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/ofed/drivers/infiniband/ulp/sdp/sdp_zcopy.c
/*-
 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
 *
 * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *     - Redistributions of source code must retain the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer.
 *
 *     - Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials
 *       provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include <linux/tcp.h>
#include <asm/ioctls.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/socket.h>
#include <net/protocol.h>
#include <net/inet_common.h>
#include <rdma/rdma_cm.h>
#include <rdma/ib_verbs.h>
#include <rdma/ib_fmr_pool.h>
#include <rdma/ib_umem.h>
#include <net/tcp.h> /* for memcpy_toiovec */
#include <asm/io.h>
#include <asm/uaccess.h>
#include <linux/delay.h>
#include "sdp.h"

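/*
 * Build and queue a SrcAvail message that advertises the caller's pinned
 * user buffer (tx_sa->umem, mapped through tx_sa->fmr) for the peer to
 * RDMA-read.  In combined mode the first chunk of the buffer (at most one
 * page, capped by the transmit size goal) is carried inline in the
 * SrcAvail packet itself.
 */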
static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct mbuf *mb;
	int payload_len;
	struct page *payload_pg;
	int off, len;
	struct ib_umem_chunk *chunk;

	WARN_ON(ssk->tx_sa);

	BUG_ON(!tx_sa);
	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
	BUG_ON(!tx_sa->umem);
	BUG_ON(!tx_sa->umem->chunk_list.next);

	chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
	BUG_ON(!chunk->nmap);

	off = tx_sa->umem->offset;
	len = tx_sa->umem->length;

	tx_sa->bytes_sent = tx_sa->bytes_acked = 0;

	mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
	if (!mb) {
		return -ENOMEM;
	}
	sdp_dbg_data(sk, "sending SrcAvail\n");

	TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hung on the mb but must
					* continue to live after the mb is freed */
	ssk->tx_sa = tx_sa;

	/* must have payload inlined in SrcAvail packet in combined mode */
	payload_len = MIN(tx_sa->umem->page_size - off, len);
	payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
	payload_pg = sg_page(&chunk->page_list[0]);
	get_page(payload_pg);

	sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
		off, payload_pg, payload_len);

	mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
		payload_pg, off, payload_len);

	mb->len += payload_len;
	mb->data_len = payload_len;
	mb->truesize += payload_len;
//	sk->sk_wmem_queued += payload_len;
//	sk->sk_forward_alloc -= payload_len;

	mb_entail(sk, ssk, mb);

	ssk->write_seq += payload_len;
	SDP_SKB_CB(mb)->end_seq += payload_len;

	tx_sa->bytes_sent = tx_sa->umem->length;
	tx_sa->bytes_acked = payload_len;

	/* TODO: pushing the mb into the tx_queue should be enough */

	return 0;
}

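/*
 * Queue a SrcAvailCancel message to revoke the outstanding SrcAvail and arm
 * a delayed work item that gives up on the transaction if the peer never
 * answers (see srcavail_cancel_timeout() below).
 */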
static int sdp_post_srcavail_cancel(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct mbuf *mb;

	sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");

	mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
	mb_entail(sk, ssk, mb);

	sdp_post_sends(ssk, 0);

	schedule_delayed_work(&ssk->srcavail_cancel_work,
		SDP_SRCAVAIL_CANCEL_TIMEOUT);

	return 0;
}

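/*
 * Delayed-work handler armed by sdp_post_srcavail_cancel(): if neither a
 * RdmaRdCompl nor a SendSM arrived before the timeout, reset the connection.
 */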
void srcavail_cancel_timeout(struct work_struct *work)
{
	struct sdp_sock *ssk =
		container_of(work, struct sdp_sock, srcavail_cancel_work.work);
	struct socket *sk = ssk->socket;

	lock_sock(sk);

	sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timed out."
		" closing connection\n");
	sdp_set_error(sk, -ECONNRESET);
	wake_up(&ssk->wq);

	release_sock(sk);
}

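/*
 * Sleep until the peer acknowledges the whole SrcAvail with RdmaRdCompl
 * messages, or until the transfer is aborted (timeout, SendSM, pending
 * signal, or a crossing SrcAvail from the peer).  Returns 0 on full
 * completion and a negative errno otherwise; the abort reason is also
 * recorded in tx_sa->abort_flags.
 */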
static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
	int ignore_signals)
{
	struct socket *sk = ssk->socket;
	int err = 0;
	long vm_wait = 0;
	long current_timeo = *timeo_p;
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
	sdp_prf1(sk, NULL, "Going to sleep");
	while (ssk->qp_active) {
		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);

		if (unlikely(!*timeo_p)) {
			err = -ETIME;
			tx_sa->abort_flags |= TX_SA_TIMEDOUT;
			sdp_prf1(sk, NULL, "timeout");
			SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
			break;
		} else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
			err = -EINVAL;
			sdp_dbg_data(sk, "acked bytes > sent bytes\n");
			tx_sa->abort_flags |= TX_SA_ERROR;
			break;
		}

		if (tx_sa->abort_flags & TX_SA_SENDSM) {
			sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
			SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
			err = -EAGAIN;
			break;
		}

		if (!ignore_signals) {
			if (signal_pending(current)) {
				err = -EINTR;
				sdp_prf1(sk, NULL, "signalled");
				tx_sa->abort_flags |= TX_SA_INTRRUPTED;
				break;
			}

			if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
				sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
				tx_sa->abort_flags |= TX_SA_CROSS_SEND;
				SDPSTATS_COUNTER_INC(zcopy_cross_send);
				err = -ETIME;
				break;
			}
		}

		posts_handler_put(ssk);

		sk_wait_event(sk, &current_timeo,
			tx_sa->abort_flags &&
			ssk->rx_sa &&
			(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
			vm_wait);
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);

		if (tx_sa->bytes_acked == tx_sa->bytes_sent)
			break;

		if (vm_wait) {
			vm_wait -= current_timeo;
			current_timeo = *timeo_p;
			if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
			    (current_timeo -= vm_wait) < 0)
				current_timeo = 0;
			vm_wait = 0;
		}
		*timeo_p = current_timeo;
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
		tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);

	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP destroyed while waiting\n");
		return -EINVAL;
	}
	return err;
}

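/*
 * Wait, uninterruptibly and bounded by a 5 second timeout, until the RDMA
 * read tracked by tx_ring.rdma_inflight is no longer marked busy or the QP
 * is destroyed.  Running out of the timeout is treated as a serious enough
 * condition to WARN.
 */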
static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	long timeo = HZ * 5; /* Timeout for RDMA read */
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
	while (1) {
		prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);

		if (!ssk->tx_ring.rdma_inflight->busy) {
			sdp_dbg_data(sk, "got rdma cqe\n");
			break;
		}

		if (!ssk->qp_active) {
			sdp_dbg_data(sk, "QP destroyed\n");
			break;
		}

		if (!timeo) {
			sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
			WARN_ON(1);
			break;
		}

		posts_handler_put(ssk);

		sdp_prf1(sk, NULL, "Going to sleep");
		sk_wait_event(sk, &timeo,
			!ssk->tx_ring.rdma_inflight->busy);
		sdp_prf1(sk, NULL, "Woke up");
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting\n");
}

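/*
 * Report RDMA read progress to the peer: send a RdmaRdCompl carrying the
 * number of bytes consumed from the peer's SrcAvail since the last report.
 */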
int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
	struct rx_srcavail_state *rx_sa)
{
	struct mbuf *mb;
	int copied = rx_sa->used - rx_sa->reported;

	if (rx_sa->used <= rx_sa->reported)
		return 0;

	mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);

	rx_sa->reported += copied;

	/* TODO: What if no tx_credits available? */
	sdp_post_send(ssk, mb);

	return 0;
}

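/*
 * Send a SendSM message, telling the peer to abandon its outstanding
 * SrcAvail and deliver the data with plain sends instead.
 */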
int sdp_post_sendsm(struct socket *sk)
{
	struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);

	sdp_post_send(sdp_sk(sk), mb);

	return 0;
}

static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
{
	sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
	while (len > 0) {
		if (iov->iov_len) {
			int copy = min_t(unsigned int, iov->iov_len, len);
			len -= copy;
			iov->iov_len -= copy;
			iov->iov_base += copy;
		}
		iov++;
	}

	return 0;
}

static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
{
	int bytes = 0;

	while (sge_cnt > 0) {
		bytes += sge->length;
		sge++;
		sge_cnt--;
	}

	return bytes;
}

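/*
 * Handle an incoming SendSM: unless it refers to an already cancelled or
 * older SrcAvail, flag the current SrcAvail as aborted and wake up the
 * sender sleeping in sdp_wait_rdmardcompl().
 */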
void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
{
	struct socket *sk = ssk->socket;
	unsigned long flags;

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	if (!ssk->tx_sa) {
		sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
		goto out;
	}

	if (ssk->tx_sa->mseq > mseq_ack) {
		sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
			mseq_ack, ssk->tx_sa->mseq);
		goto out;
	}

	sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");

	ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
	cancel_delayed_work(&ssk->srcavail_cancel_work);

	wake_up(sk->sk_sleep);
	sdp_dbg_data(sk, "woke up sleepers\n");

out:
	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

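/*
 * Handle an incoming RdmaRdCompl: credit the acknowledged bytes to the
 * outstanding SrcAvail (if it is still current) and wake up the sender.
 */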
void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
	u32 bytes_completed)
{
	struct socket *sk = ssk->socket;
	unsigned long flags;

	sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
	sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	BUG_ON(!ssk);

	if (!ssk->tx_sa) {
		sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
		goto out;
	}

	if (ssk->tx_sa->mseq > mseq_ack) {
		sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
			mseq_ack, ssk->tx_sa->mseq);
		goto out;
	}

	ssk->tx_sa->bytes_acked += bytes_completed;

	wake_up(sk->sk_sleep);
	sdp_dbg_data(sk, "woke up sleepers\n");

out:
	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
	return;
}

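/*
 * Estimate how many more bytes this process may pin, based on
 * RLIMIT_MEMLOCK and the amount already locked; holders of CAP_IPC_LOCK are
 * unlimited.  'offset' accounts for the in-page offset of the buffer.
 */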
static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
{
	unsigned long avail;
	unsigned long lock_limit;

	if (capable(CAP_IPC_LOCK))
		return ULONG_MAX;

	lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
	avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);

	return avail - offset;
}

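/*
 * Pin the user buffer with ib_umem_get(), collect the DMA addresses of its
 * pages, and map them through the device's FMR pool so the buffer can be
 * the target of RDMA.  On success, returns 0 and hands back the umem and
 * FMR; the requested length is silently clamped to SDP_MAX_RDMA_READ_LEN
 * and to the remaining RLIMIT_MEMLOCK budget.
 */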
static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
	struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
	struct ib_pool_fmr *fmr;
	struct ib_umem *umem;
	struct ib_device *dev;
	u64 *pages;
	struct ib_umem_chunk *chunk;
	int n, j, k;
	int rc = 0;
	unsigned long max_lockable_bytes;

	if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
		sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
			len, SDP_MAX_RDMA_READ_LEN);
		len = SDP_MAX_RDMA_READ_LEN;
	}

	max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
	if (unlikely(len > max_lockable_bytes)) {
		sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
			len, max_lockable_bytes);
		len = max_lockable_bytes;
	}

	sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
		uaddr, len, max_lockable_bytes);

	umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
		IB_ACCESS_REMOTE_WRITE, 0);

	if (IS_ERR(umem)) {
		rc = PTR_ERR(umem);
		sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
		sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
			current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
			current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
			capable(CAP_IPC_LOCK));
		goto err_umem_get;
	}

	sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
		umem->offset, umem->length);

	pages = (u64 *) __get_free_page(GFP_KERNEL);
	if (!pages) {
		rc = -ENOMEM;	/* report the allocation failure to the caller */
		goto err_pages_alloc;
	}

	n = 0;

	dev = sdp_sk(sk)->ib_device;
	list_for_each_entry(chunk, &umem->chunk_list, list) {
		for (j = 0; j < chunk->nmap; ++j) {
			len = ib_sg_dma_len(dev,
				&chunk->page_list[j]) >> PAGE_SHIFT;

			for (k = 0; k < len; ++k) {
				pages[n++] = ib_sg_dma_address(dev,
					&chunk->page_list[j]) +
					umem->page_size * k;
			}
		}
	}

	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
	if (IS_ERR(fmr)) {
		sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
		rc = PTR_ERR(fmr);	/* propagate the mapping failure */
		goto err_fmr_alloc;
	}

	free_page((unsigned long) pages);

	*_umem = umem;
	*_fmr = fmr;

	return 0;

err_fmr_alloc:
	free_page((unsigned long) pages);

err_pages_alloc:
	ib_umem_release(umem);

err_umem_get:

	return rc;
}

void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
	if (!sdp_sk(sk)->qp_active)
		return;

	ib_fmr_pool_unmap(*_fmr);
	*_fmr = NULL;

	ib_umem_release(*_umem);
	*_umem = NULL;
}

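/*
 * Post a signalled RDMA read that pulls the peer's advertised buffer
 * (rx_sa->vaddr/rkey, starting past what was already consumed) into the
 * locally mapped umem described by rx_sa.
 */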
static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct ib_send_wr *bad_wr;
	struct ib_send_wr wr = { NULL };
	struct ib_sge sge;

	wr.opcode = IB_WR_RDMA_READ;
	wr.next = NULL;
	wr.wr_id = SDP_OP_RDMA;
	wr.wr.rdma.rkey = rx_sa->rkey;
	wr.send_flags = 0;

	ssk->tx_ring.rdma_inflight = rx_sa;

	sge.addr = rx_sa->umem->offset;
	sge.length = rx_sa->umem->length;
	sge.lkey = rx_sa->fmr->fmr->lkey;

	wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
	wr.num_sge = 1;
	wr.sg_list = &sge;
	rx_sa->busy++;

	wr.send_flags = IB_SEND_SIGNALED;

	return ib_post_send(ssk->qp, &wr, &bad_wr);
}

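/*
 * Receive-side zero copy: pin and map the user iovec, post an RDMA read of
 * the buffer advertised by the peer's SrcAvail directly into it, wait for
 * the read to complete, and account the copied bytes.  On failure while the
 * QP is still active, the SrcAvail is flagged RX_SA_ABORTED and the caller
 * is expected to fall back (e.g. by posting a SendSM).
 */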
int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
	unsigned long *used)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
	int got_srcavail_cancel;
	int rc = 0;
	int len = *used;
	int copied;

	sdp_dbg_data(ssk->socket, "preparing RDMA read."
		" len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);

	sock_hold(sk, SOCK_REF_RDMA_RD);

	if (len > rx_sa->len) {
		sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
		WARN_ON(1);
		len = rx_sa->len;
	}

	rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	rc = sdp_post_rdma_read(sk, rx_sa);
	if (unlikely(rc)) {
		sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
		sdp_set_error(ssk->socket, -ECONNRESET);
		wake_up(&ssk->wq);
		goto err_post_send;
	}

	sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);

	got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;

	sdp_arm_tx_cq(sk);

	sdp_wait_rdma_wr_finished(ssk);

	sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
	if (!ssk->qp_active) {
		sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
		rc = -EPIPE;
		goto err_post_send;
	}

	copied = rx_sa->umem->length;

	sdp_update_iov_used(sk, iov, copied);
	rx_sa->used += copied;
	atomic_add(copied, &ssk->rcv_nxt);
	*used = copied;

	ssk->tx_ring.rdma_inflight = NULL;

err_post_send:
	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

err_alloc_fmr:
	if (rc && ssk->qp_active) {
		sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
		rx_sa->flags |= RX_SA_ABORTED;
	}

	sock_put(sk, SOCK_REF_RDMA_RD);

	return rc;
}

static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int ret = 0;
	int credits_needed = 1;

	sdp_dbg_data(sk, "Wait for mem\n");

	set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

	SDPSTATS_COUNTER_INC(send_wait_for_mem);

	sdp_do_posts(ssk);

	sdp_xmit_poll(ssk, 1);

	ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);

	return ret;
}

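/*
 * One zero-copy send transaction: pin and map the iovec, advertise it with
 * a SrcAvail, then block until the peer completes the RDMA read or the
 * transfer is aborted.  If the wait fails while the QP is still alive, a
 * SrcAvailCancel is issued and we wait once more for the peer to close the
 * transaction.  Only the bytes actually acknowledged are consumed from the
 * iovec.
 */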
static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
	struct iovec *iov, long *timeo)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	unsigned long lock_flags;

	rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
		&tx_sa->fmr, &tx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	if (tx_slots_free(ssk) == 0) {
		rc = wait_for_sndbuf(sk, timeo);
		if (rc) {
			sdp_warn(sk, "Couldn't get send buffer\n");
			goto err_no_tx_slots;
		}
	}

	rc = sdp_post_srcavail(sk, tx_sa);
	if (rc) {
		sdp_dbg(sk, "Error posting SrcAvail\n");
		goto err_abort_send;
	}

	rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
	if (unlikely(rc)) {
		enum tx_sa_flag f = tx_sa->abort_flags;

		if (f & TX_SA_SENDSM) {
			sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
		} else if (f & TX_SA_ERROR) {
			sdp_dbg_data(sk, "SrcAvail error completion\n");
			sdp_reset(sk);
			SDPSTATS_COUNTER_INC(zcopy_tx_error);
		} else if (ssk->qp_active) {
			sdp_post_srcavail_cancel(sk);

			/* Wait for RdmaRdCompl/SendSM to
			 * finish the transaction */
			*timeo = 2 * HZ;
			sdp_dbg_data(sk, "Waiting for SendSM\n");
			sdp_wait_rdmardcompl(ssk, timeo, 1);
			sdp_dbg_data(sk, "finished waiting\n");

			cancel_delayed_work(&ssk->srcavail_cancel_work);
		} else {
			sdp_dbg_data(sk, "QP was destroyed while waiting\n");
		}
	} else {
		sdp_dbg_data(sk, "got RdmaRdCompl\n");
	}

	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
	ssk->tx_sa = NULL;
	spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);

err_abort_send:
	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);

err_no_tx_slots:
	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

err_alloc_fmr:
	return rc;
}

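/*
 * Entry point for zero-copy sendmsg.  Loops over the iovec, issuing one
 * SrcAvail transaction at a time, and stops once the residue falls below
 * sdp_zcopy_thresh.  Returns the number of bytes consumed, or a negative
 * errno for errors other than -EAGAIN/-ETIME; the caller can finish the
 * remainder via the bcopy path.
 */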
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	long timeo;
	struct tx_srcavail_state *tx_sa;
	int offset;
	size_t bytes_to_copy = 0;
	int copied = 0;

	sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
		iov->iov_base, iov->iov_len);
	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
	if (ssk->rx_sa) {
		sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
		return 0;
	}

	sock_hold(ssk->socket, SOCK_REF_ZCOPY);

	SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);

	timeo = SDP_SRCAVAIL_ADV_TIMEOUT;

	/* Ok commence sending. */
	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);

	tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
	if (!tx_sa) {
		sdp_warn(sk, "Error allocating zcopy context\n");
		rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
		goto err_alloc_tx_sa;
	}

	bytes_to_copy = iov->iov_len;
	do {
		tx_sa_reset(tx_sa);

		rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);

		if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
			sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
				iov->iov_len);
			break;
		}
	} while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);

	kfree(tx_sa);
err_alloc_tx_sa:
	copied = bytes_to_copy - iov->iov_len;

	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);

	sock_put(ssk->socket, SOCK_REF_ZCOPY);

	if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
		return rc;

	return copied;
}

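/*
 * Abort the transmit-side SrcAvail: cancel the pending SrcAvailCancel work,
 * release the FMR mapping and the pinned memory, and clear ssk->tx_sa under
 * the tx_sa lock.
 */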
void sdp_abort_srcavail(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	unsigned long flags;

	if (!tx_sa)
		return;

	cancel_delayed_work(&ssk->srcavail_cancel_work);
	flush_scheduled_work();

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

	ssk->tx_sa = NULL;

	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

void sdp_abort_rdma_read(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct rx_srcavail_state *rx_sa = ssk->rx_sa;

	if (!rx_sa)
		return;

	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

	ssk->rx_sa = NULL;
}