GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c
/*-
 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
 *
 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *  - Redistributions of source code must retain the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer.
 *
 *  - Redistributions in binary form must reproduce the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

#define sdp_cnt(var) do { (var)++; } while (0)

SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
    "Total number of keepalive probes sent.");

static int sdp_process_tx_cq(struct sdp_sock *ssk);
static void sdp_poll_tx_timeout(void *data);

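/*
 * Poll the TX completion queue for this socket.  A poll timer is armed to
 * catch completions of recently posted work requests should the interface
 * go idle; the CQ itself is only polled every SDP_TX_POLL_MODER calls
 * unless 'force' is set.  Returns the number of work completions processed.
 */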
int
sdp_xmit_poll(struct sdp_sock *ssk, int force)
{
        int wc_processed = 0;

        SDP_WLOCK_ASSERT(ssk);
        sdp_prf(ssk->socket, NULL, "%s", __func__);

        /* If we don't have a pending timer, set one up to catch our recent
           post in case the interface becomes idle */
        if (!callout_pending(&ssk->tx_ring.timer))
                callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
                    sdp_poll_tx_timeout, ssk);

        /* Poll the CQ every SDP_TX_POLL_MODER packets */
        if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
                wc_processed = sdp_process_tx_cq(ssk);

        return wc_processed;
}

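/*
 * Post an mbuf chain as a single IB send work request.  The BSDH header at
 * the front of the chain is filled in with the current buffer/credit
 * advertisement and sequence numbers, each mbuf in the chain is DMA mapped
 * into its own scatter/gather entry, and the request is posted signalled so
 * that its completion releases the mapping and frees the mbuf.
 */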
void
sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
{
        struct sdp_buf *tx_req;
        struct sdp_bsdh *h;
        unsigned long mseq;
        struct ib_device *dev;
        const struct ib_send_wr *bad_wr;
        struct ib_sge ibsge[SDP_MAX_SEND_SGES];
        struct ib_sge *sge;
        struct ib_send_wr tx_wr = { NULL };
        int i, rc;
        u64 addr;

        SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
        SDPSTATS_HIST(send_size, mb->len);

        if (!ssk->qp_active) {
                m_freem(mb);
                return;
        }

        mseq = ring_head(ssk->tx_ring);
        h = mtod(mb, struct sdp_bsdh *);
        ssk->tx_packets++;
        ssk->tx_bytes += mb->m_pkthdr.len;

#ifdef SDP_ZCOPY
        if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
                struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
                if (ssk->tx_sa != tx_sa) {
                        sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
                            "before being sent!\n");
                        WARN_ON(1);
                        m_freem(mb);
                        return;
                }
                TX_SRCAVAIL_STATE(mb)->mseq = mseq;
        }
#endif

        if (unlikely(mb->m_flags & M_URG))
                h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
        else
                h->flags = 0;

        mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
        h->bufs = htons(rx_ring_posted(ssk));
        h->len = htonl(mb->m_pkthdr.len);
        h->mseq = htonl(mseq);
        h->mseq_ack = htonl(mseq_ack(ssk));

        sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
            mid2str(h->mid), rx_ring_posted(ssk), mseq,
            ntohl(h->mseq_ack));

        SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);

        tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
        tx_req->mb = mb;
        dev = ssk->ib_device;
        sge = &ibsge[0];
        for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
                addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
                    DMA_TO_DEVICE);
                /* TODO: proper error handling */
                BUG_ON(ib_dma_mapping_error(dev, addr));
                BUG_ON(i >= SDP_MAX_SEND_SGES);
                tx_req->mapping[i] = addr;
                sge->addr = addr;
                sge->length = mb->m_len;
                sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
        }
        tx_wr.next = NULL;
        tx_wr.wr_id = mseq | SDP_OP_SEND;
        tx_wr.sg_list = ibsge;
        tx_wr.num_sge = i;
        tx_wr.opcode = IB_WR_SEND;
        tx_wr.send_flags = IB_SEND_SIGNALED;
        if (unlikely(tx_req->mb->m_flags & M_URG))
                tx_wr.send_flags |= IB_SEND_SOLICITED;

        rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
        if (unlikely(rc)) {
                sdp_dbg(ssk->socket,
                    "ib_post_send failed with status %d.\n", rc);

                sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);

                sdp_notify(ssk, ECONNRESET);
                m_freem(tx_req->mb);
                return;
        }

        atomic_inc(&ssk->tx_ring.head);
        atomic_dec(&ssk->tx_ring.credits);
        atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));

        return;
}

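/*
 * Reclaim the TX ring slot for a completed send.  Completions are expected
 * in ring order; a completion that does not match the ring tail is ignored
 * with a warning.  Returns the mbuf that was posted, now unmapped, or NULL.
 */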
static struct mbuf *
sdp_send_completion(struct sdp_sock *ssk, int mseq)
{
        struct ib_device *dev;
        struct sdp_buf *tx_req;
        struct mbuf *mb = NULL;
        struct sdp_tx_ring *tx_ring = &ssk->tx_ring;

        if (unlikely(mseq != ring_tail(*tx_ring))) {
                printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
                    mseq, ring_tail(*tx_ring));
                goto out;
        }

        dev = ssk->ib_device;
        tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
        mb = tx_req->mb;
        sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);

#ifdef SDP_ZCOPY
        /* TODO: AIO and real zcopy code; add their context support here */
        if (BZCOPY_STATE(mb))
                BZCOPY_STATE(mb)->busy--;
#endif

        atomic_inc(&tx_ring->tail);

out:
        return mb;
}

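/*
 * Handle a send work completion: report errors (other than the flush errors
 * seen during teardown) as a connection reset, then release the completed
 * mbuf.  Returns 0 on success or -1 if no mbuf matched the completion.
 */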
static int
sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
{
        struct mbuf *mb = NULL;
        struct sdp_bsdh *h;

        if (unlikely(wc->status)) {
                if (wc->status != IB_WC_WR_FLUSH_ERR) {
                        sdp_prf(ssk->socket, mb, "Send completion with error. "
                            "Status %d", wc->status);
                        sdp_dbg_data(ssk->socket, "Send completion with error. "
                            "Status %d\n", wc->status);
                        sdp_notify(ssk, ECONNRESET);
                }
        }

        mb = sdp_send_completion(ssk, wc->wr_id);
        if (unlikely(!mb))
                return -1;

        h = mtod(mb, struct sdp_bsdh *);
        sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
        sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d",
            mb, mb->m_pkthdr.len, ntohl(h->mseq));
        m_freem(mb);

        return 0;
}

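/*
 * Dispatch a single TX work completion: regular sends, RDMA read
 * completions for zero-copy transfers (when SDP_ZCOPY is configured), and
 * finally keepalive probes, which carry no SDP_OP_* tag in their work
 * request id.
 */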
static inline void
sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{

        if (likely(wc->wr_id & SDP_OP_SEND)) {
                sdp_handle_send_comp(ssk, wc);
                return;
        }

#ifdef SDP_ZCOPY
        if (wc->wr_id & SDP_OP_RDMA) {
                /* TODO: handle failed RDMA read cqe */

                sdp_dbg_data(ssk->socket,
                    "TX comp: RDMA read. status: %d\n", wc->status);
                sdp_prf1(sk, NULL, "TX comp: RDMA read");

                if (!ssk->tx_ring.rdma_inflight) {
                        sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
                        return;
                }

                if (!ssk->tx_ring.rdma_inflight->busy) {
                        sdp_warn(ssk->socket,
                            "ERROR: too many RDMA read completions\n");
                        return;
                }

                /* Only the last RDMA read WR is signalled and ordering is
                 * guaranteed, so once the last RDMA read WR has completed,
                 * all the others have completed too. */
                ssk->tx_ring.rdma_inflight->busy = 0;
                sowwakeup(ssk->socket);
                sdp_dbg_data(ssk->socket, "woke up sleepers\n");
                return;
        }
#endif

        /* Keepalive probe sent cleanup */
        sdp_cnt(sdp_keepalive_probes_sent);

        if (likely(!wc->status))
                return;

        sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
            __func__, wc->status);

        if (wc->status == IB_WC_WR_FLUSH_ERR)
                return;

        sdp_notify(ssk, ECONNRESET);
}

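/*
 * Drain the TX completion queue.  Every available work completion is
 * processed; if any were handled, the freed ring slots are used to post
 * pending sends and threads waiting for TX space are woken up.
 */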
static int
sdp_process_tx_cq(struct sdp_sock *ssk)
{
        struct ib_wc ibwc[SDP_NUM_WC];
        int n, i;
        int wc_processed = 0;

        SDP_WLOCK_ASSERT(ssk);

        if (!ssk->tx_ring.cq) {
                sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
                return 0;
        }

        do {
                n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
                for (i = 0; i < n; ++i) {
                        sdp_process_tx_wc(ssk, ibwc + i);
                        wc_processed++;
                }
        } while (n == SDP_NUM_WC);

        if (wc_processed) {
                sdp_post_sends(ssk, M_NOWAIT);
                sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d",
                    (u32) tx_ring_posted(ssk));
                sowwakeup(ssk->socket);
        }

        return wc_processed;
}

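/*
 * Timer-driven TX processing: poll the completion queue and, if work
 * requests are still in flight, re-arm the poll timer so that their
 * completions are eventually reaped even without further interrupts.
 */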
static void
sdp_poll_tx(struct sdp_sock *ssk)
{
        struct socket *sk = ssk->socket;
        u32 inflight, wc_processed;

        sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
            (u32) tx_ring_posted(ssk),
            ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));

        if (unlikely(ssk->state == TCPS_CLOSED)) {
                sdp_warn(sk, "Socket is closed\n");
                goto out;
        }

        wc_processed = sdp_process_tx_cq(ssk);
        if (!wc_processed)
                SDPSTATS_COUNTER_INC(tx_poll_miss);
        else
                SDPSTATS_COUNTER_INC(tx_poll_hit);

        inflight = (u32) tx_ring_posted(ssk);
        sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d",
            inflight);

        /* If there are still packets in flight and the timer has not already
         * been scheduled by the Tx routine then schedule it here to guarantee
         * completion processing of these packets */
        if (inflight)
                callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
                    sdp_poll_tx_timeout, ssk);
out:
#ifdef SDP_ZCOPY
        if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
                sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
                sdp_arm_tx_cq(ssk);
        }
#endif
        return;
}

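/* Callout handler that runs sdp_poll_tx() if the timer is still active. */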
static void
sdp_poll_tx_timeout(void *data)
{
        struct sdp_sock *ssk = (struct sdp_sock *)data;

        if (!callout_active(&ssk->tx_ring.timer))
                return;
        callout_deactivate(&ssk->tx_ring.timer);
        sdp_poll_tx(ssk);
}

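/* TX completion queue interrupt handler: lock the socket and poll. */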
static void
sdp_tx_irq(struct ib_cq *cq, void *cq_context)
{
        struct sdp_sock *ssk;

        ssk = cq_context;
        sdp_prf1(ssk->socket, NULL, "tx irq");
        sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
        SDPSTATS_COUNTER_INC(tx_int_count);
        SDP_WLOCK(ssk);
        sdp_poll_tx(ssk);
        SDP_WUNLOCK(ssk);
}

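/* Free every mbuf still posted on the TX ring during teardown. */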
static
void sdp_tx_ring_purge(struct sdp_sock *ssk)
{
        while (tx_ring_posted(ssk)) {
                struct mbuf *mb;
                mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
                if (!mb)
                        break;
                m_freem(mb);
        }
}

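/*
 * Post a zero-length RDMA write as a keepalive probe.  Its work request id
 * carries no SDP_OP_* tag, so the completion is evidently meant to be
 * consumed by sdp_process_tx_wc() without touching the TX ring.
 */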
void
sdp_post_keepalive(struct sdp_sock *ssk)
{
        int rc;
        struct ib_send_wr wr;
        const struct ib_send_wr *bad_wr;

        sdp_dbg(ssk->socket, "%s\n", __func__);

        memset(&wr, 0, sizeof(wr));

        wr.next = NULL;
        wr.wr_id = 0;
        wr.sg_list = NULL;
        wr.num_sge = 0;
        wr.opcode = IB_WR_RDMA_WRITE;

        rc = ib_post_send(ssk->qp, &wr, &bad_wr);
        if (rc) {
                sdp_dbg(ssk->socket,
                    "ib_post_keepalive failed with status %d.\n", rc);
                sdp_notify(ssk, ECONNRESET);
        }

        sdp_cnt(sdp_keepalive_probes_sent);
}

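/* TX CQ event handler; asynchronous CQ events are not acted upon. */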
static void
sdp_tx_cq_event_handler(struct ib_event *event, void *data)
{
}

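/*
 * Allocate the TX ring buffer, initialize the poll and nagle callouts and
 * the ring indices, create the TX completion queue, and arm it for the
 * first completion event.
 */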
int
sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
        struct ib_cq_init_attr tx_cq_attr = {
                .cqe = SDP_TX_SIZE,
                .comp_vector = 0,
                .flags = 0,
        };
        struct ib_cq *tx_cq;
        int rc = 0;

        sdp_dbg(ssk->socket, "tx ring create\n");
        callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
        callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
        atomic_set(&ssk->tx_ring.head, 1);
        atomic_set(&ssk->tx_ring.tail, 1);

        ssk->tx_ring.buffer = malloc(sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE,
            M_SDP, M_WAITOK);

        tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
            ssk, &tx_cq_attr);
        if (IS_ERR(tx_cq)) {
                rc = PTR_ERR(tx_cq);
                sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
                goto err_cq;
        }
        ssk->tx_ring.cq = tx_cq;
        ssk->tx_ring.poll_cnt = 0;
        sdp_arm_tx_cq(ssk);

        return 0;

err_cq:
        free(ssk->tx_ring.buffer, M_SDP);
        ssk->tx_ring.buffer = NULL;
        return rc;
}

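/*
 * Tear down the TX path: stop and drain the callouts, purge any mbufs
 * still on the ring, free the ring buffer, and destroy the completion
 * queue.  The ring is expected to be empty at this point.
 */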
void
sdp_tx_ring_destroy(struct sdp_sock *ssk)
{

        sdp_dbg(ssk->socket, "tx ring destroy\n");
        SDP_WLOCK(ssk);
        callout_stop(&ssk->tx_ring.timer);
        callout_stop(&ssk->nagle_timer);
        SDP_WUNLOCK(ssk);
        callout_drain(&ssk->tx_ring.timer);
        callout_drain(&ssk->nagle_timer);

        if (ssk->tx_ring.buffer) {
                sdp_tx_ring_purge(ssk);
                free(ssk->tx_ring.buffer, M_SDP);
                ssk->tx_ring.buffer = NULL;
        }

        if (ssk->tx_ring.cq) {
                ib_destroy_cq(ssk->tx_ring.cq);
                ssk->tx_ring.cq = NULL;
        }

        WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
}