GitHub Repository: torvalds/linux
Path: blob/master/net/kcm/kcmsock.c
1
// SPDX-License-Identifier: GPL-2.0-only
2
/*
3
* Kernel Connection Multiplexor
4
*
5
* Copyright (c) 2016 Tom Herbert <[email protected]>
6
*/
7
8
#include <linux/bpf.h>
9
#include <linux/errno.h>
10
#include <linux/errqueue.h>
11
#include <linux/file.h>
12
#include <linux/filter.h>
13
#include <linux/in.h>
14
#include <linux/kernel.h>
15
#include <linux/module.h>
16
#include <linux/net.h>
17
#include <linux/netdevice.h>
18
#include <linux/poll.h>
19
#include <linux/rculist.h>
20
#include <linux/skbuff.h>
21
#include <linux/socket.h>
22
#include <linux/splice.h>
23
#include <linux/uaccess.h>
24
#include <linux/workqueue.h>
25
#include <linux/syscalls.h>
26
#include <linux/sched/signal.h>
27
28
#include <net/kcm.h>
29
#include <net/netns/generic.h>
30
#include <net/sock.h>
31
#include <uapi/linux/kcm.h>
32
#include <trace/events/sock.h>
33
34
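/* Illustrative userspace flow (a sketch only, not part of the kernel logic in
 * this file): create a KCM socket with socket(AF_KCM, SOCK_SEQPACKET or
 * SOCK_DGRAM, KCMPROTO_CONNECTED), then attach a connected TCP socket plus a
 * BPF_PROG_TYPE_SOCKET_FILTER program whose return value is the length of the
 * next message.  tcp_fd and bpf_prog_fd below are placeholder names; the
 * structure and ioctl come from include/uapi/linux/kcm.h:
 *
 *	int kcm_fd = socket(AF_KCM, SOCK_SEQPACKET, KCMPROTO_CONNECTED);
 *	struct kcm_attach attach = { .fd = tcp_fd, .bpf_fd = bpf_prog_fd };
 *	ioctl(kcm_fd, SIOCKCMATTACH, &attach);
 *
 * kcm_ioctl() and kcm_attach() below implement the kernel side of this flow.
 */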
unsigned int kcm_net_id;
35
36
static struct kmem_cache *kcm_psockp __read_mostly;
37
static struct kmem_cache *kcm_muxp __read_mostly;
38
static struct workqueue_struct *kcm_wq;
39
40
static inline struct kcm_sock *kcm_sk(const struct sock *sk)
41
{
42
return (struct kcm_sock *)sk;
43
}
44
45
static inline struct kcm_tx_msg *kcm_tx_msg(struct sk_buff *skb)
46
{
47
return (struct kcm_tx_msg *)skb->cb;
48
}
49
50
static void report_csk_error(struct sock *csk, int err)
51
{
52
csk->sk_err = EPIPE;
53
sk_error_report(csk);
54
}
55
56
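/* Mark a psock's transmit side as stopped after an unrecoverable send error.
 * An unreserved psock is removed from the mux's available list; a reserved
 * one has its owner's tx_work queued (when wakeup_kcm is set) so the failure
 * is handled there.  The error is also reported on the underlying socket.
 */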
static void kcm_abort_tx_psock(struct kcm_psock *psock, int err,
57
bool wakeup_kcm)
58
{
59
struct sock *csk = psock->sk;
60
struct kcm_mux *mux = psock->mux;
61
62
/* Unrecoverable error in transmit */
63
64
spin_lock_bh(&mux->lock);
65
66
if (psock->tx_stopped) {
67
spin_unlock_bh(&mux->lock);
68
return;
69
}
70
71
psock->tx_stopped = 1;
72
KCM_STATS_INCR(psock->stats.tx_aborts);
73
74
if (!psock->tx_kcm) {
75
/* Take off psocks_avail list */
76
list_del(&psock->psock_avail_list);
77
} else if (wakeup_kcm) {
78
/* In this case psock is being aborted while outside of
79
* write_msgs and psock is reserved. Schedule tx_work
80
* to handle the failure there. Need to commit tx_stopped
81
* before queuing work.
82
*/
83
smp_mb();
84
85
queue_work(kcm_wq, &psock->tx_kcm->tx_work);
86
}
87
88
spin_unlock_bh(&mux->lock);
89
90
/* Report error on lower socket */
91
report_csk_error(csk, err);
92
}
93
94
/* RX mux lock held. */
95
static void kcm_update_rx_mux_stats(struct kcm_mux *mux,
96
struct kcm_psock *psock)
97
{
98
STRP_STATS_ADD(mux->stats.rx_bytes,
99
psock->strp.stats.bytes -
100
psock->saved_rx_bytes);
101
mux->stats.rx_msgs +=
102
psock->strp.stats.msgs - psock->saved_rx_msgs;
103
psock->saved_rx_msgs = psock->strp.stats.msgs;
104
psock->saved_rx_bytes = psock->strp.stats.bytes;
105
}
106
107
static void kcm_update_tx_mux_stats(struct kcm_mux *mux,
108
struct kcm_psock *psock)
109
{
110
KCM_STATS_ADD(mux->stats.tx_bytes,
111
psock->stats.tx_bytes - psock->saved_tx_bytes);
112
mux->stats.tx_msgs +=
113
psock->stats.tx_msgs - psock->saved_tx_msgs;
114
psock->saved_tx_msgs = psock->stats.tx_msgs;
115
psock->saved_tx_bytes = psock->stats.tx_bytes;
116
}
117
118
static int kcm_queue_rcv_skb(struct sock *sk, struct sk_buff *skb);
119
120
/* KCM is ready to receive messages on its queue-- either the KCM is new or
121
* has become unblocked after being blocked on full socket buffer. Queue any
122
* pending ready messages on a psock. RX mux lock held.
123
*/
124
static void kcm_rcv_ready(struct kcm_sock *kcm)
125
{
126
struct kcm_mux *mux = kcm->mux;
127
struct kcm_psock *psock;
128
struct sk_buff *skb;
129
130
if (unlikely(kcm->rx_wait || kcm->rx_psock || kcm->rx_disabled))
131
return;
132
133
while (unlikely((skb = __skb_dequeue(&mux->rx_hold_queue)))) {
134
if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
135
/* Assuming buffer limit has been reached */
136
skb_queue_head(&mux->rx_hold_queue, skb);
137
WARN_ON(!sk_rmem_alloc_get(&kcm->sk));
138
return;
139
}
140
}
141
142
while (!list_empty(&mux->psocks_ready)) {
143
psock = list_first_entry(&mux->psocks_ready, struct kcm_psock,
144
psock_ready_list);
145
146
if (kcm_queue_rcv_skb(&kcm->sk, psock->ready_rx_msg)) {
147
/* Assuming buffer limit has been reached */
148
WARN_ON(!sk_rmem_alloc_get(&kcm->sk));
149
return;
150
}
151
152
/* Consumed the ready message on the psock. Schedule rx_work to
153
* get more messages.
154
*/
155
list_del(&psock->psock_ready_list);
156
psock->ready_rx_msg = NULL;
157
/* Commit clearing of ready_rx_msg for queuing work */
158
smp_mb();
159
160
strp_unpause(&psock->strp);
161
strp_check_rcv(&psock->strp);
162
}
163
164
/* Buffer limit is okay now, add to ready list */
165
list_add_tail(&kcm->wait_rx_list,
166
&kcm->mux->kcm_rx_waiters);
167
/* paired with lockless reads in kcm_rfree() */
168
WRITE_ONCE(kcm->rx_wait, true);
169
}
170
171
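/* skb destructor for messages queued on a KCM socket: uncharge the receive
 * memory and, once the socket drains below sk_rcvlowat while neither waiting
 * nor reserved, put it back on the mux's list of ready receivers.
 */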
static void kcm_rfree(struct sk_buff *skb)
172
{
173
struct sock *sk = skb->sk;
174
struct kcm_sock *kcm = kcm_sk(sk);
175
struct kcm_mux *mux = kcm->mux;
176
unsigned int len = skb->truesize;
177
178
sk_mem_uncharge(sk, len);
179
atomic_sub(len, &sk->sk_rmem_alloc);
180
181
/* For reading rx_wait and rx_psock without holding lock */
182
smp_mb__after_atomic();
183
184
if (!READ_ONCE(kcm->rx_wait) && !READ_ONCE(kcm->rx_psock) &&
185
sk_rmem_alloc_get(sk) < sk->sk_rcvlowat) {
186
spin_lock_bh(&mux->rx_lock);
187
kcm_rcv_ready(kcm);
188
spin_unlock_bh(&mux->rx_lock);
189
}
190
}
191
192
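/* Charge an skb against a KCM socket's receive buffer and queue it.  Returns
 * -ENOMEM if sk_rcvbuf is already exceeded and -ENOBUFS if the memory
 * accounting charge fails; the caller keeps ownership of the skb on failure.
 */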
static int kcm_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
193
{
194
struct sk_buff_head *list = &sk->sk_receive_queue;
195
196
if (atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf)
197
return -ENOMEM;
198
199
if (!sk_rmem_schedule(sk, skb, skb->truesize))
200
return -ENOBUFS;
201
202
skb->dev = NULL;
203
204
skb_orphan(skb);
205
skb->sk = sk;
206
skb->destructor = kcm_rfree;
207
atomic_add(skb->truesize, &sk->sk_rmem_alloc);
208
sk_mem_charge(sk, skb->truesize);
209
210
skb_queue_tail(list, skb);
211
212
if (!sock_flag(sk, SOCK_DEAD))
213
sk->sk_data_ready(sk);
214
215
return 0;
216
}
217
218
/* Requeue received messages for a kcm socket to other kcm sockets. This is
219
 * called when a kcm socket is receive disabled.
220
* RX mux lock held.
221
*/
222
static void requeue_rx_msgs(struct kcm_mux *mux, struct sk_buff_head *head)
223
{
224
struct sk_buff *skb;
225
struct kcm_sock *kcm;
226
227
while ((skb = skb_dequeue(head))) {
228
/* Reset destructor to avoid calling kcm_rcv_ready */
229
skb->destructor = sock_rfree;
230
skb_orphan(skb);
231
try_again:
232
if (list_empty(&mux->kcm_rx_waiters)) {
233
skb_queue_tail(&mux->rx_hold_queue, skb);
234
continue;
235
}
236
237
kcm = list_first_entry(&mux->kcm_rx_waiters,
238
struct kcm_sock, wait_rx_list);
239
240
if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
241
/* Should mean socket buffer full */
242
list_del(&kcm->wait_rx_list);
243
/* paired with lockless reads in kcm_rfree() */
244
WRITE_ONCE(kcm->rx_wait, false);
245
246
/* Commit rx_wait before it is read in kcm_rfree() */
247
smp_wmb();
248
249
goto try_again;
250
}
251
}
252
}
253
254
/* Lower sock lock held */
255
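/* Pick the KCM socket that should receive the next parsed message.  If no
 * KCM socket is waiting, the message is parked on the psock, the strparser is
 * paused and NULL is returned; delivery resumes later via kcm_rcv_ready().
 */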
static struct kcm_sock *reserve_rx_kcm(struct kcm_psock *psock,
256
struct sk_buff *head)
257
{
258
struct kcm_mux *mux = psock->mux;
259
struct kcm_sock *kcm;
260
261
WARN_ON(psock->ready_rx_msg);
262
263
if (psock->rx_kcm)
264
return psock->rx_kcm;
265
266
spin_lock_bh(&mux->rx_lock);
267
268
if (psock->rx_kcm) {
269
spin_unlock_bh(&mux->rx_lock);
270
return psock->rx_kcm;
271
}
272
273
kcm_update_rx_mux_stats(mux, psock);
274
275
if (list_empty(&mux->kcm_rx_waiters)) {
276
psock->ready_rx_msg = head;
277
strp_pause(&psock->strp);
278
list_add_tail(&psock->psock_ready_list,
279
&mux->psocks_ready);
280
spin_unlock_bh(&mux->rx_lock);
281
return NULL;
282
}
283
284
kcm = list_first_entry(&mux->kcm_rx_waiters,
285
struct kcm_sock, wait_rx_list);
286
list_del(&kcm->wait_rx_list);
287
/* paired with lockless reads in kcm_rfree() */
288
WRITE_ONCE(kcm->rx_wait, false);
289
290
psock->rx_kcm = kcm;
291
/* paired with lockless reads in kcm_rfree() */
292
WRITE_ONCE(kcm->rx_psock, psock);
293
294
spin_unlock_bh(&mux->rx_lock);
295
296
return kcm;
297
}
298
299
static void kcm_done(struct kcm_sock *kcm);
300
301
static void kcm_done_work(struct work_struct *w)
302
{
303
kcm_done(container_of(w, struct kcm_sock, done_work));
304
}
305
306
/* Lower sock held */
307
static void unreserve_rx_kcm(struct kcm_psock *psock,
308
bool rcv_ready)
309
{
310
struct kcm_sock *kcm = psock->rx_kcm;
311
struct kcm_mux *mux = psock->mux;
312
313
if (!kcm)
314
return;
315
316
spin_lock_bh(&mux->rx_lock);
317
318
psock->rx_kcm = NULL;
319
/* paired with lockless reads in kcm_rfree() */
320
WRITE_ONCE(kcm->rx_psock, NULL);
321
322
/* Commit kcm->rx_psock before sk_rmem_alloc_get to sync with
323
* kcm_rfree
324
*/
325
smp_mb();
326
327
if (unlikely(kcm->done)) {
328
spin_unlock_bh(&mux->rx_lock);
329
330
/* Need to run kcm_done in a task since we need to acquire
331
* callback locks which may already be held here.
332
*/
333
INIT_WORK(&kcm->done_work, kcm_done_work);
334
schedule_work(&kcm->done_work);
335
return;
336
}
337
338
if (unlikely(kcm->rx_disabled)) {
339
requeue_rx_msgs(mux, &kcm->sk.sk_receive_queue);
340
} else if (rcv_ready || unlikely(!sk_rmem_alloc_get(&kcm->sk))) {
341
/* Check for degenerative race with rx_wait that all
342
* data was dequeued (accounted for in kcm_rfree).
343
*/
344
kcm_rcv_ready(kcm);
345
}
346
spin_unlock_bh(&mux->rx_lock);
347
}
348
349
/* Lower sock lock held */
350
static void psock_data_ready(struct sock *sk)
351
{
352
struct kcm_psock *psock;
353
354
trace_sk_data_ready(sk);
355
356
read_lock_bh(&sk->sk_callback_lock);
357
358
psock = (struct kcm_psock *)sk->sk_user_data;
359
if (likely(psock))
360
strp_data_ready(&psock->strp);
361
362
read_unlock_bh(&sk->sk_callback_lock);
363
}
364
365
/* Called with lower sock held */
366
static void kcm_rcv_strparser(struct strparser *strp, struct sk_buff *skb)
367
{
368
struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
369
struct kcm_sock *kcm;
370
371
try_queue:
372
kcm = reserve_rx_kcm(psock, skb);
373
if (!kcm) {
374
/* Unable to reserve a KCM, message is held in psock and strp
375
* is paused.
376
*/
377
return;
378
}
379
380
if (kcm_queue_rcv_skb(&kcm->sk, skb)) {
381
/* Should mean socket buffer full */
382
unreserve_rx_kcm(psock, false);
383
goto try_queue;
384
}
385
}
386
387
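/* Run the attached socket-filter BPF program over the buffered stream.  Its
 * return value is taken by the strparser as the length of the next message;
 * typically such a program reads a length field from the message header and
 * returns the total message size.
 */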
static int kcm_parse_func_strparser(struct strparser *strp, struct sk_buff *skb)
388
{
389
struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
390
struct bpf_prog *prog = psock->bpf_prog;
391
int res;
392
393
res = bpf_prog_run_pin_on_cpu(prog, skb);
394
return res;
395
}
396
397
static int kcm_read_sock_done(struct strparser *strp, int err)
398
{
399
struct kcm_psock *psock = container_of(strp, struct kcm_psock, strp);
400
401
unreserve_rx_kcm(psock, true);
402
403
return err;
404
}
405
406
static void psock_state_change(struct sock *sk)
407
{
408
/* TCP only does an EPOLLIN for a half close. Do an EPOLLHUP here
409
* since application will normally not poll with EPOLLIN
410
* on the TCP sockets.
411
*/
412
413
report_csk_error(sk, EPIPE);
414
}
415
416
static void psock_write_space(struct sock *sk)
417
{
418
struct kcm_psock *psock;
419
struct kcm_mux *mux;
420
struct kcm_sock *kcm;
421
422
read_lock_bh(&sk->sk_callback_lock);
423
424
psock = (struct kcm_psock *)sk->sk_user_data;
425
if (unlikely(!psock))
426
goto out;
427
mux = psock->mux;
428
429
spin_lock_bh(&mux->lock);
430
431
/* Check if the socket is reserved; if so, the reserving KCM socket is waiting to send on it. */
432
kcm = psock->tx_kcm;
433
if (kcm)
434
queue_work(kcm_wq, &kcm->tx_work);
435
436
spin_unlock_bh(&mux->lock);
437
out:
438
read_unlock_bh(&sk->sk_callback_lock);
439
}
440
441
static void unreserve_psock(struct kcm_sock *kcm);
442
443
/* kcm sock is locked. */
444
static struct kcm_psock *reserve_psock(struct kcm_sock *kcm)
445
{
446
struct kcm_mux *mux = kcm->mux;
447
struct kcm_psock *psock;
448
449
psock = kcm->tx_psock;
450
451
smp_rmb(); /* Must read tx_psock before tx_wait */
452
453
if (psock) {
454
WARN_ON(kcm->tx_wait);
455
if (unlikely(psock->tx_stopped))
456
unreserve_psock(kcm);
457
else
458
return kcm->tx_psock;
459
}
460
461
spin_lock_bh(&mux->lock);
462
463
/* Check again under lock to see if a psock was reserved for this
464
 * kcm in the meantime (e.g. by psock_now_avail()).
465
*/
466
psock = kcm->tx_psock;
467
if (unlikely(psock)) {
468
WARN_ON(kcm->tx_wait);
469
spin_unlock_bh(&mux->lock);
470
return kcm->tx_psock;
471
}
472
473
if (!list_empty(&mux->psocks_avail)) {
474
psock = list_first_entry(&mux->psocks_avail,
475
struct kcm_psock,
476
psock_avail_list);
477
list_del(&psock->psock_avail_list);
478
if (kcm->tx_wait) {
479
list_del(&kcm->wait_psock_list);
480
kcm->tx_wait = false;
481
}
482
kcm->tx_psock = psock;
483
psock->tx_kcm = kcm;
484
KCM_STATS_INCR(psock->stats.reserved);
485
} else if (!kcm->tx_wait) {
486
list_add_tail(&kcm->wait_psock_list,
487
&mux->kcm_tx_waiters);
488
kcm->tx_wait = true;
489
}
490
491
spin_unlock_bh(&mux->lock);
492
493
return psock;
494
}
495
496
/* mux lock held */
497
static void psock_now_avail(struct kcm_psock *psock)
498
{
499
struct kcm_mux *mux = psock->mux;
500
struct kcm_sock *kcm;
501
502
if (list_empty(&mux->kcm_tx_waiters)) {
503
list_add_tail(&psock->psock_avail_list,
504
&mux->psocks_avail);
505
} else {
506
kcm = list_first_entry(&mux->kcm_tx_waiters,
507
struct kcm_sock,
508
wait_psock_list);
509
list_del(&kcm->wait_psock_list);
510
kcm->tx_wait = false;
511
psock->tx_kcm = kcm;
512
513
/* Commit before changing tx_psock since that is read in
514
* reserve_psock before queuing work.
515
*/
516
smp_mb();
517
518
kcm->tx_psock = psock;
519
KCM_STATS_INCR(psock->stats.reserved);
520
queue_work(kcm_wq, &kcm->tx_work);
521
}
522
}
523
524
/* kcm sock is locked. */
525
static void unreserve_psock(struct kcm_sock *kcm)
526
{
527
struct kcm_psock *psock;
528
struct kcm_mux *mux = kcm->mux;
529
530
spin_lock_bh(&mux->lock);
531
532
psock = kcm->tx_psock;
533
534
if (WARN_ON(!psock)) {
535
spin_unlock_bh(&mux->lock);
536
return;
537
}
538
539
smp_rmb(); /* Read tx_psock before tx_wait */
540
541
kcm_update_tx_mux_stats(mux, psock);
542
543
WARN_ON(kcm->tx_wait);
544
545
kcm->tx_psock = NULL;
546
psock->tx_kcm = NULL;
547
KCM_STATS_INCR(psock->stats.unreserved);
548
549
if (unlikely(psock->tx_stopped)) {
550
if (psock->done) {
551
/* Deferred free */
552
list_del(&psock->psock_list);
553
mux->psocks_cnt--;
554
sock_put(psock->sk);
555
fput(psock->sk->sk_socket->file);
556
kmem_cache_free(kcm_psockp, psock);
557
}
558
559
/* Don't put back on available list */
560
561
spin_unlock_bh(&mux->lock);
562
563
return;
564
}
565
566
psock_now_avail(psock);
567
568
spin_unlock_bh(&mux->lock);
569
}
570
571
static void kcm_report_tx_retry(struct kcm_sock *kcm)
572
{
573
struct kcm_mux *mux = kcm->mux;
574
575
spin_lock_bh(&mux->lock);
576
KCM_STATS_INCR(mux->stats.tx_retries);
577
spin_unlock_bh(&mux->lock);
578
}
579
580
/* Write any messages ready on the kcm socket. Called with kcm sock lock
581
* held. Return bytes actually sent or error.
582
*/
583
static int kcm_write_msgs(struct kcm_sock *kcm)
584
{
585
unsigned int total_sent = 0;
586
struct sock *sk = &kcm->sk;
587
struct kcm_psock *psock;
588
struct sk_buff *head;
589
int ret = 0;
590
591
kcm->tx_wait_more = false;
592
psock = kcm->tx_psock;
593
if (unlikely(psock && psock->tx_stopped)) {
594
/* A reserved psock was aborted asynchronously. Unreserve
595
* it and we'll retry the message.
596
*/
597
unreserve_psock(kcm);
598
kcm_report_tx_retry(kcm);
599
if (skb_queue_empty(&sk->sk_write_queue))
600
return 0;
601
602
kcm_tx_msg(skb_peek(&sk->sk_write_queue))->started_tx = false;
603
}
604
605
retry:
606
while ((head = skb_peek(&sk->sk_write_queue))) {
607
struct msghdr msg = {
608
.msg_flags = MSG_DONTWAIT | MSG_SPLICE_PAGES,
609
};
610
struct kcm_tx_msg *txm = kcm_tx_msg(head);
611
struct sk_buff *skb;
612
unsigned int msize;
613
int i;
614
615
if (!txm->started_tx) {
616
psock = reserve_psock(kcm);
617
if (!psock)
618
goto out;
619
skb = head;
620
txm->frag_offset = 0;
621
txm->sent = 0;
622
txm->started_tx = true;
623
} else {
624
if (WARN_ON(!psock)) {
625
ret = -EINVAL;
626
goto out;
627
}
628
skb = txm->frag_skb;
629
}
630
631
if (WARN_ON(!skb_shinfo(skb)->nr_frags) ||
632
WARN_ON_ONCE(!skb_frag_page(&skb_shinfo(skb)->frags[0]))) {
633
ret = -EINVAL;
634
goto out;
635
}
636
637
msize = 0;
638
for (i = 0; i < skb_shinfo(skb)->nr_frags; i++)
639
msize += skb_frag_size(&skb_shinfo(skb)->frags[i]);
640
641
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE,
642
(const struct bio_vec *)skb_shinfo(skb)->frags,
643
skb_shinfo(skb)->nr_frags, msize);
644
iov_iter_advance(&msg.msg_iter, txm->frag_offset);
645
646
do {
647
ret = sock_sendmsg(psock->sk->sk_socket, &msg);
648
if (ret <= 0) {
649
if (ret == -EAGAIN) {
650
/* Save state to try again when there's
651
* write space on the socket
652
*/
653
txm->frag_skb = skb;
654
ret = 0;
655
goto out;
656
}
657
658
/* Hard failure in sending message, abort this
659
* psock since it has lost framing
660
* synchronization and retry sending the
661
* message from the beginning.
662
*/
663
kcm_abort_tx_psock(psock, ret ? -ret : EPIPE,
664
true);
665
unreserve_psock(kcm);
666
psock = NULL;
667
668
txm->started_tx = false;
669
kcm_report_tx_retry(kcm);
670
ret = 0;
671
goto retry;
672
}
673
674
txm->sent += ret;
675
txm->frag_offset += ret;
676
KCM_STATS_ADD(psock->stats.tx_bytes, ret);
677
} while (msg.msg_iter.count > 0);
678
679
if (skb == head) {
680
if (skb_has_frag_list(skb)) {
681
txm->frag_skb = skb_shinfo(skb)->frag_list;
682
txm->frag_offset = 0;
683
continue;
684
}
685
} else if (skb->next) {
686
txm->frag_skb = skb->next;
687
txm->frag_offset = 0;
688
continue;
689
}
690
691
/* Successfully sent the whole packet, account for it. */
692
sk->sk_wmem_queued -= txm->sent;
693
total_sent += txm->sent;
694
skb_dequeue(&sk->sk_write_queue);
695
kfree_skb(head);
696
KCM_STATS_INCR(psock->stats.tx_msgs);
697
}
698
out:
699
if (!head) {
700
/* Done with all queued messages. */
701
WARN_ON(!skb_queue_empty(&sk->sk_write_queue));
702
if (psock)
703
unreserve_psock(kcm);
704
}
705
706
/* Check if write space is available */
707
sk->sk_write_space(sk);
708
709
return total_sent ? : ret;
710
}
711
712
static void kcm_tx_work(struct work_struct *w)
713
{
714
struct kcm_sock *kcm = container_of(w, struct kcm_sock, tx_work);
715
struct sock *sk = &kcm->sk;
716
int err;
717
718
lock_sock(sk);
719
720
/* Primarily for SOCK_DGRAM sockets, also handle asynchronous tx
721
* aborts
722
*/
723
err = kcm_write_msgs(kcm);
724
if (err < 0) {
725
/* Hard failure in write, report error on KCM socket */
726
pr_warn("KCM: Hard failure on kcm_write_msgs %d\n", err);
727
report_csk_error(&kcm->sk, -err);
728
goto out;
729
}
730
731
/* Primarily for SOCK_SEQPACKET sockets */
732
if (likely(sk->sk_socket) &&
733
test_bit(SOCK_NOSPACE, &sk->sk_socket->flags)) {
734
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
735
sk->sk_write_space(sk);
736
}
737
738
out:
739
release_sock(sk);
740
}
741
742
static void kcm_push(struct kcm_sock *kcm)
743
{
744
if (kcm->tx_wait_more)
745
kcm_write_msgs(kcm);
746
}
747
748
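/* Copy message data from userspace onto the KCM socket's write queue.  On
 * SOCK_DGRAM sockets MSG_MORE keeps the current message open across calls;
 * on SOCK_SEQPACKET sockets MSG_EOR marks the end of a message.  MSG_BATCH
 * defers transmission so several messages can be flushed together.
 */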
static int kcm_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
749
{
750
struct sock *sk = sock->sk;
751
struct kcm_sock *kcm = kcm_sk(sk);
752
struct sk_buff *skb = NULL, *head = NULL;
753
size_t copy, copied = 0;
754
long timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
755
int eor = (sock->type == SOCK_DGRAM) ?
756
!(msg->msg_flags & MSG_MORE) : !!(msg->msg_flags & MSG_EOR);
757
int err = -EPIPE;
758
759
mutex_lock(&kcm->tx_mutex);
760
lock_sock(sk);
761
762
/* Per tcp_sendmsg this should be in poll */
763
sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
764
765
if (sk->sk_err)
766
goto out_error;
767
768
if (kcm->seq_skb) {
769
/* Previously opened message */
770
head = kcm->seq_skb;
771
skb = kcm_tx_msg(head)->last_skb;
772
goto start;
773
}
774
775
/* Call the sk_stream functions to manage the sndbuf mem. */
776
if (!sk_stream_memory_free(sk)) {
777
kcm_push(kcm);
778
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
779
err = sk_stream_wait_memory(sk, &timeo);
780
if (err)
781
goto out_error;
782
}
783
784
if (msg_data_left(msg)) {
785
/* New message, alloc head skb */
786
head = alloc_skb(0, sk->sk_allocation);
787
while (!head) {
788
kcm_push(kcm);
789
err = sk_stream_wait_memory(sk, &timeo);
790
if (err)
791
goto out_error;
792
793
head = alloc_skb(0, sk->sk_allocation);
794
}
795
796
skb = head;
797
798
/* Set ip_summed to CHECKSUM_UNNECESSARY to avoid calling
799
* csum_and_copy_from_iter from skb_do_copy_data_nocache.
800
*/
801
skb->ip_summed = CHECKSUM_UNNECESSARY;
802
}
803
804
start:
805
while (msg_data_left(msg)) {
806
bool merge = true;
807
int i = skb_shinfo(skb)->nr_frags;
808
struct page_frag *pfrag = sk_page_frag(sk);
809
810
if (!sk_page_frag_refill(sk, pfrag))
811
goto wait_for_memory;
812
813
if (!skb_can_coalesce(skb, i, pfrag->page,
814
pfrag->offset)) {
815
if (i == MAX_SKB_FRAGS) {
816
struct sk_buff *tskb;
817
818
tskb = alloc_skb(0, sk->sk_allocation);
819
if (!tskb)
820
goto wait_for_memory;
821
822
if (head == skb)
823
skb_shinfo(head)->frag_list = tskb;
824
else
825
skb->next = tskb;
826
827
skb = tskb;
828
skb->ip_summed = CHECKSUM_UNNECESSARY;
829
continue;
830
}
831
merge = false;
832
}
833
834
if (msg->msg_flags & MSG_SPLICE_PAGES) {
835
copy = msg_data_left(msg);
836
if (!sk_wmem_schedule(sk, copy))
837
goto wait_for_memory;
838
839
err = skb_splice_from_iter(skb, &msg->msg_iter, copy);
840
if (err < 0) {
841
if (err == -EMSGSIZE)
842
goto wait_for_memory;
843
goto out_error;
844
}
845
846
copy = err;
847
skb_shinfo(skb)->flags |= SKBFL_SHARED_FRAG;
848
sk_wmem_queued_add(sk, copy);
849
sk_mem_charge(sk, copy);
850
851
if (head != skb)
852
head->truesize += copy;
853
} else {
854
copy = min_t(int, msg_data_left(msg),
855
pfrag->size - pfrag->offset);
856
if (!sk_wmem_schedule(sk, copy))
857
goto wait_for_memory;
858
859
err = skb_copy_to_page_nocache(sk, &msg->msg_iter, skb,
860
pfrag->page,
861
pfrag->offset,
862
copy);
863
if (err)
864
goto out_error;
865
866
/* Update the skb. */
867
if (merge) {
868
skb_frag_size_add(
869
&skb_shinfo(skb)->frags[i - 1], copy);
870
} else {
871
skb_fill_page_desc(skb, i, pfrag->page,
872
pfrag->offset, copy);
873
get_page(pfrag->page);
874
}
875
876
pfrag->offset += copy;
877
}
878
879
copied += copy;
880
if (head != skb) {
881
head->len += copy;
882
head->data_len += copy;
883
}
884
885
continue;
886
887
wait_for_memory:
888
kcm_push(kcm);
889
err = sk_stream_wait_memory(sk, &timeo);
890
if (err)
891
goto out_error;
892
}
893
894
if (eor) {
895
bool not_busy = skb_queue_empty(&sk->sk_write_queue);
896
897
if (head) {
898
/* Message complete, queue it on send buffer */
899
__skb_queue_tail(&sk->sk_write_queue, head);
900
kcm->seq_skb = NULL;
901
KCM_STATS_INCR(kcm->stats.tx_msgs);
902
}
903
904
if (msg->msg_flags & MSG_BATCH) {
905
kcm->tx_wait_more = true;
906
} else if (kcm->tx_wait_more || not_busy) {
907
err = kcm_write_msgs(kcm);
908
if (err < 0) {
909
/* We got a hard error in write_msgs but have
910
* already queued this message. Report an error
911
* in the socket, but don't affect return value
912
* from sendmsg
913
*/
914
pr_warn("KCM: Hard failure on kcm_write_msgs\n");
915
report_csk_error(&kcm->sk, -err);
916
}
917
}
918
} else {
919
/* Message not complete, save state */
920
partial_message:
921
if (head) {
922
kcm->seq_skb = head;
923
kcm_tx_msg(head)->last_skb = skb;
924
}
925
}
926
927
KCM_STATS_ADD(kcm->stats.tx_bytes, copied);
928
929
release_sock(sk);
930
mutex_unlock(&kcm->tx_mutex);
931
return copied;
932
933
out_error:
934
kcm_push(kcm);
935
936
if (sock->type == SOCK_SEQPACKET) {
937
/* Wrote some bytes before encountering an
938
* error, return partial success.
939
*/
940
if (copied)
941
goto partial_message;
942
if (head != kcm->seq_skb)
943
kfree_skb(head);
944
} else {
945
kfree_skb(head);
946
kcm->seq_skb = NULL;
947
}
948
949
err = sk_stream_error(sk, msg->msg_flags, err);
950
951
/* make sure we wake any epoll edge trigger waiter */
952
if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 && err == -EAGAIN))
953
sk->sk_write_space(sk);
954
955
release_sock(sk);
956
mutex_unlock(&kcm->tx_mutex);
957
return err;
958
}
959
960
static void kcm_splice_eof(struct socket *sock)
961
{
962
struct sock *sk = sock->sk;
963
struct kcm_sock *kcm = kcm_sk(sk);
964
965
if (skb_queue_empty_lockless(&sk->sk_write_queue))
966
return;
967
968
lock_sock(sk);
969
kcm_write_msgs(kcm);
970
release_sock(sk);
971
}
972
973
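/* Receive data from a parsed message.  MSG_EOR is reported when the message
 * is finished; a short read on a SOCK_DGRAM socket truncates the message and
 * additionally sets MSG_TRUNC.
 */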
static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
974
size_t len, int flags)
975
{
976
struct sock *sk = sock->sk;
977
struct kcm_sock *kcm = kcm_sk(sk);
978
int err = 0;
979
struct strp_msg *stm;
980
int copied = 0;
981
struct sk_buff *skb;
982
983
skb = skb_recv_datagram(sk, flags, &err);
984
if (!skb)
985
goto out;
986
987
/* Okay, have a message on the receive queue */
988
989
stm = strp_msg(skb);
990
991
if (len > stm->full_len)
992
len = stm->full_len;
993
994
err = skb_copy_datagram_msg(skb, stm->offset, msg, len);
995
if (err < 0)
996
goto out;
997
998
copied = len;
999
if (likely(!(flags & MSG_PEEK))) {
1000
KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
1001
if (copied < stm->full_len) {
1002
if (sock->type == SOCK_DGRAM) {
1003
/* Truncated message */
1004
msg->msg_flags |= MSG_TRUNC;
1005
goto msg_finished;
1006
}
1007
stm->offset += copied;
1008
stm->full_len -= copied;
1009
} else {
1010
msg_finished:
1011
/* Finished with message */
1012
msg->msg_flags |= MSG_EOR;
1013
KCM_STATS_INCR(kcm->stats.rx_msgs);
1014
}
1015
}
1016
1017
out:
1018
skb_free_datagram(sk, skb);
1019
return copied ? : err;
1020
}
1021
1022
static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
1023
struct pipe_inode_info *pipe, size_t len,
1024
unsigned int flags)
1025
{
1026
struct sock *sk = sock->sk;
1027
struct kcm_sock *kcm = kcm_sk(sk);
1028
struct strp_msg *stm;
1029
int err = 0;
1030
ssize_t copied;
1031
struct sk_buff *skb;
1032
1033
if (sock->file->f_flags & O_NONBLOCK || flags & SPLICE_F_NONBLOCK)
1034
flags = MSG_DONTWAIT;
1035
else
1036
flags = 0;
1037
1038
/* Only support splice for SOCK_SEQPACKET */
1039
1040
skb = skb_recv_datagram(sk, flags, &err);
1041
if (!skb)
1042
goto err_out;
1043
1044
/* Okay, have a message on the receive queue */
1045
1046
stm = strp_msg(skb);
1047
1048
if (len > stm->full_len)
1049
len = stm->full_len;
1050
1051
copied = skb_splice_bits(skb, sk, stm->offset, pipe, len, flags);
1052
if (copied < 0) {
1053
err = copied;
1054
goto err_out;
1055
}
1056
1057
KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
1058
1059
stm->offset += copied;
1060
stm->full_len -= copied;
1061
1062
/* We have no way to return MSG_EOR. If all the bytes have been
1063
* read we still leave the message in the receive socket buffer.
1064
* A subsequent recvmsg needs to be done to return MSG_EOR and
1065
* finish reading the message.
1066
*/
1067
1068
skb_free_datagram(sk, skb);
1069
return copied;
1070
1071
err_out:
1072
skb_free_datagram(sk, skb);
1073
return err;
1074
}
1075
1076
/* kcm sock lock held */
1077
static void kcm_recv_disable(struct kcm_sock *kcm)
1078
{
1079
struct kcm_mux *mux = kcm->mux;
1080
1081
if (kcm->rx_disabled)
1082
return;
1083
1084
spin_lock_bh(&mux->rx_lock);
1085
1086
kcm->rx_disabled = 1;
1087
1088
/* If a psock is reserved we'll do cleanup in unreserve */
1089
if (!kcm->rx_psock) {
1090
if (kcm->rx_wait) {
1091
list_del(&kcm->wait_rx_list);
1092
/* paired with lockless reads in kcm_rfree() */
1093
WRITE_ONCE(kcm->rx_wait, false);
1094
}
1095
1096
requeue_rx_msgs(mux, &kcm->sk.sk_receive_queue);
1097
}
1098
1099
spin_unlock_bh(&mux->rx_lock);
1100
}
1101
1102
/* kcm sock lock held */
1103
static void kcm_recv_enable(struct kcm_sock *kcm)
1104
{
1105
struct kcm_mux *mux = kcm->mux;
1106
1107
if (!kcm->rx_disabled)
1108
return;
1109
1110
spin_lock_bh(&mux->rx_lock);
1111
1112
kcm->rx_disabled = 0;
1113
kcm_rcv_ready(kcm);
1114
1115
spin_unlock_bh(&mux->rx_lock);
1116
}
1117
1118
static int kcm_setsockopt(struct socket *sock, int level, int optname,
1119
sockptr_t optval, unsigned int optlen)
1120
{
1121
struct kcm_sock *kcm = kcm_sk(sock->sk);
1122
int val, valbool;
1123
int err = 0;
1124
1125
if (level != SOL_KCM)
1126
return -ENOPROTOOPT;
1127
1128
if (optlen < sizeof(int))
1129
return -EINVAL;
1130
1131
if (copy_from_sockptr(&val, optval, sizeof(int)))
1132
return -EFAULT;
1133
1134
valbool = val ? 1 : 0;
1135
1136
switch (optname) {
1137
case KCM_RECV_DISABLE:
1138
lock_sock(&kcm->sk);
1139
if (valbool)
1140
kcm_recv_disable(kcm);
1141
else
1142
kcm_recv_enable(kcm);
1143
release_sock(&kcm->sk);
1144
break;
1145
default:
1146
err = -ENOPROTOOPT;
1147
}
1148
1149
return err;
1150
}
1151
1152
static int kcm_getsockopt(struct socket *sock, int level, int optname,
1153
char __user *optval, int __user *optlen)
1154
{
1155
struct kcm_sock *kcm = kcm_sk(sock->sk);
1156
int val, len;
1157
1158
if (level != SOL_KCM)
1159
return -ENOPROTOOPT;
1160
1161
if (get_user(len, optlen))
1162
return -EFAULT;
1163
1164
if (len < 0)
1165
return -EINVAL;
1166
1167
len = min_t(unsigned int, len, sizeof(int));
1168
1169
switch (optname) {
1170
case KCM_RECV_DISABLE:
1171
val = kcm->rx_disabled;
1172
break;
1173
default:
1174
return -ENOPROTOOPT;
1175
}
1176
1177
if (put_user(len, optlen))
1178
return -EFAULT;
1179
if (copy_to_user(optval, &val, len))
1180
return -EFAULT;
1181
return 0;
1182
}
1183
1184
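/* Initialize a freshly created KCM socket: mark it established for poll,
 * insert it into the mux's kcm_socks list at the lowest free index, and
 * register it as ready to receive.
 */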
static void init_kcm_sock(struct kcm_sock *kcm, struct kcm_mux *mux)
1185
{
1186
struct kcm_sock *tkcm;
1187
struct list_head *head;
1188
int index = 0;
1189
1190
/* For SOCK_SEQPACKET sock type, datagram_poll checks the sk_state, so
1191
* we set sk_state, otherwise epoll_wait always returns right away with
1192
* EPOLLHUP
1193
*/
1194
kcm->sk.sk_state = TCP_ESTABLISHED;
1195
1196
/* Add to mux's kcm sockets list */
1197
kcm->mux = mux;
1198
spin_lock_bh(&mux->lock);
1199
1200
head = &mux->kcm_socks;
1201
list_for_each_entry(tkcm, &mux->kcm_socks, kcm_sock_list) {
1202
if (tkcm->index != index)
1203
break;
1204
head = &tkcm->kcm_sock_list;
1205
index++;
1206
}
1207
1208
list_add(&kcm->kcm_sock_list, head);
1209
kcm->index = index;
1210
1211
mux->kcm_socks_cnt++;
1212
spin_unlock_bh(&mux->lock);
1213
1214
INIT_WORK(&kcm->tx_work, kcm_tx_work);
1215
mutex_init(&kcm->tx_mutex);
1216
1217
spin_lock_bh(&mux->rx_lock);
1218
kcm_rcv_ready(kcm);
1219
spin_unlock_bh(&mux->rx_lock);
1220
}
1221
1222
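/* Attach a connected TCP socket to this mux: allocate a kcm_psock for it,
 * install the strparser callbacks in place of the TCP socket's own, and make
 * the psock available for transmit and receive on the mux.
 */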
static int kcm_attach(struct socket *sock, struct socket *csock,
1223
struct bpf_prog *prog)
1224
{
1225
struct kcm_sock *kcm = kcm_sk(sock->sk);
1226
struct kcm_mux *mux = kcm->mux;
1227
struct sock *csk;
1228
struct kcm_psock *psock = NULL, *tpsock;
1229
struct list_head *head;
1230
int index = 0;
1231
static const struct strp_callbacks cb = {
1232
.rcv_msg = kcm_rcv_strparser,
1233
.parse_msg = kcm_parse_func_strparser,
1234
.read_sock_done = kcm_read_sock_done,
1235
};
1236
int err = 0;
1237
1238
csk = csock->sk;
1239
if (!csk)
1240
return -EINVAL;
1241
1242
lock_sock(csk);
1243
1244
/* Only allow TCP sockets to be attached for now */
1245
if ((csk->sk_family != AF_INET && csk->sk_family != AF_INET6) ||
1246
csk->sk_protocol != IPPROTO_TCP) {
1247
err = -EOPNOTSUPP;
1248
goto out;
1249
}
1250
1251
/* Don't allow listeners or closed sockets */
1252
if (csk->sk_state == TCP_LISTEN || csk->sk_state == TCP_CLOSE) {
1253
err = -EOPNOTSUPP;
1254
goto out;
1255
}
1256
1257
psock = kmem_cache_zalloc(kcm_psockp, GFP_KERNEL);
1258
if (!psock) {
1259
err = -ENOMEM;
1260
goto out;
1261
}
1262
1263
psock->mux = mux;
1264
psock->sk = csk;
1265
psock->bpf_prog = prog;
1266
1267
write_lock_bh(&csk->sk_callback_lock);
1268
1269
/* Check if sk_user_data is already in use by KCM or someone else.
1270
* Must be done under lock to prevent race conditions.
1271
*/
1272
if (csk->sk_user_data) {
1273
write_unlock_bh(&csk->sk_callback_lock);
1274
kmem_cache_free(kcm_psockp, psock);
1275
err = -EALREADY;
1276
goto out;
1277
}
1278
1279
err = strp_init(&psock->strp, csk, &cb);
1280
if (err) {
1281
write_unlock_bh(&csk->sk_callback_lock);
1282
kmem_cache_free(kcm_psockp, psock);
1283
goto out;
1284
}
1285
1286
psock->save_data_ready = csk->sk_data_ready;
1287
psock->save_write_space = csk->sk_write_space;
1288
psock->save_state_change = csk->sk_state_change;
1289
csk->sk_user_data = psock;
1290
csk->sk_data_ready = psock_data_ready;
1291
csk->sk_write_space = psock_write_space;
1292
csk->sk_state_change = psock_state_change;
1293
1294
write_unlock_bh(&csk->sk_callback_lock);
1295
1296
sock_hold(csk);
1297
1298
/* Finished initialization, now add the psock to the MUX. */
1299
spin_lock_bh(&mux->lock);
1300
head = &mux->psocks;
1301
list_for_each_entry(tpsock, &mux->psocks, psock_list) {
1302
if (tpsock->index != index)
1303
break;
1304
head = &tpsock->psock_list;
1305
index++;
1306
}
1307
1308
list_add(&psock->psock_list, head);
1309
psock->index = index;
1310
1311
KCM_STATS_INCR(mux->stats.psock_attach);
1312
mux->psocks_cnt++;
1313
psock_now_avail(psock);
1314
spin_unlock_bh(&mux->lock);
1315
1316
/* Schedule RX work in case there are already bytes queued */
1317
strp_check_rcv(&psock->strp);
1318
1319
out:
1320
release_sock(csk);
1321
1322
return err;
1323
}
1324
1325
static int kcm_attach_ioctl(struct socket *sock, struct kcm_attach *info)
1326
{
1327
struct socket *csock;
1328
struct bpf_prog *prog;
1329
int err;
1330
1331
csock = sockfd_lookup(info->fd, &err);
1332
if (!csock)
1333
return -ENOENT;
1334
1335
prog = bpf_prog_get_type(info->bpf_fd, BPF_PROG_TYPE_SOCKET_FILTER);
1336
if (IS_ERR(prog)) {
1337
err = PTR_ERR(prog);
1338
goto out;
1339
}
1340
1341
err = kcm_attach(sock, csock, prog);
1342
if (err) {
1343
bpf_prog_put(prog);
1344
goto out;
1345
}
1346
1347
/* Keep reference on file also */
1348
1349
return 0;
1350
out:
1351
sockfd_put(csock);
1352
return err;
1353
}
1354
1355
static void kcm_unattach(struct kcm_psock *psock)
1356
{
1357
struct sock *csk = psock->sk;
1358
struct kcm_mux *mux = psock->mux;
1359
1360
lock_sock(csk);
1361
1362
/* Stop getting callbacks from TCP socket. After this there should
1363
* be no way to reserve a kcm for this psock.
1364
*/
1365
write_lock_bh(&csk->sk_callback_lock);
1366
csk->sk_user_data = NULL;
1367
csk->sk_data_ready = psock->save_data_ready;
1368
csk->sk_write_space = psock->save_write_space;
1369
csk->sk_state_change = psock->save_state_change;
1370
strp_stop(&psock->strp);
1371
1372
if (WARN_ON(psock->rx_kcm)) {
1373
write_unlock_bh(&csk->sk_callback_lock);
1374
release_sock(csk);
1375
return;
1376
}
1377
1378
spin_lock_bh(&mux->rx_lock);
1379
1380
/* Stop receiver activities. After this point psock should not be
1381
* able to get onto ready list either through callbacks or work.
1382
*/
1383
if (psock->ready_rx_msg) {
1384
list_del(&psock->psock_ready_list);
1385
kfree_skb(psock->ready_rx_msg);
1386
psock->ready_rx_msg = NULL;
1387
KCM_STATS_INCR(mux->stats.rx_ready_drops);
1388
}
1389
1390
spin_unlock_bh(&mux->rx_lock);
1391
1392
write_unlock_bh(&csk->sk_callback_lock);
1393
1394
/* Call strp_done without sock lock */
1395
release_sock(csk);
1396
strp_done(&psock->strp);
1397
lock_sock(csk);
1398
1399
bpf_prog_put(psock->bpf_prog);
1400
1401
spin_lock_bh(&mux->lock);
1402
1403
aggregate_psock_stats(&psock->stats, &mux->aggregate_psock_stats);
1404
save_strp_stats(&psock->strp, &mux->aggregate_strp_stats);
1405
1406
KCM_STATS_INCR(mux->stats.psock_unattach);
1407
1408
if (psock->tx_kcm) {
1409
/* psock was reserved. Just mark it finished and we will clean
1410
 * up in the kcm paths; we need the kcm lock, which cannot be
1411
* acquired here.
1412
*/
1413
KCM_STATS_INCR(mux->stats.psock_unattach_rsvd);
1414
spin_unlock_bh(&mux->lock);
1415
1416
/* We are unattaching a socket that is reserved. Abort the
1417
* socket since we may be out of sync in sending on it. We need
1418
* to do this without the mux lock.
1419
*/
1420
kcm_abort_tx_psock(psock, EPIPE, false);
1421
1422
spin_lock_bh(&mux->lock);
1423
if (!psock->tx_kcm) {
1424
/* psock was unreserved in the window where the mux lock was dropped */
1425
goto no_reserved;
1426
}
1427
psock->done = 1;
1428
1429
/* Commit done before queuing work to process it */
1430
smp_mb();
1431
1432
/* Queue tx work to make sure psock->done is handled */
1433
queue_work(kcm_wq, &psock->tx_kcm->tx_work);
1434
spin_unlock_bh(&mux->lock);
1435
} else {
1436
no_reserved:
1437
if (!psock->tx_stopped)
1438
list_del(&psock->psock_avail_list);
1439
list_del(&psock->psock_list);
1440
mux->psocks_cnt--;
1441
spin_unlock_bh(&mux->lock);
1442
1443
sock_put(csk);
1444
fput(csk->sk_socket->file);
1445
kmem_cache_free(kcm_psockp, psock);
1446
}
1447
1448
release_sock(csk);
1449
}
1450
1451
static int kcm_unattach_ioctl(struct socket *sock, struct kcm_unattach *info)
1452
{
1453
struct kcm_sock *kcm = kcm_sk(sock->sk);
1454
struct kcm_mux *mux = kcm->mux;
1455
struct kcm_psock *psock;
1456
struct socket *csock;
1457
struct sock *csk;
1458
int err;
1459
1460
csock = sockfd_lookup(info->fd, &err);
1461
if (!csock)
1462
return -ENOENT;
1463
1464
csk = csock->sk;
1465
if (!csk) {
1466
err = -EINVAL;
1467
goto out;
1468
}
1469
1470
err = -ENOENT;
1471
1472
spin_lock_bh(&mux->lock);
1473
1474
list_for_each_entry(psock, &mux->psocks, psock_list) {
1475
if (psock->sk != csk)
1476
continue;
1477
1478
/* Found the matching psock */
1479
1480
if (psock->unattaching || WARN_ON(psock->done)) {
1481
err = -EALREADY;
1482
break;
1483
}
1484
1485
psock->unattaching = 1;
1486
1487
spin_unlock_bh(&mux->lock);
1488
1489
/* Lower socket lock should already be held */
1490
kcm_unattach(psock);
1491
1492
err = 0;
1493
goto out;
1494
}
1495
1496
spin_unlock_bh(&mux->lock);
1497
1498
out:
1499
sockfd_put(csock);
1500
return err;
1501
}
1502
1503
static struct proto kcm_proto = {
1504
.name = "KCM",
1505
.owner = THIS_MODULE,
1506
.obj_size = sizeof(struct kcm_sock),
1507
};
1508
1509
/* Clone a kcm socket. */
1510
static struct file *kcm_clone(struct socket *osock)
1511
{
1512
struct socket *newsock;
1513
struct sock *newsk;
1514
1515
newsock = sock_alloc();
1516
if (!newsock)
1517
return ERR_PTR(-ENFILE);
1518
1519
newsock->type = osock->type;
1520
newsock->ops = osock->ops;
1521
1522
__module_get(newsock->ops->owner);
1523
1524
newsk = sk_alloc(sock_net(osock->sk), PF_KCM, GFP_KERNEL,
1525
&kcm_proto, false);
1526
if (!newsk) {
1527
sock_release(newsock);
1528
return ERR_PTR(-ENOMEM);
1529
}
1530
sock_init_data(newsock, newsk);
1531
init_kcm_sock(kcm_sk(newsk), kcm_sk(osock->sk)->mux);
1532
1533
return sock_alloc_file(newsock, 0, osock->sk->sk_prot_creator->name);
1534
}
1535
1536
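/* KCM-specific ioctls: SIOCKCMATTACH attaches a connected TCP socket plus a
 * message-parsing BPF program, SIOCKCMUNATTACH detaches one, and SIOCKCMCLONE
 * creates another KCM socket on the same mux and returns its new fd.
 */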
static int kcm_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
1537
{
1538
int err;
1539
1540
switch (cmd) {
1541
case SIOCKCMATTACH: {
1542
struct kcm_attach info;
1543
1544
if (copy_from_user(&info, (void __user *)arg, sizeof(info)))
1545
return -EFAULT;
1546
1547
err = kcm_attach_ioctl(sock, &info);
1548
1549
break;
1550
}
1551
case SIOCKCMUNATTACH: {
1552
struct kcm_unattach info;
1553
1554
if (copy_from_user(&info, (void __user *)arg, sizeof(info)))
1555
return -EFAULT;
1556
1557
err = kcm_unattach_ioctl(sock, &info);
1558
1559
break;
1560
}
1561
case SIOCKCMCLONE: {
1562
struct kcm_clone info;
1563
struct file *file;
1564
1565
info.fd = get_unused_fd_flags(0);
1566
if (unlikely(info.fd < 0))
1567
return info.fd;
1568
1569
file = kcm_clone(sock);
1570
if (IS_ERR(file)) {
1571
put_unused_fd(info.fd);
1572
return PTR_ERR(file);
1573
}
1574
if (copy_to_user((void __user *)arg, &info,
1575
sizeof(info))) {
1576
put_unused_fd(info.fd);
1577
fput(file);
1578
return -EFAULT;
1579
}
1580
fd_install(info.fd, file);
1581
err = 0;
1582
break;
1583
}
1584
default:
1585
err = -ENOIOCTLCMD;
1586
break;
1587
}
1588
1589
return err;
1590
}
1591
1592
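/* Tear down a mux after its last KCM socket is gone: unattach any remaining
 * psocks, fold the mux statistics into the per-namespace aggregates, and free
 * the mux structure via RCU.
 */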
static void release_mux(struct kcm_mux *mux)
1593
{
1594
struct kcm_net *knet = mux->knet;
1595
struct kcm_psock *psock, *tmp_psock;
1596
1597
/* Release psocks */
1598
list_for_each_entry_safe(psock, tmp_psock,
1599
&mux->psocks, psock_list) {
1600
if (!WARN_ON(psock->unattaching))
1601
kcm_unattach(psock);
1602
}
1603
1604
if (WARN_ON(mux->psocks_cnt))
1605
return;
1606
1607
__skb_queue_purge(&mux->rx_hold_queue);
1608
1609
mutex_lock(&knet->mutex);
1610
aggregate_mux_stats(&mux->stats, &knet->aggregate_mux_stats);
1611
aggregate_psock_stats(&mux->aggregate_psock_stats,
1612
&knet->aggregate_psock_stats);
1613
aggregate_strp_stats(&mux->aggregate_strp_stats,
1614
&knet->aggregate_strp_stats);
1615
list_del_rcu(&mux->kcm_mux_list);
1616
knet->count--;
1617
mutex_unlock(&knet->mutex);
1618
1619
kfree_rcu(mux, rcu);
1620
}
1621
1622
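/* Final teardown of a KCM socket.  If a psock currently has it reserved for
 * receive, completion is deferred to unreserve_rx_kcm(); otherwise pending
 * messages are requeued to other KCM sockets, the socket is removed from the
 * mux, and the mux itself is released when this was its last socket.
 */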
static void kcm_done(struct kcm_sock *kcm)
1623
{
1624
struct kcm_mux *mux = kcm->mux;
1625
struct sock *sk = &kcm->sk;
1626
int socks_cnt;
1627
1628
spin_lock_bh(&mux->rx_lock);
1629
if (kcm->rx_psock) {
1630
/* Cleanup in unreserve_rx_kcm */
1631
WARN_ON(kcm->done);
1632
kcm->rx_disabled = 1;
1633
kcm->done = 1;
1634
spin_unlock_bh(&mux->rx_lock);
1635
return;
1636
}
1637
1638
if (kcm->rx_wait) {
1639
list_del(&kcm->wait_rx_list);
1640
/* paired with lockless reads in kcm_rfree() */
1641
WRITE_ONCE(kcm->rx_wait, false);
1642
}
1643
/* Move any pending receive messages to other kcm sockets */
1644
requeue_rx_msgs(mux, &sk->sk_receive_queue);
1645
1646
spin_unlock_bh(&mux->rx_lock);
1647
1648
if (WARN_ON(sk_rmem_alloc_get(sk)))
1649
return;
1650
1651
/* Detach from MUX */
1652
spin_lock_bh(&mux->lock);
1653
1654
list_del(&kcm->kcm_sock_list);
1655
mux->kcm_socks_cnt--;
1656
socks_cnt = mux->kcm_socks_cnt;
1657
1658
spin_unlock_bh(&mux->lock);
1659
1660
if (!socks_cnt) {
1661
/* We are done with the mux now. */
1662
release_mux(mux);
1663
}
1664
1665
WARN_ON(kcm->rx_wait);
1666
1667
sock_put(&kcm->sk);
1668
}
1669
1670
/* Called by kcm_release to close a KCM socket.
1671
* If this is the last KCM socket on the MUX, destroy the MUX.
1672
*/
1673
static int kcm_release(struct socket *sock)
1674
{
1675
struct sock *sk = sock->sk;
1676
struct kcm_sock *kcm;
1677
struct kcm_mux *mux;
1678
struct kcm_psock *psock;
1679
1680
if (!sk)
1681
return 0;
1682
1683
kcm = kcm_sk(sk);
1684
mux = kcm->mux;
1685
1686
lock_sock(sk);
1687
sock_orphan(sk);
1688
kfree_skb(kcm->seq_skb);
1689
1690
/* Purge queue under lock to avoid race condition with tx_work trying
1691
* to act when queue is nonempty. If tx_work runs after this point
1692
* it will just return.
1693
*/
1694
__skb_queue_purge(&sk->sk_write_queue);
1695
1696
release_sock(sk);
1697
1698
spin_lock_bh(&mux->lock);
1699
if (kcm->tx_wait) {
1700
/* Take off the tx_wait list; after this point there should be no way
1701
* that a psock will be assigned to this kcm.
1702
*/
1703
list_del(&kcm->wait_psock_list);
1704
kcm->tx_wait = false;
1705
}
1706
spin_unlock_bh(&mux->lock);
1707
1708
/* Cancel work. After this point there should be no outside references
1709
* to the kcm socket.
1710
*/
1711
disable_work_sync(&kcm->tx_work);
1712
1713
lock_sock(sk);
1714
psock = kcm->tx_psock;
1715
if (psock) {
1716
/* A psock was reserved, so we need to kill it since it
1717
* may already have some bytes queued from a message. We
1718
* need to do this after removing kcm from tx_wait list.
1719
*/
1720
kcm_abort_tx_psock(psock, EPIPE, false);
1721
unreserve_psock(kcm);
1722
}
1723
release_sock(sk);
1724
1725
WARN_ON(kcm->tx_wait);
1726
WARN_ON(kcm->tx_psock);
1727
1728
sock->sk = NULL;
1729
1730
kcm_done(kcm);
1731
1732
return 0;
1733
}
1734
1735
static const struct proto_ops kcm_dgram_ops = {
1736
.family = PF_KCM,
1737
.owner = THIS_MODULE,
1738
.release = kcm_release,
1739
.bind = sock_no_bind,
1740
.connect = sock_no_connect,
1741
.socketpair = sock_no_socketpair,
1742
.accept = sock_no_accept,
1743
.getname = sock_no_getname,
1744
.poll = datagram_poll,
1745
.ioctl = kcm_ioctl,
1746
.listen = sock_no_listen,
1747
.shutdown = sock_no_shutdown,
1748
.setsockopt = kcm_setsockopt,
1749
.getsockopt = kcm_getsockopt,
1750
.sendmsg = kcm_sendmsg,
1751
.recvmsg = kcm_recvmsg,
1752
.mmap = sock_no_mmap,
1753
.splice_eof = kcm_splice_eof,
1754
};
1755
1756
static const struct proto_ops kcm_seqpacket_ops = {
1757
.family = PF_KCM,
1758
.owner = THIS_MODULE,
1759
.release = kcm_release,
1760
.bind = sock_no_bind,
1761
.connect = sock_no_connect,
1762
.socketpair = sock_no_socketpair,
1763
.accept = sock_no_accept,
1764
.getname = sock_no_getname,
1765
.poll = datagram_poll,
1766
.ioctl = kcm_ioctl,
1767
.listen = sock_no_listen,
1768
.shutdown = sock_no_shutdown,
1769
.setsockopt = kcm_setsockopt,
1770
.getsockopt = kcm_getsockopt,
1771
.sendmsg = kcm_sendmsg,
1772
.recvmsg = kcm_recvmsg,
1773
.mmap = sock_no_mmap,
1774
.splice_eof = kcm_splice_eof,
1775
.splice_read = kcm_splice_read,
1776
};
1777
1778
/* Create proto operation for kcm sockets */
1779
static int kcm_create(struct net *net, struct socket *sock,
1780
int protocol, int kern)
1781
{
1782
struct kcm_net *knet = net_generic(net, kcm_net_id);
1783
struct sock *sk;
1784
struct kcm_mux *mux;
1785
1786
switch (sock->type) {
1787
case SOCK_DGRAM:
1788
sock->ops = &kcm_dgram_ops;
1789
break;
1790
case SOCK_SEQPACKET:
1791
sock->ops = &kcm_seqpacket_ops;
1792
break;
1793
default:
1794
return -ESOCKTNOSUPPORT;
1795
}
1796
1797
if (protocol != KCMPROTO_CONNECTED)
1798
return -EPROTONOSUPPORT;
1799
1800
sk = sk_alloc(net, PF_KCM, GFP_KERNEL, &kcm_proto, kern);
1801
if (!sk)
1802
return -ENOMEM;
1803
1804
/* Allocate a kcm mux, shared between KCM sockets */
1805
mux = kmem_cache_zalloc(kcm_muxp, GFP_KERNEL);
1806
if (!mux) {
1807
sk_free(sk);
1808
return -ENOMEM;
1809
}
1810
1811
spin_lock_init(&mux->lock);
1812
spin_lock_init(&mux->rx_lock);
1813
INIT_LIST_HEAD(&mux->kcm_socks);
1814
INIT_LIST_HEAD(&mux->kcm_rx_waiters);
1815
INIT_LIST_HEAD(&mux->kcm_tx_waiters);
1816
1817
INIT_LIST_HEAD(&mux->psocks);
1818
INIT_LIST_HEAD(&mux->psocks_ready);
1819
INIT_LIST_HEAD(&mux->psocks_avail);
1820
1821
mux->knet = knet;
1822
1823
/* Add new MUX to list */
1824
mutex_lock(&knet->mutex);
1825
list_add_rcu(&mux->kcm_mux_list, &knet->mux_list);
1826
knet->count++;
1827
mutex_unlock(&knet->mutex);
1828
1829
skb_queue_head_init(&mux->rx_hold_queue);
1830
1831
/* Init KCM socket */
1832
sock_init_data(sock, sk);
1833
init_kcm_sock(kcm_sk(sk), mux);
1834
1835
return 0;
1836
}
1837
1838
static const struct net_proto_family kcm_family_ops = {
1839
.family = PF_KCM,
1840
.create = kcm_create,
1841
.owner = THIS_MODULE,
1842
};
1843
1844
static __net_init int kcm_init_net(struct net *net)
1845
{
1846
struct kcm_net *knet = net_generic(net, kcm_net_id);
1847
1848
INIT_LIST_HEAD_RCU(&knet->mux_list);
1849
mutex_init(&knet->mutex);
1850
1851
return 0;
1852
}
1853
1854
static __net_exit void kcm_exit_net(struct net *net)
1855
{
1856
struct kcm_net *knet = net_generic(net, kcm_net_id);
1857
1858
/* All KCM sockets should be closed at this point, which should mean
1859
* that all multiplexors and psocks have been destroyed.
1860
*/
1861
WARN_ON(!list_empty(&knet->mux_list));
1862
1863
mutex_destroy(&knet->mutex);
1864
}
1865
1866
static struct pernet_operations kcm_net_ops = {
1867
.init = kcm_init_net,
1868
.exit = kcm_exit_net,
1869
.id = &kcm_net_id,
1870
.size = sizeof(struct kcm_net),
1871
};
1872
1873
static int __init kcm_init(void)
1874
{
1875
int err = -ENOMEM;
1876
1877
kcm_muxp = KMEM_CACHE(kcm_mux, SLAB_HWCACHE_ALIGN);
1878
if (!kcm_muxp)
1879
goto fail;
1880
1881
kcm_psockp = KMEM_CACHE(kcm_psock, SLAB_HWCACHE_ALIGN);
1882
if (!kcm_psockp)
1883
goto fail;
1884
1885
kcm_wq = create_singlethread_workqueue("kkcmd");
1886
if (!kcm_wq)
1887
goto fail;
1888
1889
err = proto_register(&kcm_proto, 1);
1890
if (err)
1891
goto fail;
1892
1893
err = register_pernet_device(&kcm_net_ops);
1894
if (err)
1895
goto net_ops_fail;
1896
1897
err = sock_register(&kcm_family_ops);
1898
if (err)
1899
goto sock_register_fail;
1900
1901
err = kcm_proc_init();
1902
if (err)
1903
goto proc_init_fail;
1904
1905
return 0;
1906
1907
proc_init_fail:
1908
sock_unregister(PF_KCM);
1909
1910
sock_register_fail:
1911
unregister_pernet_device(&kcm_net_ops);
1912
1913
net_ops_fail:
1914
proto_unregister(&kcm_proto);
1915
1916
fail:
1917
kmem_cache_destroy(kcm_muxp);
1918
kmem_cache_destroy(kcm_psockp);
1919
1920
if (kcm_wq)
1921
destroy_workqueue(kcm_wq);
1922
1923
return err;
1924
}
1925
1926
static void __exit kcm_exit(void)
1927
{
1928
kcm_proc_exit();
1929
sock_unregister(PF_KCM);
1930
unregister_pernet_device(&kcm_net_ops);
1931
proto_unregister(&kcm_proto);
1932
destroy_workqueue(kcm_wq);
1933
1934
kmem_cache_destroy(kcm_muxp);
1935
kmem_cache_destroy(kcm_psockp);
1936
}
1937
1938
module_init(kcm_init);
1939
module_exit(kcm_exit);
1940
1941
MODULE_LICENSE("GPL");
1942
MODULE_DESCRIPTION("KCM (Kernel Connection Multiplexor) sockets");
1943
MODULE_ALIAS_NETPROTO(PF_KCM);
1944
1945