Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c
39566 views
1
/*-
2
* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
3
*
4
* Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
5
*
6
* This software is available to you under a choice of one of two
7
* licenses. You may choose to be licensed under the terms of the GNU
8
* General Public License (GPL) Version 2, available from the file
9
* COPYING in the main directory of this source tree, or the
10
* OpenIB.org BSD license below:
11
*
12
* Redistribution and use in source and binary forms, with or
13
* without modification, are permitted provided that the following
14
* conditions are met:
15
*
16
* - Redistributions of source code must retain the above
17
* copyright notice, this list of conditions and the following
18
* disclaimer.
19
*
20
* - Redistributions in binary form must reproduce the above
21
* copyright notice, this list of conditions and the following
22
* disclaimer in the documentation and/or other materials
23
* provided with the distribution.
24
*
25
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32
* SOFTWARE.
33
*/
34
#include "sdp.h"
35
36
SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
37
"Receive buffer initial size in bytes.");
38
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
39
"Receive buffer size scale factor.");
40
41
/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
42
static void
43
sdp_handle_disconn(struct sdp_sock *ssk)
44
{
45
46
sdp_dbg(ssk->socket, "%s\n", __func__);
47
48
SDP_WLOCK_ASSERT(ssk);
49
if (TCPS_HAVERCVDFIN(ssk->state) == 0)
50
socantrcvmore(ssk->socket);
51
52
switch (ssk->state) {
53
case TCPS_SYN_RECEIVED:
54
case TCPS_ESTABLISHED:
55
ssk->state = TCPS_CLOSE_WAIT;
56
break;
57
58
case TCPS_FIN_WAIT_1:
59
/* Received a reply FIN - start Infiniband tear down */
60
sdp_dbg(ssk->socket,
61
"%s: Starting Infiniband tear down sending DREQ\n",
62
__func__);
63
64
sdp_cancel_dreq_wait_timeout(ssk);
65
ssk->qp_active = 0;
66
if (ssk->id) {
67
struct rdma_cm_id *id;
68
69
id = ssk->id;
70
SDP_WUNLOCK(ssk);
71
rdma_disconnect(id);
72
SDP_WLOCK(ssk);
73
} else {
74
sdp_warn(ssk->socket,
75
"%s: ssk->id is NULL\n", __func__);
76
return;
77
}
78
break;
79
case TCPS_TIME_WAIT:
80
/* This is a mutual close situation and we've got the DREQ from
81
the peer before the SDP_MID_DISCONNECT */
82
break;
83
case TCPS_CLOSED:
84
/* FIN arrived after IB teardown started - do nothing */
85
sdp_dbg(ssk->socket, "%s: fin in state %s\n",
86
__func__, sdp_state_str(ssk->state));
87
return;
88
default:
89
sdp_warn(ssk->socket,
90
"%s: FIN in unexpected state. state=%d\n",
91
__func__, ssk->state);
92
break;
93
}
94
}
95
96
static int
97
sdp_post_recv(struct sdp_sock *ssk)
98
{
99
struct sdp_buf *rx_req;
100
int i, rc;
101
u64 addr;
102
struct ib_device *dev;
103
struct ib_recv_wr rx_wr = { NULL };
104
struct ib_sge ibsge[SDP_MAX_RECV_SGES];
105
struct ib_sge *sge = ibsge;
106
const struct ib_recv_wr *bad_wr;
107
struct mbuf *mb, *m;
108
struct sdp_bsdh *h;
109
int id = ring_head(ssk->rx_ring);
110
111
/* Now, allocate and repost recv */
112
sdp_prf(ssk->socket, mb, "Posting mb");
113
mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
114
if (mb == NULL) {
115
/* Retry so we can't stall out with no memory. */
116
if (!rx_ring_posted(ssk))
117
queue_work(rx_comp_wq, &ssk->rx_comp_work);
118
return -1;
119
}
120
for (m = mb; m != NULL; m = m->m_next) {
121
m->m_len = M_SIZE(m);
122
mb->m_pkthdr.len += m->m_len;
123
}
124
h = mtod(mb, struct sdp_bsdh *);
125
rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
126
rx_req->mb = mb;
127
dev = ssk->ib_device;
128
for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
129
addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
130
DMA_TO_DEVICE);
131
/* TODO: proper error handling */
132
BUG_ON(ib_dma_mapping_error(dev, addr));
133
BUG_ON(i >= SDP_MAX_RECV_SGES);
134
rx_req->mapping[i] = addr;
135
sge->addr = addr;
136
sge->length = mb->m_len;
137
sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
138
}
139
140
rx_wr.next = NULL;
141
rx_wr.wr_id = id | SDP_OP_RECV;
142
rx_wr.sg_list = ibsge;
143
rx_wr.num_sge = i;
144
rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
145
if (unlikely(rc)) {
146
sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
147
148
sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
149
m_freem(mb);
150
151
sdp_notify(ssk, ECONNRESET);
152
153
return -1;
154
}
155
156
atomic_inc(&ssk->rx_ring.head);
157
SDPSTATS_COUNTER_INC(post_recv);
158
159
return 0;
160
}
161
162
static inline int
163
sdp_post_recvs_needed(struct sdp_sock *ssk)
164
{
165
unsigned long bytes_in_process;
166
unsigned long max_bytes;
167
int buffer_size;
168
int posted;
169
170
if (!ssk->qp_active || !ssk->socket)
171
return 0;
172
173
posted = rx_ring_posted(ssk);
174
if (posted >= SDP_RX_SIZE)
175
return 0;
176
if (posted < SDP_MIN_TX_CREDITS)
177
return 1;
178
179
buffer_size = ssk->recv_bytes;
180
max_bytes = max(ssk->socket->so_rcv.sb_hiwat,
181
(1 + SDP_MIN_TX_CREDITS) * buffer_size);
182
max_bytes *= rcvbuf_scale;
183
/*
184
* Compute bytes in the receive queue and socket buffer.
185
*/
186
bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
187
bytes_in_process += sbused(&ssk->socket->so_rcv);
188
189
return bytes_in_process < max_bytes;
190
}
191
192
static inline void
193
sdp_post_recvs(struct sdp_sock *ssk)
194
{
195
196
while (sdp_post_recvs_needed(ssk))
197
if (sdp_post_recv(ssk))
198
return;
199
}
200
201
static inline struct mbuf *
202
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
203
{
204
struct sdp_sock *ssk = sdp_sk(sk);
205
struct sdp_bsdh *h;
206
207
h = mtod(mb, struct sdp_bsdh *);
208
209
#ifdef SDP_ZCOPY
210
SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
211
if (h->mid == SDP_MID_SRCAVAIL) {
212
struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
213
struct rx_srcavail_state *rx_sa;
214
215
ssk->srcavail_cancel_mseq = 0;
216
217
ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
218
sizeof(struct rx_srcavail_state), M_NOWAIT);
219
220
rx_sa->mseq = ntohl(h->mseq);
221
rx_sa->used = 0;
222
rx_sa->len = mb_len = ntohl(srcah->len);
223
rx_sa->rkey = ntohl(srcah->rkey);
224
rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
225
rx_sa->flags = 0;
226
227
if (ssk->tx_sa) {
228
sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
229
"for TX SrcAvail. waking up TX SrcAvail"
230
"to be aborted\n");
231
wake_up(sk->sk_sleep);
232
}
233
234
atomic_add(mb->len, &ssk->rcv_nxt);
235
sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
236
mb_len, rx_sa->vaddr);
237
} else
238
#endif
239
{
240
atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
241
}
242
243
m_adj(mb, SDP_HEAD_SIZE);
244
SOCKBUF_LOCK(&sk->so_rcv);
245
if (unlikely(h->flags & SDP_OOB_PRES))
246
sdp_urg(ssk, mb);
247
sbappend_locked(&sk->so_rcv, mb, 0);
248
sorwakeup_locked(sk);
249
return mb;
250
}
251
252
static int
253
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
254
{
255
256
return MIN(new_size, SDP_MAX_PACKET);
257
}
258
259
int
260
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
261
{
262
263
ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
264
sdp_post_recvs(ssk);
265
266
return 0;
267
}
268
269
int
270
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
271
{
272
u32 curr_size = ssk->recv_bytes;
273
u32 max_size = SDP_MAX_PACKET;
274
275
if (new_size > curr_size && new_size <= max_size) {
276
ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
277
return 0;
278
}
279
return -1;
280
}
281
282
static void
283
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
284
{
285
if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
286
ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
287
else
288
ssk->recv_request_head = ring_tail(ssk->rx_ring);
289
ssk->recv_request = 1;
290
}
291
292
static void
293
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
294
{
295
u32 new_size = ntohl(buf->size);
296
297
if (new_size > ssk->xmit_size_goal)
298
ssk->xmit_size_goal = new_size;
299
}
300
301
static struct mbuf *
302
sdp_recv_completion(struct sdp_sock *ssk, int id)
303
{
304
struct sdp_buf *rx_req;
305
struct ib_device *dev;
306
struct mbuf *mb;
307
308
if (unlikely(id != ring_tail(ssk->rx_ring))) {
309
printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
310
id, ring_tail(ssk->rx_ring));
311
return NULL;
312
}
313
314
dev = ssk->ib_device;
315
rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
316
mb = rx_req->mb;
317
sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
318
319
atomic_inc(&ssk->rx_ring.tail);
320
atomic_dec(&ssk->remote_credits);
321
return mb;
322
}
323
324
static void
325
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
326
{
327
struct sdp_bsdh *h;
328
struct socket *sk;
329
330
SDP_WLOCK_ASSERT(ssk);
331
332
sk = ssk->socket;
333
h = mtod(mb, struct sdp_bsdh *);
334
switch (h->mid) {
335
case SDP_MID_DATA:
336
case SDP_MID_SRCAVAIL:
337
sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
338
339
/* got data in RCV_SHUTDOWN */
340
if (ssk->state == TCPS_FIN_WAIT_1) {
341
sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
342
sdp_notify(ssk, ECONNRESET);
343
}
344
345
break;
346
#ifdef SDP_ZCOPY
347
case SDP_MID_RDMARDCOMPL:
348
break;
349
case SDP_MID_SENDSM:
350
sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
351
break;
352
case SDP_MID_SRCAVAIL_CANCEL:
353
sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
354
sdp_prf(sk, NULL, "Handling SrcAvailCancel");
355
if (ssk->rx_sa) {
356
ssk->srcavail_cancel_mseq = ntohl(h->mseq);
357
ssk->rx_sa->flags |= RX_SA_ABORTED;
358
ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
359
the dirty logic from recvmsg */
360
} else {
361
sdp_dbg(sk, "Got SrcAvailCancel - "
362
"but no SrcAvail in process\n");
363
}
364
break;
365
case SDP_MID_SINKAVAIL:
366
sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
367
sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
368
/* FALLTHROUGH */
369
#endif
370
case SDP_MID_ABORT:
371
sdp_dbg_data(sk, "Handling ABORT\n");
372
sdp_prf(sk, NULL, "Handling ABORT");
373
sdp_notify(ssk, ECONNRESET);
374
break;
375
case SDP_MID_DISCONN:
376
sdp_dbg_data(sk, "Handling DISCONN\n");
377
sdp_prf(sk, NULL, "Handling DISCONN");
378
sdp_handle_disconn(ssk);
379
break;
380
case SDP_MID_CHRCVBUF:
381
sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
382
sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
383
break;
384
case SDP_MID_CHRCVBUF_ACK:
385
sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
386
sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
387
break;
388
default:
389
/* TODO: Handle other messages */
390
sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
391
break;
392
}
393
m_freem(mb);
394
}
395
396
static int
397
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
398
{
399
struct socket *sk;
400
struct sdp_bsdh *h;
401
unsigned long mseq_ack;
402
int credits_before;
403
404
h = mtod(mb, struct sdp_bsdh *);
405
sk = ssk->socket;
406
/*
407
* If another thread is in so_pcbfree this may be partially torn
408
* down but no further synchronization is required as the destroying
409
* thread will wait for receive to shutdown before discarding the
410
* socket.
411
*/
412
if (sk == NULL) {
413
m_freem(mb);
414
return 0;
415
}
416
417
SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
418
419
mseq_ack = ntohl(h->mseq_ack);
420
credits_before = tx_credits(ssk);
421
atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
422
1 + ntohs(h->bufs));
423
if (mseq_ack >= ssk->nagle_last_unacked)
424
ssk->nagle_last_unacked = 0;
425
426
sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
427
mid2str(h->mid), ntohs(h->bufs), credits_before,
428
tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
429
430
if (unlikely(h->mid == SDP_MID_DATA &&
431
mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
432
/* Credit update is valid even after RCV_SHUTDOWN */
433
m_freem(mb);
434
return 0;
435
}
436
437
if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
438
TCPS_HAVERCVDFIN(ssk->state)) {
439
sdp_prf(sk, NULL, "Control mb - queing to control queue");
440
#ifdef SDP_ZCOPY
441
if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
442
sdp_dbg_data(sk, "Got SrcAvailCancel. "
443
"seq: 0x%d seq_ack: 0x%d\n",
444
ntohl(h->mseq), ntohl(h->mseq_ack));
445
ssk->srcavail_cancel_mseq = ntohl(h->mseq);
446
}
447
448
449
if (h->mid == SDP_MID_RDMARDCOMPL) {
450
struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
451
sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
452
sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
453
ntohl(rrch->len));
454
}
455
#endif
456
if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
457
m_freem(mb);
458
return (0);
459
}
460
461
sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
462
mb = sdp_sock_queue_rcv_mb(sk, mb);
463
464
465
return 0;
466
}
467
468
/* called only from irq */
469
static struct mbuf *
470
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
471
{
472
struct mbuf *mb;
473
struct sdp_bsdh *h;
474
struct socket *sk = ssk->socket;
475
int mseq;
476
477
mb = sdp_recv_completion(ssk, wc->wr_id);
478
if (unlikely(!mb))
479
return NULL;
480
481
if (unlikely(wc->status)) {
482
if (ssk->qp_active && sk) {
483
sdp_dbg(sk, "Recv completion with error. "
484
"Status %s (%d), vendor: %d\n",
485
ib_wc_status_msg(wc->status), wc->status,
486
wc->vendor_err);
487
sdp_abort(sk);
488
ssk->qp_active = 0;
489
}
490
m_freem(mb);
491
return NULL;
492
}
493
494
sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
495
(int)wc->wr_id, wc->byte_len);
496
if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
497
sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
498
wc->byte_len, sizeof(struct sdp_bsdh));
499
m_freem(mb);
500
return NULL;
501
}
502
/* Use m_adj to trim the tail of data we didn't use. */
503
m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
504
h = mtod(mb, struct sdp_bsdh *);
505
506
SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
507
508
ssk->rx_packets++;
509
ssk->rx_bytes += mb->m_pkthdr.len;
510
511
mseq = ntohl(h->mseq);
512
atomic_set(&ssk->mseq_ack, mseq);
513
if (mseq != (int)wc->wr_id)
514
sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
515
mseq, (int)wc->wr_id);
516
517
return mb;
518
}
519
520
/* Wakeup writers if we now have credits. */
521
static void
522
sdp_bzcopy_write_space(struct sdp_sock *ssk)
523
{
524
struct socket *sk = ssk->socket;
525
526
if (tx_credits(ssk) >= ssk->min_bufs && sk)
527
sowwakeup(sk);
528
}
529
530
/* only from interrupt. */
531
static int
532
sdp_poll_rx_cq(struct sdp_sock *ssk)
533
{
534
struct ib_cq *cq = ssk->rx_ring.cq;
535
struct ib_wc ibwc[SDP_NUM_WC];
536
int n, i;
537
int wc_processed = 0;
538
struct mbuf *mb;
539
540
do {
541
n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
542
for (i = 0; i < n; ++i) {
543
struct ib_wc *wc = &ibwc[i];
544
545
BUG_ON(!(wc->wr_id & SDP_OP_RECV));
546
mb = sdp_process_rx_wc(ssk, wc);
547
if (!mb)
548
continue;
549
550
sdp_process_rx_mb(ssk, mb);
551
wc_processed++;
552
}
553
} while (n == SDP_NUM_WC);
554
555
if (wc_processed)
556
sdp_bzcopy_write_space(ssk);
557
558
return wc_processed;
559
}
560
561
static void
562
sdp_rx_comp_work(struct work_struct *work)
563
{
564
struct sdp_sock *ssk = container_of(work, struct sdp_sock,
565
rx_comp_work);
566
567
sdp_prf(ssk->socket, NULL, "%s", __func__);
568
569
SDP_WLOCK(ssk);
570
if (unlikely(!ssk->qp)) {
571
sdp_prf(ssk->socket, NULL, "qp was destroyed");
572
goto out;
573
}
574
if (unlikely(!ssk->rx_ring.cq)) {
575
sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
576
goto out;
577
}
578
579
if (unlikely(!ssk->poll_cq)) {
580
struct rdma_cm_id *id = ssk->id;
581
if (id && id->qp)
582
rdma_notify(id, IB_EVENT_COMM_EST);
583
goto out;
584
}
585
586
sdp_do_posts(ssk);
587
out:
588
SDP_WUNLOCK(ssk);
589
}
590
591
void
592
sdp_do_posts(struct sdp_sock *ssk)
593
{
594
struct socket *sk = ssk->socket;
595
int xmit_poll_force;
596
struct mbuf *mb;
597
598
SDP_WLOCK_ASSERT(ssk);
599
if (!ssk->qp_active) {
600
sdp_dbg(sk, "QP is deactivated\n");
601
return;
602
}
603
604
while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
605
sdp_process_rx_ctl_mb(ssk, mb);
606
607
if (ssk->state == TCPS_TIME_WAIT)
608
return;
609
610
if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
611
return;
612
613
sdp_post_recvs(ssk);
614
615
if (tx_ring_posted(ssk))
616
sdp_xmit_poll(ssk, 1);
617
618
sdp_post_sends(ssk, M_NOWAIT);
619
620
xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
621
622
if (credit_update_needed(ssk) || xmit_poll_force) {
623
/* if has pending tx because run out of tx_credits - xmit it */
624
sdp_prf(sk, NULL, "Processing to free pending sends");
625
sdp_xmit_poll(ssk, xmit_poll_force);
626
sdp_prf(sk, NULL, "Sending credit update");
627
sdp_post_sends(ssk, M_NOWAIT);
628
}
629
630
}
631
632
int
633
sdp_process_rx(struct sdp_sock *ssk)
634
{
635
int wc_processed = 0;
636
int credits_before;
637
638
if (!rx_ring_trylock(&ssk->rx_ring)) {
639
sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
640
return 0;
641
}
642
643
credits_before = tx_credits(ssk);
644
645
wc_processed = sdp_poll_rx_cq(ssk);
646
sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
647
648
if (wc_processed) {
649
sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
650
credits_before, tx_credits(ssk));
651
queue_work(rx_comp_wq, &ssk->rx_comp_work);
652
}
653
sdp_arm_rx_cq(ssk);
654
655
rx_ring_unlock(&ssk->rx_ring);
656
657
return (wc_processed);
658
}
659
660
static void
661
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
662
{
663
struct sdp_sock *ssk;
664
665
ssk = cq_context;
666
KASSERT(cq == ssk->rx_ring.cq,
667
("%s: mismatched cq on %p", __func__, ssk));
668
669
SDPSTATS_COUNTER_INC(rx_int_count);
670
671
sdp_prf(sk, NULL, "rx irq");
672
673
sdp_process_rx(ssk);
674
}
675
676
static
677
void sdp_rx_ring_purge(struct sdp_sock *ssk)
678
{
679
while (rx_ring_posted(ssk) > 0) {
680
struct mbuf *mb;
681
mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
682
if (!mb)
683
break;
684
m_freem(mb);
685
}
686
}
687
688
void
689
sdp_rx_ring_init(struct sdp_sock *ssk)
690
{
691
ssk->rx_ring.buffer = NULL;
692
ssk->rx_ring.destroyed = 0;
693
rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
694
}
695
696
static void
697
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
698
{
699
}
700
701
int
702
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
703
{
704
struct ib_cq_init_attr rx_cq_attr = {
705
.cqe = SDP_RX_SIZE,
706
.comp_vector = 0,
707
.flags = 0,
708
};
709
struct ib_cq *rx_cq;
710
int rc = 0;
711
712
sdp_dbg(ssk->socket, "rx ring created");
713
INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
714
atomic_set(&ssk->rx_ring.head, 1);
715
atomic_set(&ssk->rx_ring.tail, 1);
716
717
ssk->rx_ring.buffer = malloc(sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE,
718
M_SDP, M_WAITOK);
719
720
rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
721
ssk, &rx_cq_attr);
722
if (IS_ERR(rx_cq)) {
723
rc = PTR_ERR(rx_cq);
724
sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
725
goto err_cq;
726
}
727
728
sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
729
sdp_arm_rx_cq(ssk);
730
731
return 0;
732
733
err_cq:
734
free(ssk->rx_ring.buffer, M_SDP);
735
ssk->rx_ring.buffer = NULL;
736
return rc;
737
}
738
739
void
740
sdp_rx_ring_destroy(struct sdp_sock *ssk)
741
{
742
743
cancel_work_sync(&ssk->rx_comp_work);
744
rx_ring_destroy_lock(&ssk->rx_ring);
745
746
if (ssk->rx_ring.buffer) {
747
sdp_rx_ring_purge(ssk);
748
free(ssk->rx_ring.buffer, M_SDP);
749
ssk->rx_ring.buffer = NULL;
750
}
751
752
if (ssk->rx_ring.cq) {
753
ib_destroy_cq(ssk->rx_ring.cq);
754
ssk->rx_ring.cq = NULL;
755
}
756
757
WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
758
}
759
760