Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/lowcomms.c
26295 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6
** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
7
**
8
**
9
*******************************************************************************
10
******************************************************************************/
11
12
/*
13
* lowcomms.c
14
*
15
* This is the "low-level" comms layer.
16
*
17
* It is responsible for sending/receiving messages
18
* from other nodes in the cluster.
19
*
20
* Cluster nodes are referred to by their nodeids. nodeids are
21
* simply 32 bit numbers to the locking module - if they need to
22
* be expanded for the cluster infrastructure then that is its
23
* responsibility. It is this layer's
24
* responsibility to resolve these into IP address or
25
* whatever it needs for inter-node communication.
26
*
27
* The comms level is two kernel threads that deal mainly with
28
* the receiving of messages from other nodes and passing them
29
* up to the mid-level comms layer (which understands the
30
* message format) for execution by the locking core, and
31
* a send thread which does all the setting up of connections
32
* to remote nodes and the sending of data. Threads are not allowed
33
* to send their own data because it may cause them to wait in times
34
* of high load. Also, this way, the sending thread can collect together
35
* messages bound for one node and send them in one block.
36
*
37
* lowcomms will choose to use either TCP or SCTP as its transport layer
38
* depending on the configuration variable 'protocol'. This should be set
39
* to 0 (default) for TCP or 1 for SCTP. It should be configured using a
40
* cluster-wide mechanism as it must be the same on all nodes of the cluster
41
* for the DLM to function.
42
*
43
*/
44
45
#include <asm/ioctls.h>
46
#include <net/sock.h>
47
#include <net/tcp.h>
48
#include <linux/pagemap.h>
49
#include <linux/file.h>
50
#include <linux/mutex.h>
51
#include <linux/sctp.h>
52
#include <linux/slab.h>
53
#include <net/sctp/sctp.h>
54
#include <net/ipv6.h>
55
56
#include <trace/events/dlm.h>
57
#include <trace/events/sock.h>
58
59
#include "dlm_internal.h"
60
#include "lowcomms.h"
61
#include "midcomms.h"
62
#include "memory.h"
63
#include "config.h"
64
65
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
66
#define DLM_MAX_PROCESS_BUFFERS 24
67
#define NEEDED_RMEM (4*1024*1024)
68
69
struct connection {
70
struct socket *sock; /* NULL if not connected */
71
uint32_t nodeid; /* So we know who we are in the list */
72
/* this semaphore is used to allow parallel recv/send in read
73
* lock mode. When we release a sock we need to held the write lock.
74
*
75
* However this is locking code and not nice. When we remove the
76
* othercon handling we can look into other mechanism to synchronize
77
* io handling to call sock_release() at the right time.
78
*/
79
struct rw_semaphore sock_lock;
80
unsigned long flags;
81
#define CF_APP_LIMITED 0
82
#define CF_RECV_PENDING 1
83
#define CF_SEND_PENDING 2
84
#define CF_RECV_INTR 3
85
#define CF_IO_STOP 4
86
#define CF_IS_OTHERCON 5
87
struct list_head writequeue; /* List of outgoing writequeue_entries */
88
spinlock_t writequeue_lock;
89
int retries;
90
struct hlist_node list;
91
/* due some connect()/accept() races we currently have this cross over
92
* connection attempt second connection for one node.
93
*
94
* There is a solution to avoid the race by introducing a connect
95
* rule as e.g. our_nodeid > nodeid_to_connect who is allowed to
96
* connect. Otherside can connect but will only be considered that
97
* the other side wants to have a reconnect.
98
*
99
* However changing to this behaviour will break backwards compatible.
100
* In a DLM protocol major version upgrade we should remove this!
101
*/
102
struct connection *othercon;
103
struct work_struct rwork; /* receive worker */
104
struct work_struct swork; /* send worker */
105
wait_queue_head_t shutdown_wait;
106
unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
107
int rx_leftover;
108
int mark;
109
int addr_count;
110
int curr_addr_index;
111
struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
112
spinlock_t addrs_lock;
113
struct rcu_head rcu;
114
};
115
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
116
117
struct listen_connection {
118
struct socket *sock;
119
struct work_struct rwork;
120
};
121
122
/* Bytes still free in the entry's page behind the current end marker.
 * The argument is parenthesized so any pointer expression may be passed.
 */
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - (e)->end)
/* Bytes between the entry's current offset and its end marker. */
#define DLM_WQ_LENGTH_BYTES(e) ((e)->end - (e)->offset)
124
125
/* An entry waiting to be sent */
126
struct writequeue_entry {
127
struct list_head list;
128
struct page *page;
129
int offset;
130
int len;
131
int end;
132
int users;
133
bool dirty;
134
struct connection *con;
135
struct list_head msgs;
136
struct kref ref;
137
};
138
139
struct dlm_msg {
140
struct writequeue_entry *entry;
141
struct dlm_msg *orig_msg;
142
bool retransmit;
143
void *ppc;
144
int len;
145
int idx; /* new()/commit() idx exchange */
146
147
struct list_head list;
148
struct kref ref;
149
};
150
151
struct processqueue_entry {
152
unsigned char *buf;
153
int nodeid;
154
int buflen;
155
156
struct list_head list;
157
};
158
159
struct dlm_proto_ops {
160
bool try_new_addr;
161
const char *name;
162
int proto;
163
int how;
164
165
void (*sockopts)(struct socket *sock);
166
int (*bind)(struct socket *sock);
167
int (*listen_validate)(void);
168
void (*listen_sockopts)(struct socket *sock);
169
int (*listen_bind)(struct socket *sock);
170
};
171
172
/* Saved socket callbacks; restore_callbacks() writes them back into a sock
 * and lowcomms_error_report() chains to the saved sk_error_report.
 */
struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
};

static struct listen_sock_callbacks listen_sock;
178
179
static struct listen_connection listen_con;
180
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
181
static int dlm_local_count;
182
183
/* Work queues */
184
static struct workqueue_struct *io_workqueue;
185
static struct workqueue_struct *process_workqueue;
186
187
static struct hlist_head connection_hash[CONN_HASH_SIZE];
188
static DEFINE_SPINLOCK(connections_lock);
189
DEFINE_STATIC_SRCU(connections_srcu);
190
191
static const struct dlm_proto_ops *dlm_proto_ops;
192
193
#define DLM_IO_SUCCESS 0
194
#define DLM_IO_END 1
195
#define DLM_IO_EOF 2
196
#define DLM_IO_RESCHED 3
197
#define DLM_IO_FLUSH 4
198
199
static void process_recv_sockets(struct work_struct *work);
200
static void process_send_sockets(struct work_struct *work);
201
static void process_dlm_messages(struct work_struct *work);
202
203
static DECLARE_WORK(process_work, process_dlm_messages);
204
static DEFINE_SPINLOCK(processqueue_lock);
205
static bool process_dlm_messages_pending;
206
static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
207
static atomic_t processqueue_count;
208
static LIST_HEAD(processqueue);
209
210
bool dlm_lowcomms_is_running(void)
211
{
212
return !!listen_con.sock;
213
}
214
215
static void lowcomms_queue_swork(struct connection *con)
216
{
217
assert_spin_locked(&con->writequeue_lock);
218
219
if (!test_bit(CF_IO_STOP, &con->flags) &&
220
!test_bit(CF_APP_LIMITED, &con->flags) &&
221
!test_and_set_bit(CF_SEND_PENDING, &con->flags))
222
queue_work(io_workqueue, &con->swork);
223
}
224
225
static void lowcomms_queue_rwork(struct connection *con)
226
{
227
#ifdef CONFIG_LOCKDEP
228
WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
229
#endif
230
231
if (!test_bit(CF_IO_STOP, &con->flags) &&
232
!test_and_set_bit(CF_RECV_PENDING, &con->flags))
233
queue_work(io_workqueue, &con->rwork);
234
}
235
236
static void writequeue_entry_ctor(void *data)
237
{
238
struct writequeue_entry *entry = data;
239
240
INIT_LIST_HEAD(&entry->msgs);
241
}
242
243
struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
244
{
245
return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
246
0, 0, writequeue_entry_ctor);
247
}
248
249
struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
250
{
251
return KMEM_CACHE(dlm_msg, 0);
252
}
253
254
/* need to held writequeue_lock */
255
static struct writequeue_entry *con_next_wq(struct connection *con)
256
{
257
struct writequeue_entry *e;
258
259
e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
260
list);
261
/* if len is zero nothing is to send, if there are users filling
262
* buffers we wait until the users are done so we can send more.
263
*/
264
if (!e || e->users || e->len == 0)
265
return NULL;
266
267
return e;
268
}
269
270
static struct connection *__find_con(int nodeid, int r)
271
{
272
struct connection *con;
273
274
hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
275
if (con->nodeid == nodeid)
276
return con;
277
}
278
279
return NULL;
280
}
281
282
static void dlm_con_init(struct connection *con, int nodeid)
283
{
284
con->nodeid = nodeid;
285
init_rwsem(&con->sock_lock);
286
INIT_LIST_HEAD(&con->writequeue);
287
spin_lock_init(&con->writequeue_lock);
288
INIT_WORK(&con->swork, process_send_sockets);
289
INIT_WORK(&con->rwork, process_recv_sockets);
290
spin_lock_init(&con->addrs_lock);
291
init_waitqueue_head(&con->shutdown_wait);
292
}
293
294
/*
295
* If 'allocation' is zero then we don't attempt to create a new
296
* connection structure for this node.
297
*/
298
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
299
{
300
struct connection *con, *tmp;
301
int r;
302
303
r = nodeid_hash(nodeid);
304
con = __find_con(nodeid, r);
305
if (con || !alloc)
306
return con;
307
308
con = kzalloc(sizeof(*con), alloc);
309
if (!con)
310
return NULL;
311
312
dlm_con_init(con, nodeid);
313
314
spin_lock(&connections_lock);
315
/* Because multiple workqueues/threads calls this function it can
316
* race on multiple cpu's. Instead of locking hot path __find_con()
317
* we just check in rare cases of recently added nodes again
318
* under protection of connections_lock. If this is the case we
319
* abort our connection creation and return the existing connection.
320
*/
321
tmp = __find_con(nodeid, r);
322
if (tmp) {
323
spin_unlock(&connections_lock);
324
kfree(con);
325
return tmp;
326
}
327
328
hlist_add_head_rcu(&con->list, &connection_hash[r]);
329
spin_unlock(&connections_lock);
330
331
return con;
332
}
333
334
static int addr_compare(const struct sockaddr_storage *x,
335
const struct sockaddr_storage *y)
336
{
337
switch (x->ss_family) {
338
case AF_INET: {
339
struct sockaddr_in *sinx = (struct sockaddr_in *)x;
340
struct sockaddr_in *siny = (struct sockaddr_in *)y;
341
if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
342
return 0;
343
if (sinx->sin_port != siny->sin_port)
344
return 0;
345
break;
346
}
347
case AF_INET6: {
348
struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
349
struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
350
if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
351
return 0;
352
if (sinx->sin6_port != siny->sin6_port)
353
return 0;
354
break;
355
}
356
default:
357
return 0;
358
}
359
return 1;
360
}
361
362
/* Resolve @nodeid to the address currently selected for its connection.
 *
 * @sas_out:      if non-NULL, receives a copy of the full stored address
 * @sa_out:       if non-NULL, receives only the IP address part (IPv4 or
 *                IPv6, chosen by the family of our first local address)
 * @try_new_addr: advance the connection's address index (wrapping around)
 *                so a later call hands out the next address
 * @mark:         receives the connection's mark; must not be NULL
 *
 * Returns 0 on success, -1 if no local address is configured and -ENOENT
 * if the node has no connection structure or no address.
 */
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx, ret = 0;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		ret = -ENOENT;
		goto out;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		ret = -ENOENT;
		goto out;
	}

	/* snapshot the address under the lock, work on the copy afterwards */
	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
		con->curr_addr_index++;
		if (con->curr_addr_index == con->addr_count)
			con->curr_addr_index = 0;
	}

	*mark = con->mark;
	spin_unlock(&con->addrs_lock);

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (sa_out) {
		if (dlm_local_addr[0].ss_family == AF_INET) {
			struct sockaddr_in *src4 = (struct sockaddr_in *)&sas;
			struct sockaddr_in *dst4 = (struct sockaddr_in *)sa_out;

			dst4->sin_addr.s_addr = src4->sin_addr.s_addr;
		} else {
			struct sockaddr_in6 *src6 = (struct sockaddr_in6 *)&sas;
			struct sockaddr_in6 *dst6 = (struct sockaddr_in6 *)sa_out;

			dst6->sin6_addr = src6->sin6_addr;
		}
	}

out:
	srcu_read_unlock(&connections_srcu, idx);
	return ret;
}
420
421
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
422
unsigned int *mark)
423
{
424
struct connection *con;
425
int i, idx, addr_i;
426
427
idx = srcu_read_lock(&connections_srcu);
428
for (i = 0; i < CONN_HASH_SIZE; i++) {
429
hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
430
WARN_ON_ONCE(!con->addr_count);
431
432
spin_lock(&con->addrs_lock);
433
for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
434
if (addr_compare(&con->addr[addr_i], addr)) {
435
*nodeid = con->nodeid;
436
*mark = con->mark;
437
spin_unlock(&con->addrs_lock);
438
srcu_read_unlock(&connections_srcu, idx);
439
return 0;
440
}
441
}
442
spin_unlock(&con->addrs_lock);
443
}
444
}
445
srcu_read_unlock(&connections_srcu, idx);
446
447
return -ENOENT;
448
}
449
450
static bool dlm_lowcomms_con_has_addr(const struct connection *con,
451
const struct sockaddr_storage *addr)
452
{
453
int i;
454
455
for (i = 0; i < con->addr_count; i++) {
456
if (addr_compare(&con->addr[i], addr))
457
return true;
458
}
459
460
return false;
461
}
462
463
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
464
{
465
struct connection *con;
466
bool ret;
467
int idx;
468
469
idx = srcu_read_lock(&connections_srcu);
470
con = nodeid2con(nodeid, GFP_NOFS);
471
if (!con) {
472
srcu_read_unlock(&connections_srcu, idx);
473
return -ENOMEM;
474
}
475
476
spin_lock(&con->addrs_lock);
477
if (!con->addr_count) {
478
memcpy(&con->addr[0], addr, sizeof(*addr));
479
con->addr_count = 1;
480
con->mark = dlm_config.ci_mark;
481
spin_unlock(&con->addrs_lock);
482
srcu_read_unlock(&connections_srcu, idx);
483
return 0;
484
}
485
486
ret = dlm_lowcomms_con_has_addr(con, addr);
487
if (ret) {
488
spin_unlock(&con->addrs_lock);
489
srcu_read_unlock(&connections_srcu, idx);
490
return -EEXIST;
491
}
492
493
if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
494
spin_unlock(&con->addrs_lock);
495
srcu_read_unlock(&connections_srcu, idx);
496
return -ENOSPC;
497
}
498
499
memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
500
srcu_read_unlock(&connections_srcu, idx);
501
spin_unlock(&con->addrs_lock);
502
return 0;
503
}
504
505
/* Data available on socket or listen socket received a connect */
506
static void lowcomms_data_ready(struct sock *sk)
507
{
508
struct connection *con = sock2con(sk);
509
510
trace_sk_data_ready(sk);
511
512
set_bit(CF_RECV_INTR, &con->flags);
513
lowcomms_queue_rwork(con);
514
}
515
516
static void lowcomms_write_space(struct sock *sk)
517
{
518
struct connection *con = sock2con(sk);
519
520
clear_bit(SOCK_NOSPACE, &con->sock->flags);
521
522
spin_lock_bh(&con->writequeue_lock);
523
if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
524
con->sock->sk->sk_write_pending--;
525
clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
526
}
527
528
lowcomms_queue_swork(con);
529
spin_unlock_bh(&con->writequeue_lock);
530
}
531
532
static void lowcomms_state_change(struct sock *sk)
533
{
534
/* SCTP layer is not calling sk_data_ready when the connection
535
* is done, so we catch the signal through here.
536
*/
537
if (sk->sk_shutdown & RCV_SHUTDOWN)
538
lowcomms_data_ready(sk);
539
}
540
541
static void lowcomms_listen_data_ready(struct sock *sk)
542
{
543
trace_sk_data_ready(sk);
544
545
queue_work(io_workqueue, &listen_con.rwork);
546
}
547
548
int dlm_lowcomms_connect_node(int nodeid)
549
{
550
struct connection *con;
551
int idx;
552
553
idx = srcu_read_lock(&connections_srcu);
554
con = nodeid2con(nodeid, 0);
555
if (WARN_ON_ONCE(!con)) {
556
srcu_read_unlock(&connections_srcu, idx);
557
return -ENOENT;
558
}
559
560
down_read(&con->sock_lock);
561
if (!con->sock) {
562
spin_lock_bh(&con->writequeue_lock);
563
lowcomms_queue_swork(con);
564
spin_unlock_bh(&con->writequeue_lock);
565
}
566
up_read(&con->sock_lock);
567
srcu_read_unlock(&connections_srcu, idx);
568
569
cond_resched();
570
return 0;
571
}
572
573
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
574
{
575
struct connection *con;
576
int idx;
577
578
idx = srcu_read_lock(&connections_srcu);
579
con = nodeid2con(nodeid, 0);
580
if (!con) {
581
srcu_read_unlock(&connections_srcu, idx);
582
return -ENOENT;
583
}
584
585
spin_lock(&con->addrs_lock);
586
con->mark = mark;
587
spin_unlock(&con->addrs_lock);
588
srcu_read_unlock(&connections_srcu, idx);
589
return 0;
590
}
591
592
static void lowcomms_error_report(struct sock *sk)
593
{
594
struct connection *con = sock2con(sk);
595
struct inet_sock *inet;
596
597
inet = inet_sk(sk);
598
switch (sk->sk_family) {
599
case AF_INET:
600
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
601
"sending to node %d at %pI4, dport %d, "
602
"sk_err=%d/%d\n", dlm_our_nodeid(),
603
con->nodeid, &inet->inet_daddr,
604
ntohs(inet->inet_dport), sk->sk_err,
605
READ_ONCE(sk->sk_err_soft));
606
break;
607
#if IS_ENABLED(CONFIG_IPV6)
608
case AF_INET6:
609
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
610
"sending to node %d at %pI6c, "
611
"dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
612
con->nodeid, &sk->sk_v6_daddr,
613
ntohs(inet->inet_dport), sk->sk_err,
614
READ_ONCE(sk->sk_err_soft));
615
break;
616
#endif
617
default:
618
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
619
"invalid socket family %d set, "
620
"sk_err=%d/%d\n", dlm_our_nodeid(),
621
sk->sk_family, sk->sk_err,
622
READ_ONCE(sk->sk_err_soft));
623
break;
624
}
625
626
dlm_midcomms_unack_msg_resend(con->nodeid);
627
628
listen_sock.sk_error_report(sk);
629
}
630
631
static void restore_callbacks(struct sock *sk)
632
{
633
#ifdef CONFIG_LOCKDEP
634
WARN_ON_ONCE(!lockdep_sock_is_held(sk));
635
#endif
636
637
sk->sk_user_data = NULL;
638
sk->sk_data_ready = listen_sock.sk_data_ready;
639
sk->sk_state_change = listen_sock.sk_state_change;
640
sk->sk_write_space = listen_sock.sk_write_space;
641
sk->sk_error_report = listen_sock.sk_error_report;
642
}
643
644
/* Make a socket active */
645
static void add_sock(struct socket *sock, struct connection *con)
646
{
647
struct sock *sk = sock->sk;
648
649
lock_sock(sk);
650
con->sock = sock;
651
652
sk->sk_user_data = con;
653
sk->sk_data_ready = lowcomms_data_ready;
654
sk->sk_write_space = lowcomms_write_space;
655
if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
656
sk->sk_state_change = lowcomms_state_change;
657
sk->sk_allocation = GFP_NOFS;
658
sk->sk_use_task_frag = false;
659
sk->sk_error_report = lowcomms_error_report;
660
release_sock(sk);
661
}
662
663
/* Add the port number to an IPv6 or 4 sockaddr and return the address
664
length */
665
static void make_sockaddr(struct sockaddr_storage *saddr, __be16 port,
666
int *addr_len)
667
{
668
saddr->ss_family = dlm_local_addr[0].ss_family;
669
if (saddr->ss_family == AF_INET) {
670
struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
671
in4_addr->sin_port = port;
672
*addr_len = sizeof(struct sockaddr_in);
673
memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
674
} else {
675
struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
676
in6_addr->sin6_port = port;
677
*addr_len = sizeof(struct sockaddr_in6);
678
}
679
memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
680
}
681
682
static void dlm_page_release(struct kref *kref)
683
{
684
struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
685
ref);
686
687
__free_page(e->page);
688
dlm_free_writequeue(e);
689
}
690
691
static void dlm_msg_release(struct kref *kref)
692
{
693
struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
694
695
kref_put(&msg->entry->ref, dlm_page_release);
696
dlm_free_msg(msg);
697
}
698
699
static void free_entry(struct writequeue_entry *e)
700
{
701
struct dlm_msg *msg, *tmp;
702
703
list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
704
if (msg->orig_msg) {
705
msg->orig_msg->retransmit = false;
706
kref_put(&msg->orig_msg->ref, dlm_msg_release);
707
}
708
709
list_del(&msg->list);
710
kref_put(&msg->ref, dlm_msg_release);
711
}
712
713
list_del(&e->list);
714
kref_put(&e->ref, dlm_page_release);
715
}
716
717
static void dlm_close_sock(struct socket **sock)
718
{
719
lock_sock((*sock)->sk);
720
restore_callbacks((*sock)->sk);
721
release_sock((*sock)->sk);
722
723
sock_release(*sock);
724
*sock = NULL;
725
}
726
727
static void allow_connection_io(struct connection *con)
728
{
729
if (con->othercon)
730
clear_bit(CF_IO_STOP, &con->othercon->flags);
731
clear_bit(CF_IO_STOP, &con->flags);
732
}
733
734
static void stop_connection_io(struct connection *con)
735
{
736
if (con->othercon)
737
stop_connection_io(con->othercon);
738
739
spin_lock_bh(&con->writequeue_lock);
740
set_bit(CF_IO_STOP, &con->flags);
741
spin_unlock_bh(&con->writequeue_lock);
742
743
down_write(&con->sock_lock);
744
if (con->sock) {
745
lock_sock(con->sock->sk);
746
restore_callbacks(con->sock->sk);
747
release_sock(con->sock->sk);
748
}
749
up_write(&con->sock_lock);
750
751
cancel_work_sync(&con->swork);
752
cancel_work_sync(&con->rwork);
753
}
754
755
/* Close a remote connection and tidy up */
756
static void close_connection(struct connection *con, bool and_other)
757
{
758
struct writequeue_entry *e;
759
760
if (con->othercon && and_other)
761
close_connection(con->othercon, false);
762
763
down_write(&con->sock_lock);
764
if (!con->sock) {
765
up_write(&con->sock_lock);
766
return;
767
}
768
769
dlm_close_sock(&con->sock);
770
771
/* if we send a writequeue entry only a half way, we drop the
772
* whole entry because reconnection and that we not start of the
773
* middle of a msg which will confuse the other end.
774
*
775
* we can always drop messages because retransmits, but what we
776
* cannot allow is to transmit half messages which may be processed
777
* at the other side.
778
*
779
* our policy is to start on a clean state when disconnects, we don't
780
* know what's send/received on transport layer in this case.
781
*/
782
spin_lock_bh(&con->writequeue_lock);
783
if (!list_empty(&con->writequeue)) {
784
e = list_first_entry(&con->writequeue, struct writequeue_entry,
785
list);
786
if (e->dirty)
787
free_entry(e);
788
}
789
spin_unlock_bh(&con->writequeue_lock);
790
791
con->rx_leftover = 0;
792
con->retries = 0;
793
clear_bit(CF_APP_LIMITED, &con->flags);
794
clear_bit(CF_RECV_PENDING, &con->flags);
795
clear_bit(CF_SEND_PENDING, &con->flags);
796
up_write(&con->sock_lock);
797
}
798
799
/* Shut down the socket of @con and wait up to DLM_SHUTDOWN_WAIT_TIMEOUT
 * for con->sock to become NULL. If the shutdown call fails or the wait
 * times out, the connection is closed forcibly.
 *
 * If @and_other is true the othercon of @con (if any) is shut down first.
 */
static void shutdown_connection(struct connection *con, bool and_other)
{
	int ret;

	if (con->othercon && and_other)
		shutdown_connection(con->othercon, false);

	flush_workqueue(io_workqueue);

	down_read(&con->sock_lock);
	if (!con->sock) {
		/* nothing to shutdown */
		up_read(&con->sock_lock);
		return;
	}

	ret = kernel_sock_shutdown(con->sock, dlm_proto_ops->how);
	up_read(&con->sock_lock);

	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	}

	ret = wait_event_timeout(con->shutdown_wait, !con->sock,
				 DLM_SHUTDOWN_WAIT_TIMEOUT);
	if (ret == 0) {
		log_print("Connection %p shutdown timed out, will force close",
			  con);
		goto force_close;
	}

	return;

force_close:
	close_connection(con, false);
}
835
836
static struct processqueue_entry *new_processqueue_entry(int nodeid,
837
int buflen)
838
{
839
struct processqueue_entry *pentry;
840
841
pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
842
if (!pentry)
843
return NULL;
844
845
pentry->buf = kmalloc(buflen, GFP_NOFS);
846
if (!pentry->buf) {
847
kfree(pentry);
848
return NULL;
849
}
850
851
pentry->nodeid = nodeid;
852
return pentry;
853
}
854
855
static void free_processqueue_entry(struct processqueue_entry *pentry)
856
{
857
kfree(pentry->buf);
858
kfree(pentry);
859
}
860
861
static void process_dlm_messages(struct work_struct *work)
862
{
863
struct processqueue_entry *pentry;
864
865
spin_lock_bh(&processqueue_lock);
866
pentry = list_first_entry_or_null(&processqueue,
867
struct processqueue_entry, list);
868
if (WARN_ON_ONCE(!pentry)) {
869
process_dlm_messages_pending = false;
870
spin_unlock_bh(&processqueue_lock);
871
return;
872
}
873
874
list_del(&pentry->list);
875
if (atomic_dec_and_test(&processqueue_count))
876
wake_up(&processqueue_wq);
877
spin_unlock_bh(&processqueue_lock);
878
879
for (;;) {
880
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
881
pentry->buflen);
882
free_processqueue_entry(pentry);
883
884
spin_lock_bh(&processqueue_lock);
885
pentry = list_first_entry_or_null(&processqueue,
886
struct processqueue_entry, list);
887
if (!pentry) {
888
process_dlm_messages_pending = false;
889
spin_unlock_bh(&processqueue_lock);
890
break;
891
}
892
893
list_del(&pentry->list);
894
if (atomic_dec_and_test(&processqueue_count))
895
wake_up(&processqueue_wq);
896
spin_unlock_bh(&processqueue_lock);
897
}
898
}
899
900
/* Data received from remote end */
901
static int receive_from_sock(struct connection *con, int buflen)
902
{
903
struct processqueue_entry *pentry;
904
int ret, buflen_real;
905
struct msghdr msg;
906
struct kvec iov;
907
908
pentry = new_processqueue_entry(con->nodeid, buflen);
909
if (!pentry)
910
return DLM_IO_RESCHED;
911
912
memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
913
914
/* calculate new buffer parameter regarding last receive and
915
* possible leftover bytes
916
*/
917
iov.iov_base = pentry->buf + con->rx_leftover;
918
iov.iov_len = buflen - con->rx_leftover;
919
920
memset(&msg, 0, sizeof(msg));
921
msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
922
clear_bit(CF_RECV_INTR, &con->flags);
923
again:
924
ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
925
msg.msg_flags);
926
trace_dlm_recv(con->nodeid, ret);
927
if (ret == -EAGAIN) {
928
lock_sock(con->sock->sk);
929
if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
930
release_sock(con->sock->sk);
931
goto again;
932
}
933
934
clear_bit(CF_RECV_PENDING, &con->flags);
935
release_sock(con->sock->sk);
936
free_processqueue_entry(pentry);
937
return DLM_IO_END;
938
} else if (ret == 0) {
939
/* close will clear CF_RECV_PENDING */
940
free_processqueue_entry(pentry);
941
return DLM_IO_EOF;
942
} else if (ret < 0) {
943
free_processqueue_entry(pentry);
944
return ret;
945
}
946
947
/* new buflen according readed bytes and leftover from last receive */
948
buflen_real = ret + con->rx_leftover;
949
ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
950
buflen_real);
951
if (ret < 0) {
952
free_processqueue_entry(pentry);
953
return ret;
954
}
955
956
pentry->buflen = ret;
957
958
/* calculate leftover bytes from process and put it into begin of
959
* the receive buffer, so next receive we have the full message
960
* at the start address of the receive buffer.
961
*/
962
con->rx_leftover = buflen_real - ret;
963
memmove(con->rx_leftover_buf, pentry->buf + ret,
964
con->rx_leftover);
965
966
spin_lock_bh(&processqueue_lock);
967
ret = atomic_inc_return(&processqueue_count);
968
list_add_tail(&pentry->list, &processqueue);
969
if (!process_dlm_messages_pending) {
970
process_dlm_messages_pending = true;
971
queue_work(process_workqueue, &process_work);
972
}
973
spin_unlock_bh(&processqueue_lock);
974
975
if (ret > DLM_MAX_PROCESS_BUFFERS)
976
return DLM_IO_FLUSH;
977
978
return DLM_IO_SUCCESS;
979
}
980
981
/* Listening socket is busy, accept a connection */
982
static int accept_from_sock(void)
983
{
984
struct sockaddr_storage peeraddr;
985
int len, idx, result, nodeid;
986
struct connection *newcon;
987
struct socket *newsock;
988
unsigned int mark;
989
990
result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
991
if (result == -EAGAIN)
992
return DLM_IO_END;
993
else if (result < 0)
994
goto accept_err;
995
996
/* Get the connected socket's peer */
997
memset(&peeraddr, 0, sizeof(peeraddr));
998
len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
999
if (len < 0) {
1000
result = -ECONNABORTED;
1001
goto accept_err;
1002
}
1003
1004
/* Get the new node's NODEID */
1005
make_sockaddr(&peeraddr, 0, &len);
1006
if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
1007
switch (peeraddr.ss_family) {
1008
case AF_INET: {
1009
struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
1010
1011
log_print("connect from non cluster IPv4 node %pI4",
1012
&sin->sin_addr);
1013
break;
1014
}
1015
#if IS_ENABLED(CONFIG_IPV6)
1016
case AF_INET6: {
1017
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
1018
1019
log_print("connect from non cluster IPv6 node %pI6c",
1020
&sin6->sin6_addr);
1021
break;
1022
}
1023
#endif
1024
default:
1025
log_print("invalid family from non cluster node");
1026
break;
1027
}
1028
1029
sock_release(newsock);
1030
return -1;
1031
}
1032
1033
log_print("got connection from %d", nodeid);
1034
1035
/* Check to see if we already have a connection to this node. This
1036
* could happen if the two nodes initiate a connection at roughly
1037
* the same time and the connections cross on the wire.
1038
* In this case we store the incoming one in "othercon"
1039
*/
1040
idx = srcu_read_lock(&connections_srcu);
1041
newcon = nodeid2con(nodeid, 0);
1042
if (WARN_ON_ONCE(!newcon)) {
1043
srcu_read_unlock(&connections_srcu, idx);
1044
result = -ENOENT;
1045
goto accept_err;
1046
}
1047
1048
sock_set_mark(newsock->sk, mark);
1049
1050
down_write(&newcon->sock_lock);
1051
if (newcon->sock) {
1052
struct connection *othercon = newcon->othercon;
1053
1054
if (!othercon) {
1055
othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
1056
if (!othercon) {
1057
log_print("failed to allocate incoming socket");
1058
up_write(&newcon->sock_lock);
1059
srcu_read_unlock(&connections_srcu, idx);
1060
result = -ENOMEM;
1061
goto accept_err;
1062
}
1063
1064
dlm_con_init(othercon, nodeid);
1065
lockdep_set_subclass(&othercon->sock_lock, 1);
1066
newcon->othercon = othercon;
1067
set_bit(CF_IS_OTHERCON, &othercon->flags);
1068
} else {
1069
/* close other sock con if we have something new */
1070
close_connection(othercon, false);
1071
}
1072
1073
down_write(&othercon->sock_lock);
1074
add_sock(newsock, othercon);
1075
1076
/* check if we receved something while adding */
1077
lock_sock(othercon->sock->sk);
1078
lowcomms_queue_rwork(othercon);
1079
release_sock(othercon->sock->sk);
1080
up_write(&othercon->sock_lock);
1081
}
1082
else {
1083
/* accept copies the sk after we've saved the callbacks, so we
1084
don't want to save them a second time or comm errors will
1085
result in calling sk_error_report recursively. */
1086
add_sock(newsock, newcon);
1087
1088
/* check if we receved something while adding */
1089
lock_sock(newcon->sock->sk);
1090
lowcomms_queue_rwork(newcon);
1091
release_sock(newcon->sock->sk);
1092
}
1093
up_write(&newcon->sock_lock);
1094
srcu_read_unlock(&connections_srcu, idx);
1095
1096
return DLM_IO_SUCCESS;
1097
1098
accept_err:
1099
if (newsock)
1100
sock_release(newsock);
1101
1102
return result;
1103
}
1104
1105
/*
1106
* writequeue_entry_complete - try to delete and free write queue entry
1107
* @e: write queue entry to try to delete
1108
* @completed: bytes completed
1109
*
1110
* writequeue_lock must be held.
1111
*/
1112
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1113
{
1114
e->offset += completed;
1115
e->len -= completed;
1116
/* signal that page was half way transmitted */
1117
e->dirty = true;
1118
1119
if (e->len == 0 && e->users == 0)
1120
free_entry(e);
1121
}
1122
1123
/*
1124
* sctp_bind_addrs - bind a SCTP socket to all our addresses
1125
*/
1126
static int sctp_bind_addrs(struct socket *sock, __be16 port)
1127
{
1128
struct sockaddr_storage localaddr;
1129
struct sockaddr *addr = (struct sockaddr *)&localaddr;
1130
int i, addr_len, result = 0;
1131
1132
for (i = 0; i < dlm_local_count; i++) {
1133
memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
1134
make_sockaddr(&localaddr, port, &addr_len);
1135
1136
if (!i)
1137
result = kernel_bind(sock, addr, addr_len);
1138
else
1139
result = sock_bind_add(sock->sk, addr, addr_len);
1140
1141
if (result < 0) {
1142
log_print("Can't bind to %d addr number %d, %d.\n",
1143
port, i + 1, result);
1144
break;
1145
}
1146
}
1147
return result;
1148
}
1149
1150
/* Get local addresses */
1151
static void init_local(void)
1152
{
1153
struct sockaddr_storage sas;
1154
int i;
1155
1156
dlm_local_count = 0;
1157
for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1158
if (dlm_our_addr(&sas, i))
1159
break;
1160
1161
memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
1162
}
1163
}
1164
1165
static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1166
{
1167
struct writequeue_entry *entry;
1168
1169
entry = dlm_allocate_writequeue();
1170
if (!entry)
1171
return NULL;
1172
1173
entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
1174
if (!entry->page) {
1175
dlm_free_writequeue(entry);
1176
return NULL;
1177
}
1178
1179
entry->offset = 0;
1180
entry->len = 0;
1181
entry->end = 0;
1182
entry->dirty = false;
1183
entry->con = con;
1184
entry->users = 1;
1185
kref_init(&entry->ref);
1186
return entry;
1187
}
1188
1189
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1190
char **ppc, void (*cb)(void *data),
1191
void *data)
1192
{
1193
struct writequeue_entry *e;
1194
1195
spin_lock_bh(&con->writequeue_lock);
1196
if (!list_empty(&con->writequeue)) {
1197
e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1198
if (DLM_WQ_REMAIN_BYTES(e) >= len) {
1199
kref_get(&e->ref);
1200
1201
*ppc = page_address(e->page) + e->end;
1202
if (cb)
1203
cb(data);
1204
1205
e->end += len;
1206
e->users++;
1207
goto out;
1208
}
1209
}
1210
1211
e = new_writequeue_entry(con);
1212
if (!e)
1213
goto out;
1214
1215
kref_get(&e->ref);
1216
*ppc = page_address(e->page);
1217
e->end += len;
1218
if (cb)
1219
cb(data);
1220
1221
list_add_tail(&e->list, &con->writequeue);
1222
1223
out:
1224
spin_unlock_bh(&con->writequeue_lock);
1225
return e;
1226
};
1227
1228
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1229
char **ppc, void (*cb)(void *data),
1230
void *data)
1231
{
1232
struct writequeue_entry *e;
1233
struct dlm_msg *msg;
1234
1235
msg = dlm_allocate_msg();
1236
if (!msg)
1237
return NULL;
1238
1239
kref_init(&msg->ref);
1240
1241
e = new_wq_entry(con, len, ppc, cb, data);
1242
if (!e) {
1243
dlm_free_msg(msg);
1244
return NULL;
1245
}
1246
1247
msg->retransmit = false;
1248
msg->orig_msg = NULL;
1249
msg->ppc = *ppc;
1250
msg->len = len;
1251
msg->entry = e;
1252
1253
return msg;
1254
}
1255
1256
/* avoid false positive for nodes_srcu, unlock happens in
1257
* dlm_lowcomms_commit_msg which is a must call if success
1258
*/
1259
#ifndef __CHECKER__
1260
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc,
1261
void (*cb)(void *data), void *data)
1262
{
1263
struct connection *con;
1264
struct dlm_msg *msg;
1265
int idx;
1266
1267
if (len > DLM_MAX_SOCKET_BUFSIZE ||
1268
len < sizeof(struct dlm_header)) {
1269
BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
1270
log_print("failed to allocate a buffer of size %d", len);
1271
WARN_ON_ONCE(1);
1272
return NULL;
1273
}
1274
1275
idx = srcu_read_lock(&connections_srcu);
1276
con = nodeid2con(nodeid, 0);
1277
if (WARN_ON_ONCE(!con)) {
1278
srcu_read_unlock(&connections_srcu, idx);
1279
return NULL;
1280
}
1281
1282
msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data);
1283
if (!msg) {
1284
srcu_read_unlock(&connections_srcu, idx);
1285
return NULL;
1286
}
1287
1288
/* for dlm_lowcomms_commit_msg() */
1289
kref_get(&msg->ref);
1290
/* we assume if successful commit must called */
1291
msg->idx = idx;
1292
return msg;
1293
}
1294
#endif
1295
1296
static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1297
{
1298
struct writequeue_entry *e = msg->entry;
1299
struct connection *con = e->con;
1300
int users;
1301
1302
spin_lock_bh(&con->writequeue_lock);
1303
kref_get(&msg->ref);
1304
list_add(&msg->list, &e->msgs);
1305
1306
users = --e->users;
1307
if (users)
1308
goto out;
1309
1310
e->len = DLM_WQ_LENGTH_BYTES(e);
1311
1312
lowcomms_queue_swork(con);
1313
1314
out:
1315
spin_unlock_bh(&con->writequeue_lock);
1316
return;
1317
}
1318
1319
/* avoid false positive for nodes_srcu, lock was happen in
1320
* dlm_lowcomms_new_msg
1321
*/
1322
#ifndef __CHECKER__
1323
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1324
{
1325
_dlm_lowcomms_commit_msg(msg);
1326
srcu_read_unlock(&connections_srcu, msg->idx);
1327
/* because dlm_lowcomms_new_msg() */
1328
kref_put(&msg->ref, dlm_msg_release);
1329
}
1330
#endif
1331
1332
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
1333
{
1334
kref_put(&msg->ref, dlm_msg_release);
1335
}
1336
1337
/* does not held connections_srcu, usage lowcomms_error_report only */
1338
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
1339
{
1340
struct dlm_msg *msg_resend;
1341
char *ppc;
1342
1343
if (msg->retransmit)
1344
return 1;
1345
1346
msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc,
1347
NULL, NULL);
1348
if (!msg_resend)
1349
return -ENOMEM;
1350
1351
msg->retransmit = true;
1352
kref_get(&msg->ref);
1353
msg_resend->orig_msg = msg;
1354
1355
memcpy(ppc, msg->ppc, msg->len);
1356
_dlm_lowcomms_commit_msg(msg_resend);
1357
dlm_lowcomms_put_msg(msg_resend);
1358
1359
return 0;
1360
}
1361
1362
/* Send a message */
1363
static int send_to_sock(struct connection *con)
1364
{
1365
struct writequeue_entry *e;
1366
struct bio_vec bvec;
1367
struct msghdr msg = {
1368
.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
1369
};
1370
int len, offset, ret;
1371
1372
spin_lock_bh(&con->writequeue_lock);
1373
e = con_next_wq(con);
1374
if (!e) {
1375
clear_bit(CF_SEND_PENDING, &con->flags);
1376
spin_unlock_bh(&con->writequeue_lock);
1377
return DLM_IO_END;
1378
}
1379
1380
len = e->len;
1381
offset = e->offset;
1382
WARN_ON_ONCE(len == 0 && e->users == 0);
1383
spin_unlock_bh(&con->writequeue_lock);
1384
1385
bvec_set_page(&bvec, e->page, len, offset);
1386
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
1387
ret = sock_sendmsg(con->sock, &msg);
1388
trace_dlm_send(con->nodeid, ret);
1389
if (ret == -EAGAIN || ret == 0) {
1390
lock_sock(con->sock->sk);
1391
spin_lock_bh(&con->writequeue_lock);
1392
if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1393
!test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1394
/* Notify TCP that we're limited by the
1395
* application window size.
1396
*/
1397
set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
1398
con->sock->sk->sk_write_pending++;
1399
1400
clear_bit(CF_SEND_PENDING, &con->flags);
1401
spin_unlock_bh(&con->writequeue_lock);
1402
release_sock(con->sock->sk);
1403
1404
/* wait for write_space() event */
1405
return DLM_IO_END;
1406
}
1407
spin_unlock_bh(&con->writequeue_lock);
1408
release_sock(con->sock->sk);
1409
1410
return DLM_IO_RESCHED;
1411
} else if (ret < 0) {
1412
return ret;
1413
}
1414
1415
spin_lock_bh(&con->writequeue_lock);
1416
writequeue_entry_complete(e, ret);
1417
spin_unlock_bh(&con->writequeue_lock);
1418
1419
return DLM_IO_SUCCESS;
1420
}
1421
1422
static void clean_one_writequeue(struct connection *con)
1423
{
1424
struct writequeue_entry *e, *safe;
1425
1426
spin_lock_bh(&con->writequeue_lock);
1427
list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1428
free_entry(e);
1429
}
1430
spin_unlock_bh(&con->writequeue_lock);
1431
}
1432
1433
static void connection_release(struct rcu_head *rcu)
1434
{
1435
struct connection *con = container_of(rcu, struct connection, rcu);
1436
1437
WARN_ON_ONCE(!list_empty(&con->writequeue));
1438
WARN_ON_ONCE(con->sock);
1439
kfree(con);
1440
}
1441
1442
/* Called from recovery when it knows that a node has
1443
left the cluster */
1444
int dlm_lowcomms_close(int nodeid)
1445
{
1446
struct connection *con;
1447
int idx;
1448
1449
log_print("closing connection to node %d", nodeid);
1450
1451
idx = srcu_read_lock(&connections_srcu);
1452
con = nodeid2con(nodeid, 0);
1453
if (WARN_ON_ONCE(!con)) {
1454
srcu_read_unlock(&connections_srcu, idx);
1455
return -ENOENT;
1456
}
1457
1458
stop_connection_io(con);
1459
log_print("io handling for node: %d stopped", nodeid);
1460
close_connection(con, true);
1461
1462
spin_lock(&connections_lock);
1463
hlist_del_rcu(&con->list);
1464
spin_unlock(&connections_lock);
1465
1466
clean_one_writequeue(con);
1467
call_srcu(&connections_srcu, &con->rcu, connection_release);
1468
if (con->othercon) {
1469
clean_one_writequeue(con->othercon);
1470
call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
1471
}
1472
srcu_read_unlock(&connections_srcu, idx);
1473
1474
/* for debugging we print when we are done to compare with other
1475
* messages in between. This function need to be correctly synchronized
1476
* with io handling
1477
*/
1478
log_print("closing connection to node %d done", nodeid);
1479
1480
return 0;
1481
}
1482
1483
/* Receive worker function */
1484
static void process_recv_sockets(struct work_struct *work)
1485
{
1486
struct connection *con = container_of(work, struct connection, rwork);
1487
int ret, buflen;
1488
1489
down_read(&con->sock_lock);
1490
if (!con->sock) {
1491
up_read(&con->sock_lock);
1492
return;
1493
}
1494
1495
buflen = READ_ONCE(dlm_config.ci_buffer_size);
1496
do {
1497
ret = receive_from_sock(con, buflen);
1498
} while (ret == DLM_IO_SUCCESS);
1499
up_read(&con->sock_lock);
1500
1501
switch (ret) {
1502
case DLM_IO_END:
1503
/* CF_RECV_PENDING cleared */
1504
break;
1505
case DLM_IO_EOF:
1506
close_connection(con, false);
1507
wake_up(&con->shutdown_wait);
1508
/* CF_RECV_PENDING cleared */
1509
break;
1510
case DLM_IO_FLUSH:
1511
/* we can't flush the process_workqueue here because a
1512
* WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non
1513
* WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead
1514
* we have a waitqueue to wait until all messages are
1515
* processed.
1516
*
1517
* This handling is only necessary to backoff the sender and
1518
* not queue all messages from the socket layer into DLM
1519
* processqueue. When DLM is capable to parse multiple messages
1520
* on an e.g. per socket basis this handling can might be
1521
* removed. Especially in a message burst we are too slow to
1522
* process messages and the queue will fill up memory.
1523
*/
1524
wait_event(processqueue_wq, !atomic_read(&processqueue_count));
1525
fallthrough;
1526
case DLM_IO_RESCHED:
1527
cond_resched();
1528
queue_work(io_workqueue, &con->rwork);
1529
/* CF_RECV_PENDING not cleared */
1530
break;
1531
default:
1532
if (ret < 0) {
1533
if (test_bit(CF_IS_OTHERCON, &con->flags)) {
1534
close_connection(con, false);
1535
} else {
1536
spin_lock_bh(&con->writequeue_lock);
1537
lowcomms_queue_swork(con);
1538
spin_unlock_bh(&con->writequeue_lock);
1539
}
1540
1541
/* CF_RECV_PENDING cleared for othercon
1542
* we trigger send queue if not already done
1543
* and process_send_sockets will handle it
1544
*/
1545
break;
1546
}
1547
1548
WARN_ON_ONCE(1);
1549
break;
1550
}
1551
}
1552
1553
static void process_listen_recv_socket(struct work_struct *work)
1554
{
1555
int ret;
1556
1557
if (WARN_ON_ONCE(!listen_con.sock))
1558
return;
1559
1560
do {
1561
ret = accept_from_sock();
1562
} while (ret == DLM_IO_SUCCESS);
1563
1564
if (ret < 0)
1565
log_print("critical error accepting connection: %d", ret);
1566
}
1567
1568
static int dlm_connect(struct connection *con)
1569
{
1570
struct sockaddr_storage addr;
1571
int result, addr_len;
1572
struct socket *sock;
1573
unsigned int mark;
1574
1575
memset(&addr, 0, sizeof(addr));
1576
result = nodeid_to_addr(con->nodeid, &addr, NULL,
1577
dlm_proto_ops->try_new_addr, &mark);
1578
if (result < 0) {
1579
log_print("no address for nodeid %d", con->nodeid);
1580
return result;
1581
}
1582
1583
/* Create a socket to communicate with */
1584
result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
1585
SOCK_STREAM, dlm_proto_ops->proto, &sock);
1586
if (result < 0)
1587
return result;
1588
1589
sock_set_mark(sock->sk, mark);
1590
dlm_proto_ops->sockopts(sock);
1591
1592
result = dlm_proto_ops->bind(sock);
1593
if (result < 0) {
1594
sock_release(sock);
1595
return result;
1596
}
1597
1598
add_sock(sock, con);
1599
1600
log_print_ratelimited("connecting to %d", con->nodeid);
1601
make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
1602
result = kernel_connect(sock, (struct sockaddr *)&addr, addr_len, 0);
1603
switch (result) {
1604
case -EINPROGRESS:
1605
/* not an error */
1606
fallthrough;
1607
case 0:
1608
break;
1609
default:
1610
if (result < 0)
1611
dlm_close_sock(&con->sock);
1612
1613
break;
1614
}
1615
1616
return result;
1617
}
1618
1619
/* Send worker function */
1620
static void process_send_sockets(struct work_struct *work)
1621
{
1622
struct connection *con = container_of(work, struct connection, swork);
1623
int ret;
1624
1625
WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
1626
1627
down_read(&con->sock_lock);
1628
if (!con->sock) {
1629
up_read(&con->sock_lock);
1630
down_write(&con->sock_lock);
1631
if (!con->sock) {
1632
ret = dlm_connect(con);
1633
switch (ret) {
1634
case 0:
1635
break;
1636
default:
1637
/* CF_SEND_PENDING not cleared */
1638
up_write(&con->sock_lock);
1639
log_print("connect to node %d try %d error %d",
1640
con->nodeid, con->retries++, ret);
1641
msleep(1000);
1642
/* For now we try forever to reconnect. In
1643
* future we should send a event to cluster
1644
* manager to fence itself after certain amount
1645
* of retries.
1646
*/
1647
queue_work(io_workqueue, &con->swork);
1648
return;
1649
}
1650
}
1651
downgrade_write(&con->sock_lock);
1652
}
1653
1654
do {
1655
ret = send_to_sock(con);
1656
} while (ret == DLM_IO_SUCCESS);
1657
up_read(&con->sock_lock);
1658
1659
switch (ret) {
1660
case DLM_IO_END:
1661
/* CF_SEND_PENDING cleared */
1662
break;
1663
case DLM_IO_RESCHED:
1664
/* CF_SEND_PENDING not cleared */
1665
cond_resched();
1666
queue_work(io_workqueue, &con->swork);
1667
break;
1668
default:
1669
if (ret < 0) {
1670
close_connection(con, false);
1671
1672
/* CF_SEND_PENDING cleared */
1673
spin_lock_bh(&con->writequeue_lock);
1674
lowcomms_queue_swork(con);
1675
spin_unlock_bh(&con->writequeue_lock);
1676
break;
1677
}
1678
1679
WARN_ON_ONCE(1);
1680
break;
1681
}
1682
}
1683
1684
static void work_stop(void)
1685
{
1686
if (io_workqueue) {
1687
destroy_workqueue(io_workqueue);
1688
io_workqueue = NULL;
1689
}
1690
1691
if (process_workqueue) {
1692
destroy_workqueue(process_workqueue);
1693
process_workqueue = NULL;
1694
}
1695
}
1696
1697
static int work_start(void)
1698
{
1699
io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
1700
WQ_UNBOUND, 0);
1701
if (!io_workqueue) {
1702
log_print("can't start dlm_io");
1703
return -ENOMEM;
1704
}
1705
1706
process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
1707
if (!process_workqueue) {
1708
log_print("can't start dlm_process");
1709
destroy_workqueue(io_workqueue);
1710
io_workqueue = NULL;
1711
return -ENOMEM;
1712
}
1713
1714
return 0;
1715
}
1716
1717
void dlm_lowcomms_shutdown(void)
1718
{
1719
struct connection *con;
1720
int i, idx;
1721
1722
/* stop lowcomms_listen_data_ready calls */
1723
lock_sock(listen_con.sock->sk);
1724
listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
1725
release_sock(listen_con.sock->sk);
1726
1727
cancel_work_sync(&listen_con.rwork);
1728
dlm_close_sock(&listen_con.sock);
1729
1730
idx = srcu_read_lock(&connections_srcu);
1731
for (i = 0; i < CONN_HASH_SIZE; i++) {
1732
hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1733
shutdown_connection(con, true);
1734
stop_connection_io(con);
1735
flush_workqueue(process_workqueue);
1736
close_connection(con, true);
1737
1738
clean_one_writequeue(con);
1739
if (con->othercon)
1740
clean_one_writequeue(con->othercon);
1741
allow_connection_io(con);
1742
}
1743
}
1744
srcu_read_unlock(&connections_srcu, idx);
1745
}
1746
1747
void dlm_lowcomms_stop(void)
1748
{
1749
work_stop();
1750
dlm_proto_ops = NULL;
1751
}
1752
1753
static int dlm_listen_for_all(void)
1754
{
1755
struct socket *sock;
1756
int result;
1757
1758
log_print("Using %s for communications",
1759
dlm_proto_ops->name);
1760
1761
result = dlm_proto_ops->listen_validate();
1762
if (result < 0)
1763
return result;
1764
1765
result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
1766
SOCK_STREAM, dlm_proto_ops->proto, &sock);
1767
if (result < 0) {
1768
log_print("Can't create comms socket: %d", result);
1769
return result;
1770
}
1771
1772
sock_set_mark(sock->sk, dlm_config.ci_mark);
1773
dlm_proto_ops->listen_sockopts(sock);
1774
1775
result = dlm_proto_ops->listen_bind(sock);
1776
if (result < 0)
1777
goto out;
1778
1779
lock_sock(sock->sk);
1780
listen_sock.sk_data_ready = sock->sk->sk_data_ready;
1781
listen_sock.sk_write_space = sock->sk->sk_write_space;
1782
listen_sock.sk_error_report = sock->sk->sk_error_report;
1783
listen_sock.sk_state_change = sock->sk->sk_state_change;
1784
1785
listen_con.sock = sock;
1786
1787
sock->sk->sk_allocation = GFP_NOFS;
1788
sock->sk->sk_use_task_frag = false;
1789
sock->sk->sk_data_ready = lowcomms_listen_data_ready;
1790
release_sock(sock->sk);
1791
1792
result = sock->ops->listen(sock, 128);
1793
if (result < 0) {
1794
dlm_close_sock(&listen_con.sock);
1795
return result;
1796
}
1797
1798
return 0;
1799
1800
out:
1801
sock_release(sock);
1802
return result;
1803
}
1804
1805
static int dlm_tcp_bind(struct socket *sock)
1806
{
1807
struct sockaddr_storage src_addr;
1808
int result, addr_len;
1809
1810
/* Bind to our cluster-known address connecting to avoid
1811
* routing problems.
1812
*/
1813
memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
1814
make_sockaddr(&src_addr, 0, &addr_len);
1815
1816
result = kernel_bind(sock, (struct sockaddr *)&src_addr,
1817
addr_len);
1818
if (result < 0) {
1819
/* This *may* not indicate a critical error */
1820
log_print("could not bind for connect: %d", result);
1821
}
1822
1823
return 0;
1824
}
1825
1826
static int dlm_tcp_listen_validate(void)
1827
{
1828
/* We don't support multi-homed hosts */
1829
if (dlm_local_count > 1) {
1830
log_print("Detect multi-homed hosts but use only the first IP address.");
1831
log_print("Try SCTP, if you want to enable multi-link.");
1832
}
1833
1834
return 0;
1835
}
1836
1837
static void dlm_tcp_sockopts(struct socket *sock)
1838
{
1839
/* Turn off Nagle's algorithm */
1840
tcp_sock_set_nodelay(sock->sk);
1841
}
1842
1843
static void dlm_tcp_listen_sockopts(struct socket *sock)
1844
{
1845
dlm_tcp_sockopts(sock);
1846
sock_set_reuseaddr(sock->sk);
1847
}
1848
1849
static int dlm_tcp_listen_bind(struct socket *sock)
1850
{
1851
int addr_len;
1852
1853
/* Bind to our port */
1854
make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
1855
return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0],
1856
addr_len);
1857
}
1858
1859
static const struct dlm_proto_ops dlm_tcp_ops = {
1860
.name = "TCP",
1861
.proto = IPPROTO_TCP,
1862
.how = SHUT_WR,
1863
.sockopts = dlm_tcp_sockopts,
1864
.bind = dlm_tcp_bind,
1865
.listen_validate = dlm_tcp_listen_validate,
1866
.listen_sockopts = dlm_tcp_listen_sockopts,
1867
.listen_bind = dlm_tcp_listen_bind,
1868
};
1869
1870
/* Bind an outgoing SCTP socket to all our addresses, with port 0 */
static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}
1874
1875
static int dlm_sctp_listen_validate(void)
1876
{
1877
if (!IS_ENABLED(CONFIG_IP_SCTP)) {
1878
log_print("SCTP is not enabled by this kernel");
1879
return -EOPNOTSUPP;
1880
}
1881
1882
request_module("sctp");
1883
return 0;
1884
}
1885
1886
static int dlm_sctp_bind_listen(struct socket *sock)
1887
{
1888
return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
1889
}
1890
1891
static void dlm_sctp_sockopts(struct socket *sock)
1892
{
1893
/* Turn off Nagle's algorithm */
1894
sctp_sock_set_nodelay(sock->sk);
1895
sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
1896
}
1897
1898
static const struct dlm_proto_ops dlm_sctp_ops = {
1899
.name = "SCTP",
1900
.proto = IPPROTO_SCTP,
1901
.how = SHUT_RDWR,
1902
.try_new_addr = true,
1903
.sockopts = dlm_sctp_sockopts,
1904
.bind = dlm_sctp_bind,
1905
.listen_validate = dlm_sctp_listen_validate,
1906
.listen_sockopts = dlm_sctp_sockopts,
1907
.listen_bind = dlm_sctp_bind_listen,
1908
};
1909
1910
int dlm_lowcomms_start(void)
1911
{
1912
int error;
1913
1914
init_local();
1915
if (!dlm_local_count) {
1916
error = -ENOTCONN;
1917
log_print("no local IP address has been set");
1918
goto fail;
1919
}
1920
1921
error = work_start();
1922
if (error)
1923
goto fail;
1924
1925
/* Start listening */
1926
switch (dlm_config.ci_protocol) {
1927
case DLM_PROTO_TCP:
1928
dlm_proto_ops = &dlm_tcp_ops;
1929
break;
1930
case DLM_PROTO_SCTP:
1931
dlm_proto_ops = &dlm_sctp_ops;
1932
break;
1933
default:
1934
log_print("Invalid protocol identifier %d set",
1935
dlm_config.ci_protocol);
1936
error = -EINVAL;
1937
goto fail_proto_ops;
1938
}
1939
1940
error = dlm_listen_for_all();
1941
if (error)
1942
goto fail_listen;
1943
1944
return 0;
1945
1946
fail_listen:
1947
dlm_proto_ops = NULL;
1948
fail_proto_ops:
1949
work_stop();
1950
fail:
1951
return error;
1952
}
1953
1954
void dlm_lowcomms_init(void)
1955
{
1956
int i;
1957
1958
for (i = 0; i < CONN_HASH_SIZE; i++)
1959
INIT_HLIST_HEAD(&connection_hash[i]);
1960
1961
INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
1962
}
1963
1964
void dlm_lowcomms_exit(void)
1965
{
1966
struct connection *con;
1967
int i, idx;
1968
1969
idx = srcu_read_lock(&connections_srcu);
1970
for (i = 0; i < CONN_HASH_SIZE; i++) {
1971
hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1972
spin_lock(&connections_lock);
1973
hlist_del_rcu(&con->list);
1974
spin_unlock(&connections_lock);
1975
1976
if (con->othercon)
1977
call_srcu(&connections_srcu, &con->othercon->rcu,
1978
connection_release);
1979
call_srcu(&connections_srcu, &con->rcu, connection_release);
1980
}
1981
}
1982
srcu_read_unlock(&connections_srcu, idx);
1983
}
1984
1985