Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/drivers/block/drbd/drbd_receiver.c
26282 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/*
3
drbd_receiver.c
4
5
This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7
Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8
Copyright (C) 1999-2008, Philipp Reisner <[email protected]>.
9
Copyright (C) 2002-2008, Lars Ellenberg <[email protected]>.
10
11
*/
12
13
14
#include <linux/module.h>
15
16
#include <linux/uaccess.h>
17
#include <net/sock.h>
18
19
#include <linux/drbd.h>
20
#include <linux/fs.h>
21
#include <linux/file.h>
22
#include <linux/in.h>
23
#include <linux/mm.h>
24
#include <linux/memcontrol.h>
25
#include <linux/mm_inline.h>
26
#include <linux/slab.h>
27
#include <uapi/linux/sched/types.h>
28
#include <linux/sched/signal.h>
29
#include <linux/pkt_sched.h>
30
#include <linux/unistd.h>
31
#include <linux/vmalloc.h>
32
#include <linux/random.h>
33
#include <linux/string.h>
34
#include <linux/scatterlist.h>
35
#include <linux/part_stat.h>
36
#include <linux/mempool.h>
37
#include "drbd_int.h"
38
#include "drbd_protocol.h"
39
#include "drbd_req.h"
40
#include "drbd_vli.h"
41
42
#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
43
44
struct packet_info {
45
enum drbd_packet cmd;
46
unsigned int size;
47
unsigned int vnr;
48
void *data;
49
};
50
51
enum finish_epoch {
52
FE_STILL_LIVE,
53
FE_DESTROYED,
54
FE_RECYCLED,
55
};
56
57
static int drbd_do_features(struct drbd_connection *connection);
58
static int drbd_do_auth(struct drbd_connection *connection);
59
static int drbd_disconnected(struct drbd_peer_device *);
60
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
61
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
62
static int e_end_block(struct drbd_work *, int);
63
64
65
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
66
67
static struct page *__drbd_alloc_pages(unsigned int number)
68
{
69
struct page *page = NULL;
70
struct page *tmp = NULL;
71
unsigned int i = 0;
72
73
/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
74
* "criss-cross" setup, that might cause write-out on some other DRBD,
75
* which in turn might block on the other node at this very place. */
76
for (i = 0; i < number; i++) {
77
tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY);
78
if (!tmp)
79
goto fail;
80
set_page_private(tmp, (unsigned long)page);
81
page = tmp;
82
}
83
return page;
84
fail:
85
page_chain_for_each_safe(page, tmp) {
86
set_page_private(page, 0);
87
mempool_free(page, &drbd_buffer_page_pool);
88
}
89
return NULL;
90
}
91
92
/**
93
* drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
94
* @peer_device: DRBD device.
95
* @number: number of pages requested
96
* @retry: whether to retry, if not enough pages are available right now
97
*
98
* Tries to allocate number pages, first from our own page pool, then from
99
* the kernel.
100
* Possibly retry until DRBD frees sufficient pages somewhere else.
101
*
102
* If this allocation would exceed the max_buffers setting, we throttle
103
* allocation (schedule_timeout) to give the system some room to breathe.
104
*
105
* We do not use max-buffers as hard limit, because it could lead to
106
* congestion and further to a distributed deadlock during online-verify or
107
* (checksum based) resync, if the max-buffers, socket buffer sizes and
108
* resync-rate settings are mis-configured.
109
*
110
* Returns a page chain linked via page->private.
111
*/
112
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
113
bool retry)
114
{
115
struct drbd_device *device = peer_device->device;
116
struct page *page;
117
struct net_conf *nc;
118
unsigned int mxb;
119
120
rcu_read_lock();
121
nc = rcu_dereference(peer_device->connection->net_conf);
122
mxb = nc ? nc->max_buffers : 1000000;
123
rcu_read_unlock();
124
125
if (atomic_read(&device->pp_in_use) >= mxb)
126
schedule_timeout_interruptible(HZ / 10);
127
page = __drbd_alloc_pages(number);
128
129
if (page)
130
atomic_add(number, &device->pp_in_use);
131
return page;
132
}
133
134
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
135
* Is also used from inside an other spin_lock_irq(&resource->req_lock);
136
* Either links the page chain back to the global pool,
137
* or returns all pages to the system. */
138
static void drbd_free_pages(struct drbd_device *device, struct page *page)
139
{
140
struct page *tmp;
141
int i = 0;
142
143
if (page == NULL)
144
return;
145
146
page_chain_for_each_safe(page, tmp) {
147
set_page_private(page, 0);
148
if (page_count(page) == 1)
149
mempool_free(page, &drbd_buffer_page_pool);
150
else
151
put_page(page);
152
i++;
153
}
154
i = atomic_sub_return(i, &device->pp_in_use);
155
if (i < 0)
156
drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
157
}
158
159
/*
160
You need to hold the req_lock:
161
_drbd_wait_ee_list_empty()
162
163
You must not have the req_lock:
164
drbd_free_peer_req()
165
drbd_alloc_peer_req()
166
drbd_free_peer_reqs()
167
drbd_ee_fix_bhs()
168
drbd_finish_peer_reqs()
169
drbd_clear_done_ee()
170
drbd_wait_ee_list_empty()
171
*/
172
173
/* normal: payload_size == request size (bi_size)
174
* w_same: payload_size == logical_block_size
175
* trim: payload_size == 0 */
176
struct drbd_peer_request *
177
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
178
unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
179
{
180
struct drbd_device *device = peer_device->device;
181
struct drbd_peer_request *peer_req;
182
struct page *page = NULL;
183
unsigned int nr_pages = PFN_UP(payload_size);
184
185
if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
186
return NULL;
187
188
peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
189
if (!peer_req) {
190
if (!(gfp_mask & __GFP_NOWARN))
191
drbd_err(device, "%s: allocation failed\n", __func__);
192
return NULL;
193
}
194
195
if (nr_pages) {
196
page = drbd_alloc_pages(peer_device, nr_pages,
197
gfpflags_allow_blocking(gfp_mask));
198
if (!page)
199
goto fail;
200
if (!mempool_is_saturated(&drbd_buffer_page_pool))
201
peer_req->flags |= EE_RELEASE_TO_MEMPOOL;
202
}
203
204
memset(peer_req, 0, sizeof(*peer_req));
205
INIT_LIST_HEAD(&peer_req->w.list);
206
drbd_clear_interval(&peer_req->i);
207
peer_req->i.size = request_size;
208
peer_req->i.sector = sector;
209
peer_req->submit_jif = jiffies;
210
peer_req->peer_device = peer_device;
211
peer_req->pages = page;
212
/*
213
* The block_id is opaque to the receiver. It is not endianness
214
* converted, and sent back to the sender unchanged.
215
*/
216
peer_req->block_id = id;
217
218
return peer_req;
219
220
fail:
221
mempool_free(peer_req, &drbd_ee_mempool);
222
return NULL;
223
}
224
225
void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req)
226
{
227
might_sleep();
228
if (peer_req->flags & EE_HAS_DIGEST)
229
kfree(peer_req->digest);
230
drbd_free_pages(device, peer_req->pages);
231
D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
232
D_ASSERT(device, drbd_interval_empty(&peer_req->i));
233
if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
234
peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
235
drbd_al_complete_io(device, &peer_req->i);
236
}
237
mempool_free(peer_req, &drbd_ee_mempool);
238
}
239
240
int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
241
{
242
LIST_HEAD(work_list);
243
struct drbd_peer_request *peer_req, *t;
244
int count = 0;
245
246
spin_lock_irq(&device->resource->req_lock);
247
list_splice_init(list, &work_list);
248
spin_unlock_irq(&device->resource->req_lock);
249
250
list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
251
drbd_free_peer_req(device, peer_req);
252
count++;
253
}
254
return count;
255
}
256
257
/*
258
* See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
259
*/
260
static int drbd_finish_peer_reqs(struct drbd_device *device)
261
{
262
LIST_HEAD(work_list);
263
struct drbd_peer_request *peer_req, *t;
264
int err = 0;
265
266
spin_lock_irq(&device->resource->req_lock);
267
list_splice_init(&device->done_ee, &work_list);
268
spin_unlock_irq(&device->resource->req_lock);
269
270
/* possible callbacks here:
271
* e_end_block, and e_end_resync_block, e_send_superseded.
272
* all ignore the last argument.
273
*/
274
list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
275
int err2;
276
277
/* list_del not necessary, next/prev members not touched */
278
err2 = peer_req->w.cb(&peer_req->w, !!err);
279
if (!err)
280
err = err2;
281
drbd_free_peer_req(device, peer_req);
282
}
283
wake_up(&device->ee_wait);
284
285
return err;
286
}
287
288
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
289
struct list_head *head)
290
{
291
DEFINE_WAIT(wait);
292
293
/* avoids spin_lock/unlock
294
* and calling prepare_to_wait in the fast path */
295
while (!list_empty(head)) {
296
prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
297
spin_unlock_irq(&device->resource->req_lock);
298
io_schedule();
299
finish_wait(&device->ee_wait, &wait);
300
spin_lock_irq(&device->resource->req_lock);
301
}
302
}
303
304
static void drbd_wait_ee_list_empty(struct drbd_device *device,
305
struct list_head *head)
306
{
307
spin_lock_irq(&device->resource->req_lock);
308
_drbd_wait_ee_list_empty(device, head);
309
spin_unlock_irq(&device->resource->req_lock);
310
}
311
312
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
313
{
314
struct kvec iov = {
315
.iov_base = buf,
316
.iov_len = size,
317
};
318
struct msghdr msg = {
319
.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
320
};
321
iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, size);
322
return sock_recvmsg(sock, &msg, msg.msg_flags);
323
}
324
325
static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
326
{
327
int rv;
328
329
rv = drbd_recv_short(connection->data.socket, buf, size, 0);
330
331
if (rv < 0) {
332
if (rv == -ECONNRESET)
333
drbd_info(connection, "sock was reset by peer\n");
334
else if (rv != -ERESTARTSYS)
335
drbd_err(connection, "sock_recvmsg returned %d\n", rv);
336
} else if (rv == 0) {
337
if (test_bit(DISCONNECT_SENT, &connection->flags)) {
338
long t;
339
rcu_read_lock();
340
t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
341
rcu_read_unlock();
342
343
t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
344
345
if (t)
346
goto out;
347
}
348
drbd_info(connection, "sock was shut down by peer\n");
349
}
350
351
if (rv != size)
352
conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
353
354
out:
355
return rv;
356
}
357
358
static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
359
{
360
int err;
361
362
err = drbd_recv(connection, buf, size);
363
if (err != size) {
364
if (err >= 0)
365
err = -EIO;
366
} else
367
err = 0;
368
return err;
369
}
370
371
static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
372
{
373
int err;
374
375
err = drbd_recv_all(connection, buf, size);
376
if (err && !signal_pending(current))
377
drbd_warn(connection, "short read (expected size %d)\n", (int)size);
378
return err;
379
}
380
381
/* quoting tcp(7):
382
* On individual connections, the socket buffer size must be set prior to the
383
* listen(2) or connect(2) calls in order to have it take effect.
384
* This is our wrapper to do so.
385
*/
386
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
387
unsigned int rcv)
388
{
389
/* open coded SO_SNDBUF, SO_RCVBUF */
390
if (snd) {
391
sock->sk->sk_sndbuf = snd;
392
sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
393
}
394
if (rcv) {
395
sock->sk->sk_rcvbuf = rcv;
396
sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
397
}
398
}
399
400
static struct socket *drbd_try_connect(struct drbd_connection *connection)
401
{
402
const char *what;
403
struct socket *sock;
404
struct sockaddr_in6 src_in6;
405
struct sockaddr_in6 peer_in6;
406
struct net_conf *nc;
407
int err, peer_addr_len, my_addr_len;
408
int sndbuf_size, rcvbuf_size, connect_int;
409
int disconnect_on_error = 1;
410
411
rcu_read_lock();
412
nc = rcu_dereference(connection->net_conf);
413
if (!nc) {
414
rcu_read_unlock();
415
return NULL;
416
}
417
sndbuf_size = nc->sndbuf_size;
418
rcvbuf_size = nc->rcvbuf_size;
419
connect_int = nc->connect_int;
420
rcu_read_unlock();
421
422
my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
423
memcpy(&src_in6, &connection->my_addr, my_addr_len);
424
425
if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
426
src_in6.sin6_port = 0;
427
else
428
((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
429
430
peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
431
memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
432
433
what = "sock_create_kern";
434
err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
435
SOCK_STREAM, IPPROTO_TCP, &sock);
436
if (err < 0) {
437
sock = NULL;
438
goto out;
439
}
440
441
sock->sk->sk_rcvtimeo =
442
sock->sk->sk_sndtimeo = connect_int * HZ;
443
drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
444
445
/* explicitly bind to the configured IP as source IP
446
* for the outgoing connections.
447
* This is needed for multihomed hosts and to be
448
* able to use lo: interfaces for drbd.
449
* Make sure to use 0 as port number, so linux selects
450
* a free one dynamically.
451
*/
452
what = "bind before connect";
453
err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
454
if (err < 0)
455
goto out;
456
457
/* connect may fail, peer not yet available.
458
* stay C_WF_CONNECTION, don't go Disconnecting! */
459
disconnect_on_error = 0;
460
what = "connect";
461
err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
462
463
out:
464
if (err < 0) {
465
if (sock) {
466
sock_release(sock);
467
sock = NULL;
468
}
469
switch (-err) {
470
/* timeout, busy, signal pending */
471
case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
472
case EINTR: case ERESTARTSYS:
473
/* peer not (yet) available, network problem */
474
case ECONNREFUSED: case ENETUNREACH:
475
case EHOSTDOWN: case EHOSTUNREACH:
476
disconnect_on_error = 0;
477
break;
478
default:
479
drbd_err(connection, "%s failed, err = %d\n", what, err);
480
}
481
if (disconnect_on_error)
482
conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
483
}
484
485
return sock;
486
}
487
488
struct accept_wait_data {
489
struct drbd_connection *connection;
490
struct socket *s_listen;
491
struct completion door_bell;
492
void (*original_sk_state_change)(struct sock *sk);
493
494
};
495
496
static void drbd_incoming_connection(struct sock *sk)
497
{
498
struct accept_wait_data *ad = sk->sk_user_data;
499
void (*state_change)(struct sock *sk);
500
501
state_change = ad->original_sk_state_change;
502
if (sk->sk_state == TCP_ESTABLISHED)
503
complete(&ad->door_bell);
504
state_change(sk);
505
}
506
507
static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
508
{
509
int err, sndbuf_size, rcvbuf_size, my_addr_len;
510
struct sockaddr_in6 my_addr;
511
struct socket *s_listen;
512
struct net_conf *nc;
513
const char *what;
514
515
rcu_read_lock();
516
nc = rcu_dereference(connection->net_conf);
517
if (!nc) {
518
rcu_read_unlock();
519
return -EIO;
520
}
521
sndbuf_size = nc->sndbuf_size;
522
rcvbuf_size = nc->rcvbuf_size;
523
rcu_read_unlock();
524
525
my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
526
memcpy(&my_addr, &connection->my_addr, my_addr_len);
527
528
what = "sock_create_kern";
529
err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
530
SOCK_STREAM, IPPROTO_TCP, &s_listen);
531
if (err) {
532
s_listen = NULL;
533
goto out;
534
}
535
536
s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
537
drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
538
539
what = "bind before listen";
540
err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
541
if (err < 0)
542
goto out;
543
544
ad->s_listen = s_listen;
545
write_lock_bh(&s_listen->sk->sk_callback_lock);
546
ad->original_sk_state_change = s_listen->sk->sk_state_change;
547
s_listen->sk->sk_state_change = drbd_incoming_connection;
548
s_listen->sk->sk_user_data = ad;
549
write_unlock_bh(&s_listen->sk->sk_callback_lock);
550
551
what = "listen";
552
err = s_listen->ops->listen(s_listen, 5);
553
if (err < 0)
554
goto out;
555
556
return 0;
557
out:
558
if (s_listen)
559
sock_release(s_listen);
560
if (err < 0) {
561
if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
562
drbd_err(connection, "%s failed, err = %d\n", what, err);
563
conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
564
}
565
}
566
567
return -EIO;
568
}
569
570
static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
571
{
572
write_lock_bh(&sk->sk_callback_lock);
573
sk->sk_state_change = ad->original_sk_state_change;
574
sk->sk_user_data = NULL;
575
write_unlock_bh(&sk->sk_callback_lock);
576
}
577
578
static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
579
{
580
int timeo, connect_int, err = 0;
581
struct socket *s_estab = NULL;
582
struct net_conf *nc;
583
584
rcu_read_lock();
585
nc = rcu_dereference(connection->net_conf);
586
if (!nc) {
587
rcu_read_unlock();
588
return NULL;
589
}
590
connect_int = nc->connect_int;
591
rcu_read_unlock();
592
593
timeo = connect_int * HZ;
594
/* 28.5% random jitter */
595
timeo += get_random_u32_below(2) ? timeo / 7 : -timeo / 7;
596
597
err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
598
if (err <= 0)
599
return NULL;
600
601
err = kernel_accept(ad->s_listen, &s_estab, 0);
602
if (err < 0) {
603
if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
604
drbd_err(connection, "accept failed, err = %d\n", err);
605
conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
606
}
607
}
608
609
if (s_estab)
610
unregister_state_change(s_estab->sk, ad);
611
612
return s_estab;
613
}
614
615
static int decode_header(struct drbd_connection *, void *, struct packet_info *);
616
617
static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
618
enum drbd_packet cmd)
619
{
620
if (!conn_prepare_command(connection, sock))
621
return -EIO;
622
return conn_send_command(connection, sock, cmd, 0, NULL, 0);
623
}
624
625
static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
626
{
627
unsigned int header_size = drbd_header_size(connection);
628
struct packet_info pi;
629
struct net_conf *nc;
630
int err;
631
632
rcu_read_lock();
633
nc = rcu_dereference(connection->net_conf);
634
if (!nc) {
635
rcu_read_unlock();
636
return -EIO;
637
}
638
sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
639
rcu_read_unlock();
640
641
err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
642
if (err != header_size) {
643
if (err >= 0)
644
err = -EIO;
645
return err;
646
}
647
err = decode_header(connection, connection->data.rbuf, &pi);
648
if (err)
649
return err;
650
return pi.cmd;
651
}
652
653
/**
654
* drbd_socket_okay() - Free the socket if its connection is not okay
655
* @sock: pointer to the pointer to the socket.
656
*/
657
static bool drbd_socket_okay(struct socket **sock)
658
{
659
int rr;
660
char tb[4];
661
662
if (!*sock)
663
return false;
664
665
rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
666
667
if (rr > 0 || rr == -EAGAIN) {
668
return true;
669
} else {
670
sock_release(*sock);
671
*sock = NULL;
672
return false;
673
}
674
}
675
676
static bool connection_established(struct drbd_connection *connection,
677
struct socket **sock1,
678
struct socket **sock2)
679
{
680
struct net_conf *nc;
681
int timeout;
682
bool ok;
683
684
if (!*sock1 || !*sock2)
685
return false;
686
687
rcu_read_lock();
688
nc = rcu_dereference(connection->net_conf);
689
timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
690
rcu_read_unlock();
691
schedule_timeout_interruptible(timeout);
692
693
ok = drbd_socket_okay(sock1);
694
ok = drbd_socket_okay(sock2) && ok;
695
696
return ok;
697
}
698
699
/* Gets called if a connection is established, or if a new minor gets created
700
in a connection */
701
int drbd_connected(struct drbd_peer_device *peer_device)
702
{
703
struct drbd_device *device = peer_device->device;
704
int err;
705
706
atomic_set(&device->packet_seq, 0);
707
device->peer_seq = 0;
708
709
device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
710
&peer_device->connection->cstate_mutex :
711
&device->own_state_mutex;
712
713
err = drbd_send_sync_param(peer_device);
714
if (!err)
715
err = drbd_send_sizes(peer_device, 0, 0);
716
if (!err)
717
err = drbd_send_uuids(peer_device);
718
if (!err)
719
err = drbd_send_current_state(peer_device);
720
clear_bit(USE_DEGR_WFC_T, &device->flags);
721
clear_bit(RESIZE_PENDING, &device->flags);
722
atomic_set(&device->ap_in_flight, 0);
723
mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
724
return err;
725
}
726
727
/*
728
* return values:
729
* 1 yes, we have a valid connection
730
* 0 oops, did not work out, please try again
731
* -1 peer talks different language,
732
* no point in trying again, please go standalone.
733
* -2 We do not have a network config...
734
*/
735
static int conn_connect(struct drbd_connection *connection)
736
{
737
struct drbd_socket sock, msock;
738
struct drbd_peer_device *peer_device;
739
struct net_conf *nc;
740
int vnr, timeout, h;
741
bool discard_my_data, ok;
742
enum drbd_state_rv rv;
743
struct accept_wait_data ad = {
744
.connection = connection,
745
.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
746
};
747
748
clear_bit(DISCONNECT_SENT, &connection->flags);
749
if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
750
return -2;
751
752
mutex_init(&sock.mutex);
753
sock.sbuf = connection->data.sbuf;
754
sock.rbuf = connection->data.rbuf;
755
sock.socket = NULL;
756
mutex_init(&msock.mutex);
757
msock.sbuf = connection->meta.sbuf;
758
msock.rbuf = connection->meta.rbuf;
759
msock.socket = NULL;
760
761
/* Assume that the peer only understands protocol 80 until we know better. */
762
connection->agreed_pro_version = 80;
763
764
if (prepare_listen_socket(connection, &ad))
765
return 0;
766
767
do {
768
struct socket *s;
769
770
s = drbd_try_connect(connection);
771
if (s) {
772
if (!sock.socket) {
773
sock.socket = s;
774
send_first_packet(connection, &sock, P_INITIAL_DATA);
775
} else if (!msock.socket) {
776
clear_bit(RESOLVE_CONFLICTS, &connection->flags);
777
msock.socket = s;
778
send_first_packet(connection, &msock, P_INITIAL_META);
779
} else {
780
drbd_err(connection, "Logic error in conn_connect()\n");
781
goto out_release_sockets;
782
}
783
}
784
785
if (connection_established(connection, &sock.socket, &msock.socket))
786
break;
787
788
retry:
789
s = drbd_wait_for_connect(connection, &ad);
790
if (s) {
791
int fp = receive_first_packet(connection, s);
792
drbd_socket_okay(&sock.socket);
793
drbd_socket_okay(&msock.socket);
794
switch (fp) {
795
case P_INITIAL_DATA:
796
if (sock.socket) {
797
drbd_warn(connection, "initial packet S crossed\n");
798
sock_release(sock.socket);
799
sock.socket = s;
800
goto randomize;
801
}
802
sock.socket = s;
803
break;
804
case P_INITIAL_META:
805
set_bit(RESOLVE_CONFLICTS, &connection->flags);
806
if (msock.socket) {
807
drbd_warn(connection, "initial packet M crossed\n");
808
sock_release(msock.socket);
809
msock.socket = s;
810
goto randomize;
811
}
812
msock.socket = s;
813
break;
814
default:
815
drbd_warn(connection, "Error receiving initial packet\n");
816
sock_release(s);
817
randomize:
818
if (get_random_u32_below(2))
819
goto retry;
820
}
821
}
822
823
if (connection->cstate <= C_DISCONNECTING)
824
goto out_release_sockets;
825
if (signal_pending(current)) {
826
flush_signals(current);
827
smp_rmb();
828
if (get_t_state(&connection->receiver) == EXITING)
829
goto out_release_sockets;
830
}
831
832
ok = connection_established(connection, &sock.socket, &msock.socket);
833
} while (!ok);
834
835
if (ad.s_listen)
836
sock_release(ad.s_listen);
837
838
sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
839
msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
840
841
sock.socket->sk->sk_allocation = GFP_NOIO;
842
msock.socket->sk->sk_allocation = GFP_NOIO;
843
844
sock.socket->sk->sk_use_task_frag = false;
845
msock.socket->sk->sk_use_task_frag = false;
846
847
sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
848
msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
849
850
/* NOT YET ...
851
* sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
852
* sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
853
* first set it to the P_CONNECTION_FEATURES timeout,
854
* which we set to 4x the configured ping_timeout. */
855
rcu_read_lock();
856
nc = rcu_dereference(connection->net_conf);
857
858
sock.socket->sk->sk_sndtimeo =
859
sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
860
861
msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
862
timeout = nc->timeout * HZ / 10;
863
discard_my_data = nc->discard_my_data;
864
rcu_read_unlock();
865
866
msock.socket->sk->sk_sndtimeo = timeout;
867
868
/* we don't want delays.
869
* we use TCP_CORK where appropriate, though */
870
tcp_sock_set_nodelay(sock.socket->sk);
871
tcp_sock_set_nodelay(msock.socket->sk);
872
873
connection->data.socket = sock.socket;
874
connection->meta.socket = msock.socket;
875
connection->last_received = jiffies;
876
877
h = drbd_do_features(connection);
878
if (h <= 0)
879
return h;
880
881
if (connection->cram_hmac_tfm) {
882
/* drbd_request_state(device, NS(conn, WFAuth)); */
883
switch (drbd_do_auth(connection)) {
884
case -1:
885
drbd_err(connection, "Authentication of peer failed\n");
886
return -1;
887
case 0:
888
drbd_err(connection, "Authentication of peer failed, trying again.\n");
889
return 0;
890
}
891
}
892
893
connection->data.socket->sk->sk_sndtimeo = timeout;
894
connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
895
896
if (drbd_send_protocol(connection) == -EOPNOTSUPP)
897
return -1;
898
899
/* Prevent a race between resync-handshake and
900
* being promoted to Primary.
901
*
902
* Grab and release the state mutex, so we know that any current
903
* drbd_set_role() is finished, and any incoming drbd_set_role
904
* will see the STATE_SENT flag, and wait for it to be cleared.
905
*/
906
idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
907
mutex_lock(peer_device->device->state_mutex);
908
909
/* avoid a race with conn_request_state( C_DISCONNECTING ) */
910
spin_lock_irq(&connection->resource->req_lock);
911
set_bit(STATE_SENT, &connection->flags);
912
spin_unlock_irq(&connection->resource->req_lock);
913
914
idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
915
mutex_unlock(peer_device->device->state_mutex);
916
917
rcu_read_lock();
918
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
919
struct drbd_device *device = peer_device->device;
920
kref_get(&device->kref);
921
rcu_read_unlock();
922
923
if (discard_my_data)
924
set_bit(DISCARD_MY_DATA, &device->flags);
925
else
926
clear_bit(DISCARD_MY_DATA, &device->flags);
927
928
drbd_connected(peer_device);
929
kref_put(&device->kref, drbd_destroy_device);
930
rcu_read_lock();
931
}
932
rcu_read_unlock();
933
934
rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
935
if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
936
clear_bit(STATE_SENT, &connection->flags);
937
return 0;
938
}
939
940
drbd_thread_start(&connection->ack_receiver);
941
/* opencoded create_singlethread_workqueue(),
942
* to be able to use format string arguments */
943
connection->ack_sender =
944
alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
945
if (!connection->ack_sender) {
946
drbd_err(connection, "Failed to create workqueue ack_sender\n");
947
return 0;
948
}
949
950
mutex_lock(&connection->resource->conf_update);
951
/* The discard_my_data flag is a single-shot modifier to the next
952
* connection attempt, the handshake of which is now well underway.
953
* No need for rcu style copying of the whole struct
954
* just to clear a single value. */
955
connection->net_conf->discard_my_data = 0;
956
mutex_unlock(&connection->resource->conf_update);
957
958
return h;
959
960
out_release_sockets:
961
if (ad.s_listen)
962
sock_release(ad.s_listen);
963
if (sock.socket)
964
sock_release(sock.socket);
965
if (msock.socket)
966
sock_release(msock.socket);
967
return -1;
968
}
969
970
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
971
{
972
unsigned int header_size = drbd_header_size(connection);
973
974
if (header_size == sizeof(struct p_header100) &&
975
*(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
976
struct p_header100 *h = header;
977
if (h->pad != 0) {
978
drbd_err(connection, "Header padding is not zero\n");
979
return -EINVAL;
980
}
981
pi->vnr = be16_to_cpu(h->volume);
982
pi->cmd = be16_to_cpu(h->command);
983
pi->size = be32_to_cpu(h->length);
984
} else if (header_size == sizeof(struct p_header95) &&
985
*(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
986
struct p_header95 *h = header;
987
pi->cmd = be16_to_cpu(h->command);
988
pi->size = be32_to_cpu(h->length);
989
pi->vnr = 0;
990
} else if (header_size == sizeof(struct p_header80) &&
991
*(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
992
struct p_header80 *h = header;
993
pi->cmd = be16_to_cpu(h->command);
994
pi->size = be16_to_cpu(h->length);
995
pi->vnr = 0;
996
} else {
997
drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
998
be32_to_cpu(*(__be32 *)header),
999
connection->agreed_pro_version);
1000
return -EINVAL;
1001
}
1002
pi->data = header + header_size;
1003
return 0;
1004
}
1005
1006
static void drbd_unplug_all_devices(struct drbd_connection *connection)
1007
{
1008
if (current->plug == &connection->receiver_plug) {
1009
blk_finish_plug(&connection->receiver_plug);
1010
blk_start_plug(&connection->receiver_plug);
1011
} /* else: maybe just schedule() ?? */
1012
}
1013
1014
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1015
{
1016
void *buffer = connection->data.rbuf;
1017
int err;
1018
1019
err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1020
if (err)
1021
return err;
1022
1023
err = decode_header(connection, buffer, pi);
1024
connection->last_received = jiffies;
1025
1026
return err;
1027
}
1028
1029
static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1030
{
1031
void *buffer = connection->data.rbuf;
1032
unsigned int size = drbd_header_size(connection);
1033
int err;
1034
1035
err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1036
if (err != size) {
1037
/* If we have nothing in the receive buffer now, to reduce
1038
* application latency, try to drain the backend queues as
1039
* quickly as possible, and let remote TCP know what we have
1040
* received so far. */
1041
if (err == -EAGAIN) {
1042
tcp_sock_set_quickack(connection->data.socket->sk, 2);
1043
drbd_unplug_all_devices(connection);
1044
}
1045
if (err > 0) {
1046
buffer += err;
1047
size -= err;
1048
}
1049
err = drbd_recv_all_warn(connection, buffer, size);
1050
if (err)
1051
return err;
1052
}
1053
1054
err = decode_header(connection, connection->data.rbuf, pi);
1055
connection->last_received = jiffies;
1056
1057
return err;
1058
}
1059
/* This is blkdev_issue_flush, but asynchronous.
1060
* We want to submit to all component volumes in parallel,
1061
* then wait for all completions.
1062
*/
1063
struct issue_flush_context {
1064
atomic_t pending;
1065
int error;
1066
struct completion done;
1067
};
1068
struct one_flush_context {
1069
struct drbd_device *device;
1070
struct issue_flush_context *ctx;
1071
};
1072
1073
static void one_flush_endio(struct bio *bio)
1074
{
1075
struct one_flush_context *octx = bio->bi_private;
1076
struct drbd_device *device = octx->device;
1077
struct issue_flush_context *ctx = octx->ctx;
1078
1079
if (bio->bi_status) {
1080
ctx->error = blk_status_to_errno(bio->bi_status);
1081
drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1082
}
1083
kfree(octx);
1084
bio_put(bio);
1085
1086
clear_bit(FLUSH_PENDING, &device->flags);
1087
put_ldev(device);
1088
kref_put(&device->kref, drbd_destroy_device);
1089
1090
if (atomic_dec_and_test(&ctx->pending))
1091
complete(&ctx->done);
1092
}
1093
1094
static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1095
{
1096
struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0,
1097
REQ_OP_WRITE | REQ_PREFLUSH, GFP_NOIO);
1098
struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1099
1100
if (!octx) {
1101
drbd_warn(device, "Could not allocate a octx, CANNOT ISSUE FLUSH\n");
1102
/* FIXME: what else can I do now? disconnecting or detaching
1103
* really does not help to improve the state of the world, either.
1104
*/
1105
bio_put(bio);
1106
1107
ctx->error = -ENOMEM;
1108
put_ldev(device);
1109
kref_put(&device->kref, drbd_destroy_device);
1110
return;
1111
}
1112
1113
octx->device = device;
1114
octx->ctx = ctx;
1115
bio->bi_private = octx;
1116
bio->bi_end_io = one_flush_endio;
1117
1118
device->flush_jif = jiffies;
1119
set_bit(FLUSH_PENDING, &device->flags);
1120
atomic_inc(&ctx->pending);
1121
submit_bio(bio);
1122
}
1123
1124
static void drbd_flush(struct drbd_connection *connection)
1125
{
1126
if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1127
struct drbd_peer_device *peer_device;
1128
struct issue_flush_context ctx;
1129
int vnr;
1130
1131
atomic_set(&ctx.pending, 1);
1132
ctx.error = 0;
1133
init_completion(&ctx.done);
1134
1135
rcu_read_lock();
1136
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1137
struct drbd_device *device = peer_device->device;
1138
1139
if (!get_ldev(device))
1140
continue;
1141
kref_get(&device->kref);
1142
rcu_read_unlock();
1143
1144
submit_one_flush(device, &ctx);
1145
1146
rcu_read_lock();
1147
}
1148
rcu_read_unlock();
1149
1150
/* Do we want to add a timeout,
1151
* if disk-timeout is set? */
1152
if (!atomic_dec_and_test(&ctx.pending))
1153
wait_for_completion(&ctx.done);
1154
1155
if (ctx.error) {
1156
/* would rather check on EOPNOTSUPP, but that is not reliable.
1157
* don't try again for ANY return value != 0
1158
* if (rv == -EOPNOTSUPP) */
1159
/* Any error is already reported by bio_endio callback. */
1160
drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1161
}
1162
}
1163
}
1164
1165
/**
1166
* drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1167
* @connection: DRBD connection.
1168
* @epoch: Epoch object.
1169
* @ev: Epoch event.
1170
*/
1171
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1172
struct drbd_epoch *epoch,
1173
enum epoch_event ev)
1174
{
1175
int epoch_size;
1176
struct drbd_epoch *next_epoch;
1177
enum finish_epoch rv = FE_STILL_LIVE;
1178
1179
spin_lock(&connection->epoch_lock);
1180
do {
1181
next_epoch = NULL;
1182
1183
epoch_size = atomic_read(&epoch->epoch_size);
1184
1185
switch (ev & ~EV_CLEANUP) {
1186
case EV_PUT:
1187
atomic_dec(&epoch->active);
1188
break;
1189
case EV_GOT_BARRIER_NR:
1190
set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1191
break;
1192
case EV_BECAME_LAST:
1193
/* nothing to do*/
1194
break;
1195
}
1196
1197
if (epoch_size != 0 &&
1198
atomic_read(&epoch->active) == 0 &&
1199
(test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1200
if (!(ev & EV_CLEANUP)) {
1201
spin_unlock(&connection->epoch_lock);
1202
drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1203
spin_lock(&connection->epoch_lock);
1204
}
1205
#if 0
1206
/* FIXME: dec unacked on connection, once we have
1207
* something to count pending connection packets in. */
1208
if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1209
dec_unacked(epoch->connection);
1210
#endif
1211
1212
if (connection->current_epoch != epoch) {
1213
next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1214
list_del(&epoch->list);
1215
ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1216
connection->epochs--;
1217
kfree(epoch);
1218
1219
if (rv == FE_STILL_LIVE)
1220
rv = FE_DESTROYED;
1221
} else {
1222
epoch->flags = 0;
1223
atomic_set(&epoch->epoch_size, 0);
1224
/* atomic_set(&epoch->active, 0); is already zero */
1225
if (rv == FE_STILL_LIVE)
1226
rv = FE_RECYCLED;
1227
}
1228
}
1229
1230
if (!next_epoch)
1231
break;
1232
1233
epoch = next_epoch;
1234
} while (1);
1235
1236
spin_unlock(&connection->epoch_lock);
1237
1238
return rv;
1239
}
1240
1241
static enum write_ordering_e
1242
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1243
{
1244
struct disk_conf *dc;
1245
1246
dc = rcu_dereference(bdev->disk_conf);
1247
1248
if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1249
wo = WO_DRAIN_IO;
1250
if (wo == WO_DRAIN_IO && !dc->disk_drain)
1251
wo = WO_NONE;
1252
1253
return wo;
1254
}
1255
1256
/*
1257
* drbd_bump_write_ordering() - Fall back to an other write ordering method
1258
* @wo: Write ordering method to try.
1259
*/
1260
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1261
enum write_ordering_e wo)
1262
{
1263
struct drbd_device *device;
1264
enum write_ordering_e pwo;
1265
int vnr;
1266
static char *write_ordering_str[] = {
1267
[WO_NONE] = "none",
1268
[WO_DRAIN_IO] = "drain",
1269
[WO_BDEV_FLUSH] = "flush",
1270
};
1271
1272
pwo = resource->write_ordering;
1273
if (wo != WO_BDEV_FLUSH)
1274
wo = min(pwo, wo);
1275
rcu_read_lock();
1276
idr_for_each_entry(&resource->devices, device, vnr) {
1277
if (get_ldev(device)) {
1278
wo = max_allowed_wo(device->ldev, wo);
1279
if (device->ldev == bdev)
1280
bdev = NULL;
1281
put_ldev(device);
1282
}
1283
}
1284
1285
if (bdev)
1286
wo = max_allowed_wo(bdev, wo);
1287
1288
rcu_read_unlock();
1289
1290
resource->write_ordering = wo;
1291
if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1292
drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1293
}
1294
1295
/*
1296
* Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1297
* Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1298
* will directly go to fallback mode, submitting normal writes, and
1299
* never even try to UNMAP.
1300
*
1301
* And dm-thin does not do this (yet), mostly because in general it has
1302
* to assume that "skip_block_zeroing" is set. See also:
1303
* https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1304
* https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1305
*
1306
* We *may* ignore the discard-zeroes-data setting, if so configured.
1307
*
1308
* Assumption is that this "discard_zeroes_data=0" is only because the backend
1309
* may ignore partial unaligned discards.
1310
*
1311
* LVM/DM thin as of at least
1312
* LVM version: 2.02.115(2)-RHEL7 (2015-01-28)
1313
* Library version: 1.02.93-RHEL7 (2015-01-28)
1314
* Driver version: 4.29.0
1315
* still behaves this way.
1316
*
1317
* For unaligned (wrt. alignment and granularity) or too small discards,
1318
* we zero-out the initial (and/or) trailing unaligned partial chunks,
1319
* but discard all the aligned full chunks.
1320
*
1321
* At least for LVM/DM thin, with skip_block_zeroing=false,
1322
* the result is effectively "discard_zeroes_data=1".
1323
*/
1324
/* flags: EE_TRIM|EE_ZEROOUT */
1325
int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1326
{
1327
struct block_device *bdev = device->ldev->backing_bdev;
1328
sector_t tmp, nr;
1329
unsigned int max_discard_sectors, granularity;
1330
int alignment;
1331
int err = 0;
1332
1333
if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1334
goto zero_out;
1335
1336
/* Zero-sector (unknown) and one-sector granularities are the same. */
1337
granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
1338
alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1339
1340
max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22));
1341
max_discard_sectors -= max_discard_sectors % granularity;
1342
if (unlikely(!max_discard_sectors))
1343
goto zero_out;
1344
1345
if (nr_sectors < granularity)
1346
goto zero_out;
1347
1348
tmp = start;
1349
if (sector_div(tmp, granularity) != alignment) {
1350
if (nr_sectors < 2*granularity)
1351
goto zero_out;
1352
/* start + gran - (start + gran - align) % gran */
1353
tmp = start + granularity - alignment;
1354
tmp = start + granularity - sector_div(tmp, granularity);
1355
1356
nr = tmp - start;
1357
/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1358
* layers are below us, some may have smaller granularity */
1359
err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1360
nr_sectors -= nr;
1361
start = tmp;
1362
}
1363
while (nr_sectors >= max_discard_sectors) {
1364
err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
1365
GFP_NOIO);
1366
nr_sectors -= max_discard_sectors;
1367
start += max_discard_sectors;
1368
}
1369
if (nr_sectors) {
1370
/* max_discard_sectors is unsigned int (and a multiple of
1371
* granularity, we made sure of that above already);
1372
* nr is < max_discard_sectors;
1373
* I don't need sector_div here, even though nr is sector_t */
1374
nr = nr_sectors;
1375
nr -= (unsigned int)nr % granularity;
1376
if (nr) {
1377
err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
1378
nr_sectors -= nr;
1379
start += nr;
1380
}
1381
}
1382
zero_out:
1383
if (nr_sectors) {
1384
err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1385
(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1386
}
1387
return err != 0;
1388
}
1389
1390
static bool can_do_reliable_discards(struct drbd_device *device)
1391
{
1392
struct disk_conf *dc;
1393
bool can_do;
1394
1395
if (!bdev_max_discard_sectors(device->ldev->backing_bdev))
1396
return false;
1397
1398
rcu_read_lock();
1399
dc = rcu_dereference(device->ldev->disk_conf);
1400
can_do = dc->discard_zeroes_if_aligned;
1401
rcu_read_unlock();
1402
return can_do;
1403
}
1404
1405
static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1406
{
1407
/* If the backend cannot discard, or does not guarantee
1408
* read-back zeroes in discarded ranges, we fall back to
1409
* zero-out. Unless configuration specifically requested
1410
* otherwise. */
1411
if (!can_do_reliable_discards(device))
1412
peer_req->flags |= EE_ZEROOUT;
1413
1414
if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1415
peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1416
peer_req->flags |= EE_WAS_ERROR;
1417
drbd_endio_write_sec_final(peer_req);
1418
}
1419
1420
static int peer_request_fault_type(struct drbd_peer_request *peer_req)
1421
{
1422
if (peer_req_op(peer_req) == REQ_OP_READ) {
1423
return peer_req->flags & EE_APPLICATION ?
1424
DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD;
1425
} else {
1426
return peer_req->flags & EE_APPLICATION ?
1427
DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR;
1428
}
1429
}
1430
1431
/**
1432
* drbd_submit_peer_request()
1433
* @peer_req: peer request
1434
*
1435
* May spread the pages to multiple bios,
1436
* depending on bio_add_page restrictions.
1437
*
1438
* Returns 0 if all bios have been submitted,
1439
* -ENOMEM if we could not allocate enough bios,
1440
* -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1441
* single page to an empty bio (which should never happen and likely indicates
1442
* that the lower level IO stack is in some way broken). This has been observed
1443
* on certain Xen deployments.
1444
*/
1445
/* TODO allocate from our own bio_set. */
1446
int drbd_submit_peer_request(struct drbd_peer_request *peer_req)
1447
{
1448
struct drbd_device *device = peer_req->peer_device->device;
1449
struct bio *bios = NULL;
1450
struct bio *bio;
1451
struct page *page = peer_req->pages;
1452
sector_t sector = peer_req->i.sector;
1453
unsigned int data_size = peer_req->i.size;
1454
unsigned int n_bios = 0;
1455
unsigned int nr_pages = PFN_UP(data_size);
1456
1457
/* TRIM/DISCARD: for now, always use the helper function
1458
* blkdev_issue_zeroout(..., discard=true).
1459
* It's synchronous, but it does the right thing wrt. bio splitting.
1460
* Correctness first, performance later. Next step is to code an
1461
* asynchronous variant of the same.
1462
*/
1463
if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) {
1464
/* wait for all pending IO completions, before we start
1465
* zeroing things out. */
1466
conn_wait_active_ee_empty(peer_req->peer_device->connection);
1467
/* add it to the active list now,
1468
* so we can find it to present it in debugfs */
1469
peer_req->submit_jif = jiffies;
1470
peer_req->flags |= EE_SUBMITTED;
1471
1472
/* If this was a resync request from receive_rs_deallocated(),
1473
* it is already on the sync_ee list */
1474
if (list_empty(&peer_req->w.list)) {
1475
spin_lock_irq(&device->resource->req_lock);
1476
list_add_tail(&peer_req->w.list, &device->active_ee);
1477
spin_unlock_irq(&device->resource->req_lock);
1478
}
1479
1480
drbd_issue_peer_discard_or_zero_out(device, peer_req);
1481
return 0;
1482
}
1483
1484
/* In most cases, we will only need one bio. But in case the lower
1485
* level restrictions happen to be different at this offset on this
1486
* side than those of the sending peer, we may need to submit the
1487
* request in more than one bio.
1488
*
1489
* Plain bio_alloc is good enough here, this is no DRBD internally
1490
* generated bio, but a bio allocated on behalf of the peer.
1491
*/
1492
next_bio:
1493
/* _DISCARD, _WRITE_ZEROES handled above.
1494
* REQ_OP_FLUSH (empty flush) not expected,
1495
* should have been mapped to a "drbd protocol barrier".
1496
* REQ_OP_SECURE_ERASE: I don't see how we could ever support that.
1497
*/
1498
if (!(peer_req_op(peer_req) == REQ_OP_WRITE ||
1499
peer_req_op(peer_req) == REQ_OP_READ)) {
1500
drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf);
1501
return -EINVAL;
1502
}
1503
1504
bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO);
1505
/* > peer_req->i.sector, unless this is the first bio */
1506
bio->bi_iter.bi_sector = sector;
1507
bio->bi_private = peer_req;
1508
bio->bi_end_io = drbd_peer_request_endio;
1509
1510
bio->bi_next = bios;
1511
bios = bio;
1512
++n_bios;
1513
1514
page_chain_for_each(page) {
1515
unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1516
if (!bio_add_page(bio, page, len, 0))
1517
goto next_bio;
1518
data_size -= len;
1519
sector += len >> 9;
1520
--nr_pages;
1521
}
1522
D_ASSERT(device, data_size == 0);
1523
D_ASSERT(device, page == NULL);
1524
1525
atomic_set(&peer_req->pending_bios, n_bios);
1526
/* for debugfs: update timestamp, mark as submitted */
1527
peer_req->submit_jif = jiffies;
1528
peer_req->flags |= EE_SUBMITTED;
1529
do {
1530
bio = bios;
1531
bios = bios->bi_next;
1532
bio->bi_next = NULL;
1533
1534
drbd_submit_bio_noacct(device, peer_request_fault_type(peer_req), bio);
1535
} while (bios);
1536
return 0;
1537
}
1538
1539
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1540
struct drbd_peer_request *peer_req)
1541
{
1542
struct drbd_interval *i = &peer_req->i;
1543
1544
drbd_remove_interval(&device->write_requests, i);
1545
drbd_clear_interval(i);
1546
1547
/* Wake up any processes waiting for this peer request to complete. */
1548
if (i->waiting)
1549
wake_up(&device->misc_wait);
1550
}
1551
1552
static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1553
{
1554
struct drbd_peer_device *peer_device;
1555
int vnr;
1556
1557
rcu_read_lock();
1558
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1559
struct drbd_device *device = peer_device->device;
1560
1561
kref_get(&device->kref);
1562
rcu_read_unlock();
1563
drbd_wait_ee_list_empty(device, &device->active_ee);
1564
kref_put(&device->kref, drbd_destroy_device);
1565
rcu_read_lock();
1566
}
1567
rcu_read_unlock();
1568
}
1569
1570
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1571
{
1572
int rv;
1573
struct p_barrier *p = pi->data;
1574
struct drbd_epoch *epoch;
1575
1576
/* FIXME these are unacked on connection,
1577
* not a specific (peer)device.
1578
*/
1579
connection->current_epoch->barrier_nr = p->barrier;
1580
connection->current_epoch->connection = connection;
1581
rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1582
1583
/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1584
* the activity log, which means it would not be resynced in case the
1585
* R_PRIMARY crashes now.
1586
* Therefore we must send the barrier_ack after the barrier request was
1587
* completed. */
1588
switch (connection->resource->write_ordering) {
1589
case WO_NONE:
1590
if (rv == FE_RECYCLED)
1591
return 0;
1592
1593
/* receiver context, in the writeout path of the other node.
1594
* avoid potential distributed deadlock */
1595
epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1596
if (epoch)
1597
break;
1598
else
1599
drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1600
fallthrough;
1601
1602
case WO_BDEV_FLUSH:
1603
case WO_DRAIN_IO:
1604
conn_wait_active_ee_empty(connection);
1605
drbd_flush(connection);
1606
1607
if (atomic_read(&connection->current_epoch->epoch_size)) {
1608
epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1609
if (epoch)
1610
break;
1611
}
1612
1613
return 0;
1614
default:
1615
drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1616
connection->resource->write_ordering);
1617
return -EIO;
1618
}
1619
1620
epoch->flags = 0;
1621
atomic_set(&epoch->epoch_size, 0);
1622
atomic_set(&epoch->active, 0);
1623
1624
spin_lock(&connection->epoch_lock);
1625
if (atomic_read(&connection->current_epoch->epoch_size)) {
1626
list_add(&epoch->list, &connection->current_epoch->list);
1627
connection->current_epoch = epoch;
1628
connection->epochs++;
1629
} else {
1630
/* The current_epoch got recycled while we allocated this one... */
1631
kfree(epoch);
1632
}
1633
spin_unlock(&connection->epoch_lock);
1634
1635
return 0;
1636
}
1637
1638
/* quick wrapper in case payload size != request_size (write same) */
1639
static void drbd_csum_ee_size(struct crypto_shash *h,
1640
struct drbd_peer_request *r, void *d,
1641
unsigned int payload_size)
1642
{
1643
unsigned int tmp = r->i.size;
1644
r->i.size = payload_size;
1645
drbd_csum_ee(h, r, d);
1646
r->i.size = tmp;
1647
}
1648
1649
/* used from receive_RSDataReply (recv_resync_read)
1650
* and from receive_Data.
1651
* data_size: actual payload ("data in")
1652
* for normal writes that is bi_size.
1653
* for discards, that is zero.
1654
* for write same, it is logical_block_size.
1655
* both trim and write same have the bi_size ("data len to be affected")
1656
* as extra argument in the packet header.
1657
*/
1658
static struct drbd_peer_request *
1659
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1660
struct packet_info *pi) __must_hold(local)
1661
{
1662
struct drbd_device *device = peer_device->device;
1663
const sector_t capacity = get_capacity(device->vdisk);
1664
struct drbd_peer_request *peer_req;
1665
struct page *page;
1666
int digest_size, err;
1667
unsigned int data_size = pi->size, ds;
1668
void *dig_in = peer_device->connection->int_dig_in;
1669
void *dig_vv = peer_device->connection->int_dig_vv;
1670
unsigned long *data;
1671
struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1672
struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1673
1674
digest_size = 0;
1675
if (!trim && peer_device->connection->peer_integrity_tfm) {
1676
digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1677
/*
1678
* FIXME: Receive the incoming digest into the receive buffer
1679
* here, together with its struct p_data?
1680
*/
1681
err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1682
if (err)
1683
return NULL;
1684
data_size -= digest_size;
1685
}
1686
1687
/* assume request_size == data_size, but special case trim. */
1688
ds = data_size;
1689
if (trim) {
1690
if (!expect(peer_device, data_size == 0))
1691
return NULL;
1692
ds = be32_to_cpu(trim->size);
1693
} else if (zeroes) {
1694
if (!expect(peer_device, data_size == 0))
1695
return NULL;
1696
ds = be32_to_cpu(zeroes->size);
1697
}
1698
1699
if (!expect(peer_device, IS_ALIGNED(ds, 512)))
1700
return NULL;
1701
if (trim || zeroes) {
1702
if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1703
return NULL;
1704
} else if (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE))
1705
return NULL;
1706
1707
/* even though we trust out peer,
1708
* we sometimes have to double check. */
1709
if (sector + (ds>>9) > capacity) {
1710
drbd_err(device, "request from peer beyond end of local disk: "
1711
"capacity: %llus < sector: %llus + size: %u\n",
1712
(unsigned long long)capacity,
1713
(unsigned long long)sector, ds);
1714
return NULL;
1715
}
1716
1717
/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1718
* "criss-cross" setup, that might cause write-out on some other DRBD,
1719
* which in turn might block on the other node at this very place. */
1720
peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1721
if (!peer_req)
1722
return NULL;
1723
1724
peer_req->flags |= EE_WRITE;
1725
if (trim) {
1726
peer_req->flags |= EE_TRIM;
1727
return peer_req;
1728
}
1729
if (zeroes) {
1730
peer_req->flags |= EE_ZEROOUT;
1731
return peer_req;
1732
}
1733
1734
/* receive payload size bytes into page chain */
1735
ds = data_size;
1736
page = peer_req->pages;
1737
page_chain_for_each(page) {
1738
unsigned len = min_t(int, ds, PAGE_SIZE);
1739
data = kmap(page);
1740
err = drbd_recv_all_warn(peer_device->connection, data, len);
1741
if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1742
drbd_err(device, "Fault injection: Corrupting data on receive\n");
1743
data[0] = data[0] ^ (unsigned long)-1;
1744
}
1745
kunmap(page);
1746
if (err) {
1747
drbd_free_peer_req(device, peer_req);
1748
return NULL;
1749
}
1750
ds -= len;
1751
}
1752
1753
if (digest_size) {
1754
drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1755
if (memcmp(dig_in, dig_vv, digest_size)) {
1756
drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1757
(unsigned long long)sector, data_size);
1758
drbd_free_peer_req(device, peer_req);
1759
return NULL;
1760
}
1761
}
1762
device->recv_cnt += data_size >> 9;
1763
return peer_req;
1764
}
1765
1766
/* drbd_drain_block() just takes a data block
1767
* out of the socket input buffer, and discards it.
1768
*/
1769
static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1770
{
1771
struct page *page;
1772
int err = 0;
1773
void *data;
1774
1775
if (!data_size)
1776
return 0;
1777
1778
page = drbd_alloc_pages(peer_device, 1, 1);
1779
1780
data = kmap(page);
1781
while (data_size) {
1782
unsigned int len = min_t(int, data_size, PAGE_SIZE);
1783
1784
err = drbd_recv_all_warn(peer_device->connection, data, len);
1785
if (err)
1786
break;
1787
data_size -= len;
1788
}
1789
kunmap(page);
1790
drbd_free_pages(peer_device->device, page);
1791
return err;
1792
}
1793
1794
static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1795
sector_t sector, int data_size)
1796
{
1797
struct bio_vec bvec;
1798
struct bvec_iter iter;
1799
struct bio *bio;
1800
int digest_size, err, expect;
1801
void *dig_in = peer_device->connection->int_dig_in;
1802
void *dig_vv = peer_device->connection->int_dig_vv;
1803
1804
digest_size = 0;
1805
if (peer_device->connection->peer_integrity_tfm) {
1806
digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1807
err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1808
if (err)
1809
return err;
1810
data_size -= digest_size;
1811
}
1812
1813
/* optimistically update recv_cnt. if receiving fails below,
1814
* we disconnect anyways, and counters will be reset. */
1815
peer_device->device->recv_cnt += data_size>>9;
1816
1817
bio = req->master_bio;
1818
D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1819
1820
bio_for_each_segment(bvec, bio, iter) {
1821
void *mapped = bvec_kmap_local(&bvec);
1822
expect = min_t(int, data_size, bvec.bv_len);
1823
err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1824
kunmap_local(mapped);
1825
if (err)
1826
return err;
1827
data_size -= expect;
1828
}
1829
1830
if (digest_size) {
1831
drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1832
if (memcmp(dig_in, dig_vv, digest_size)) {
1833
drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1834
return -EINVAL;
1835
}
1836
}
1837
1838
D_ASSERT(peer_device->device, data_size == 0);
1839
return 0;
1840
}
1841
1842
/*
1843
* e_end_resync_block() is called in ack_sender context via
1844
* drbd_finish_peer_reqs().
1845
*/
1846
static int e_end_resync_block(struct drbd_work *w, int unused)
1847
{
1848
struct drbd_peer_request *peer_req =
1849
container_of(w, struct drbd_peer_request, w);
1850
struct drbd_peer_device *peer_device = peer_req->peer_device;
1851
struct drbd_device *device = peer_device->device;
1852
sector_t sector = peer_req->i.sector;
1853
int err;
1854
1855
D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1856
1857
if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1858
drbd_set_in_sync(peer_device, sector, peer_req->i.size);
1859
err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1860
} else {
1861
/* Record failure to sync */
1862
drbd_rs_failed_io(peer_device, sector, peer_req->i.size);
1863
1864
err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1865
}
1866
dec_unacked(device);
1867
1868
return err;
1869
}
1870
1871
static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1872
struct packet_info *pi) __releases(local)
1873
{
1874
struct drbd_device *device = peer_device->device;
1875
struct drbd_peer_request *peer_req;
1876
1877
peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1878
if (!peer_req)
1879
goto fail;
1880
1881
dec_rs_pending(peer_device);
1882
1883
inc_unacked(device);
1884
/* corresponding dec_unacked() in e_end_resync_block()
1885
* or in _drbd_clear_done_ee, respectively */
1886
1887
peer_req->w.cb = e_end_resync_block;
1888
peer_req->opf = REQ_OP_WRITE;
1889
peer_req->submit_jif = jiffies;
1890
1891
spin_lock_irq(&device->resource->req_lock);
1892
list_add_tail(&peer_req->w.list, &device->sync_ee);
1893
spin_unlock_irq(&device->resource->req_lock);
1894
1895
atomic_add(pi->size >> 9, &device->rs_sect_ev);
1896
if (drbd_submit_peer_request(peer_req) == 0)
1897
return 0;
1898
1899
/* don't care for the reason here */
1900
drbd_err(device, "submit failed, triggering re-connect\n");
1901
spin_lock_irq(&device->resource->req_lock);
1902
list_del(&peer_req->w.list);
1903
spin_unlock_irq(&device->resource->req_lock);
1904
1905
drbd_free_peer_req(device, peer_req);
1906
fail:
1907
put_ldev(device);
1908
return -EIO;
1909
}
1910
1911
static struct drbd_request *
1912
find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1913
sector_t sector, bool missing_ok, const char *func)
1914
{
1915
struct drbd_request *req;
1916
1917
/* Request object according to our peer */
1918
req = (struct drbd_request *)(unsigned long)id;
1919
if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1920
return req;
1921
if (!missing_ok) {
1922
drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1923
(unsigned long)id, (unsigned long long)sector);
1924
}
1925
return NULL;
1926
}
1927
1928
static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1929
{
1930
struct drbd_peer_device *peer_device;
1931
struct drbd_device *device;
1932
struct drbd_request *req;
1933
sector_t sector;
1934
int err;
1935
struct p_data *p = pi->data;
1936
1937
peer_device = conn_peer_device(connection, pi->vnr);
1938
if (!peer_device)
1939
return -EIO;
1940
device = peer_device->device;
1941
1942
sector = be64_to_cpu(p->sector);
1943
1944
spin_lock_irq(&device->resource->req_lock);
1945
req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1946
spin_unlock_irq(&device->resource->req_lock);
1947
if (unlikely(!req))
1948
return -EIO;
1949
1950
err = recv_dless_read(peer_device, req, sector, pi->size);
1951
if (!err)
1952
req_mod(req, DATA_RECEIVED, peer_device);
1953
/* else: nothing. handled from drbd_disconnect...
1954
* I don't think we may complete this just yet
1955
* in case we are "on-disconnect: freeze" */
1956
1957
return err;
1958
}
1959
1960
static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1961
{
1962
struct drbd_peer_device *peer_device;
1963
struct drbd_device *device;
1964
sector_t sector;
1965
int err;
1966
struct p_data *p = pi->data;
1967
1968
peer_device = conn_peer_device(connection, pi->vnr);
1969
if (!peer_device)
1970
return -EIO;
1971
device = peer_device->device;
1972
1973
sector = be64_to_cpu(p->sector);
1974
D_ASSERT(device, p->block_id == ID_SYNCER);
1975
1976
if (get_ldev(device)) {
1977
/* data is submitted to disk within recv_resync_read.
1978
* corresponding put_ldev done below on error,
1979
* or in drbd_peer_request_endio. */
1980
err = recv_resync_read(peer_device, sector, pi);
1981
} else {
1982
if (drbd_ratelimit())
1983
drbd_err(device, "Can not write resync data to local disk.\n");
1984
1985
err = drbd_drain_block(peer_device, pi->size);
1986
1987
drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1988
}
1989
1990
atomic_add(pi->size >> 9, &device->rs_sect_in);
1991
1992
return err;
1993
}
1994
1995
static void restart_conflicting_writes(struct drbd_device *device,
1996
sector_t sector, int size)
1997
{
1998
struct drbd_interval *i;
1999
struct drbd_request *req;
2000
2001
drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2002
if (!i->local)
2003
continue;
2004
req = container_of(i, struct drbd_request, i);
2005
if (req->rq_state & RQ_LOCAL_PENDING ||
2006
!(req->rq_state & RQ_POSTPONED))
2007
continue;
2008
/* as it is RQ_POSTPONED, this will cause it to
2009
* be queued on the retry workqueue. */
2010
__req_mod(req, CONFLICT_RESOLVED, NULL, NULL);
2011
}
2012
}
2013
2014
/*
2015
* e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2016
*/
2017
static int e_end_block(struct drbd_work *w, int cancel)
2018
{
2019
struct drbd_peer_request *peer_req =
2020
container_of(w, struct drbd_peer_request, w);
2021
struct drbd_peer_device *peer_device = peer_req->peer_device;
2022
struct drbd_device *device = peer_device->device;
2023
sector_t sector = peer_req->i.sector;
2024
int err = 0, pcmd;
2025
2026
if (peer_req->flags & EE_SEND_WRITE_ACK) {
2027
if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2028
pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2029
device->state.conn <= C_PAUSED_SYNC_T &&
2030
peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2031
P_RS_WRITE_ACK : P_WRITE_ACK;
2032
err = drbd_send_ack(peer_device, pcmd, peer_req);
2033
if (pcmd == P_RS_WRITE_ACK)
2034
drbd_set_in_sync(peer_device, sector, peer_req->i.size);
2035
} else {
2036
err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2037
/* we expect it to be marked out of sync anyways...
2038
* maybe assert this? */
2039
}
2040
dec_unacked(device);
2041
}
2042
2043
/* we delete from the conflict detection hash _after_ we sent out the
2044
* P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
2045
if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2046
spin_lock_irq(&device->resource->req_lock);
2047
D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2048
drbd_remove_epoch_entry_interval(device, peer_req);
2049
if (peer_req->flags & EE_RESTART_REQUESTS)
2050
restart_conflicting_writes(device, sector, peer_req->i.size);
2051
spin_unlock_irq(&device->resource->req_lock);
2052
} else
2053
D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2054
2055
drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2056
2057
return err;
2058
}
2059
2060
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2061
{
2062
struct drbd_peer_request *peer_req =
2063
container_of(w, struct drbd_peer_request, w);
2064
struct drbd_peer_device *peer_device = peer_req->peer_device;
2065
int err;
2066
2067
err = drbd_send_ack(peer_device, ack, peer_req);
2068
dec_unacked(peer_device->device);
2069
2070
return err;
2071
}
2072
2073
static int e_send_superseded(struct drbd_work *w, int unused)
2074
{
2075
return e_send_ack(w, P_SUPERSEDED);
2076
}
2077
2078
static int e_send_retry_write(struct drbd_work *w, int unused)
2079
{
2080
struct drbd_peer_request *peer_req =
2081
container_of(w, struct drbd_peer_request, w);
2082
struct drbd_connection *connection = peer_req->peer_device->connection;
2083
2084
return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2085
P_RETRY_WRITE : P_SUPERSEDED);
2086
}
2087
2088
static bool seq_greater(u32 a, u32 b)
2089
{
2090
/*
2091
* We assume 32-bit wrap-around here.
2092
* For 24-bit wrap-around, we would have to shift:
2093
* a <<= 8; b <<= 8;
2094
*/
2095
return (s32)a - (s32)b > 0;
2096
}
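/* A quick worked example of the wrap-around arithmetic above (values
 * chosen for illustration only): with a == 5 and b == 0xfffffffe the
 * unsigned difference is 7, so (s32)a - (s32)b == 7 > 0 and the
 * wrapped-around 5 is correctly treated as newer than 0xfffffffe. */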
2097
2098
static u32 seq_max(u32 a, u32 b)
2099
{
2100
return seq_greater(a, b) ? a : b;
2101
}
2102
2103
static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2104
{
2105
struct drbd_device *device = peer_device->device;
2106
unsigned int newest_peer_seq;
2107
2108
if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2109
spin_lock(&device->peer_seq_lock);
2110
newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2111
device->peer_seq = newest_peer_seq;
2112
spin_unlock(&device->peer_seq_lock);
2113
/* wake up only if we actually changed device->peer_seq */
2114
if (peer_seq == newest_peer_seq)
2115
wake_up(&device->seq_wait);
2116
}
2117
}
2118
2119
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2120
{
2121
return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2122
}
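/* Illustration of the sector math above: lengths are given in bytes and
 * shifted down to sectors, so overlaps(0, 4096, 8, 4096) compares
 * [0, 8) against [8, 16) sectors and returns false, while
 * overlaps(0, 4096, 7, 4096) ([0, 8) vs [7, 15)) returns true. */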
2123
2124
/* maybe change sync_ee into interval trees as well? */
2125
static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2126
{
2127
struct drbd_peer_request *rs_req;
2128
bool rv = false;
2129
2130
spin_lock_irq(&device->resource->req_lock);
2131
list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2132
if (overlaps(peer_req->i.sector, peer_req->i.size,
2133
rs_req->i.sector, rs_req->i.size)) {
2134
rv = true;
2135
break;
2136
}
2137
}
2138
spin_unlock_irq(&device->resource->req_lock);
2139
2140
return rv;
2141
}
2142
2143
/* Called from receive_Data.
2144
* Synchronize packets on sock with packets on msock.
2145
*
2146
* This is here so even when a P_DATA packet traveling via sock overtook an Ack
2147
* packet traveling on msock, they are still processed in the order they have
2148
* been sent.
2149
*
2150
* Note: we don't care for Ack packets overtaking P_DATA packets.
2151
*
2152
* In case packet_seq is larger than device->peer_seq number, there are
2153
* outstanding packets on the msock. We wait for them to arrive.
2154
* In case we are the logically next packet, we update device->peer_seq
2155
* ourselves. Correctly handles 32bit wrap around.
2156
*
2157
* Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2158
* about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2159
* for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2160
* 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2161
*
2162
* returns 0 if we may process the packet,
2163
* -ERESTARTSYS if we were interrupted (by disconnect signal). */
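/* Example of the waiting rule above: if device->peer_seq is 9 and a
 * P_DATA packet arrives carrying peer_seq == 12, then
 * seq_greater(12 - 1, 9) holds, i.e. the writes with sequence numbers
 * 10 and 11 are still outstanding, so (with two primaries configured)
 * we sleep on seq_wait until they have been processed, or until
 * ping_timeo expires. */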
2164
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2165
{
2166
struct drbd_device *device = peer_device->device;
2167
DEFINE_WAIT(wait);
2168
long timeout;
2169
int ret = 0, tp;
2170
2171
if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2172
return 0;
2173
2174
spin_lock(&device->peer_seq_lock);
2175
for (;;) {
2176
if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2177
device->peer_seq = seq_max(device->peer_seq, peer_seq);
2178
break;
2179
}
2180
2181
if (signal_pending(current)) {
2182
ret = -ERESTARTSYS;
2183
break;
2184
}
2185
2186
rcu_read_lock();
2187
tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2188
rcu_read_unlock();
2189
2190
if (!tp)
2191
break;
2192
2193
/* Only need to wait if two_primaries is enabled */
2194
prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2195
spin_unlock(&device->peer_seq_lock);
2196
rcu_read_lock();
2197
timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2198
rcu_read_unlock();
2199
timeout = schedule_timeout(timeout);
2200
spin_lock(&device->peer_seq_lock);
2201
if (!timeout) {
2202
ret = -ETIMEDOUT;
2203
drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2204
break;
2205
}
2206
}
2207
spin_unlock(&device->peer_seq_lock);
2208
finish_wait(&device->seq_wait, &wait);
2209
return ret;
2210
}
2211
2212
static enum req_op wire_flags_to_bio_op(u32 dpf)
2213
{
2214
if (dpf & DP_ZEROES)
2215
return REQ_OP_WRITE_ZEROES;
2216
if (dpf & DP_DISCARD)
2217
return REQ_OP_DISCARD;
2218
else
2219
return REQ_OP_WRITE;
2220
}
2221
2222
/* see also bio_flags_to_wire() */
2223
static blk_opf_t wire_flags_to_bio(struct drbd_connection *connection, u32 dpf)
2224
{
2225
return wire_flags_to_bio_op(dpf) |
2226
(dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2227
(dpf & DP_FUA ? REQ_FUA : 0) |
2228
(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2229
}
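/* Example mapping: a peer write carrying DP_RW_SYNC | DP_FUA | DP_FLUSH
 * and neither DP_DISCARD nor DP_ZEROES is submitted locally as
 * REQ_OP_WRITE | REQ_SYNC | REQ_FUA | REQ_PREFLUSH. */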
2230
2231
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2232
unsigned int size)
2233
{
2234
struct drbd_peer_device *peer_device = first_peer_device(device);
2235
struct drbd_interval *i;
2236
2237
repeat:
2238
drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2239
struct drbd_request *req;
2240
struct bio_and_error m;
2241
2242
if (!i->local)
2243
continue;
2244
req = container_of(i, struct drbd_request, i);
2245
if (!(req->rq_state & RQ_POSTPONED))
2246
continue;
2247
req->rq_state &= ~RQ_POSTPONED;
2248
__req_mod(req, NEG_ACKED, peer_device, &m);
2249
spin_unlock_irq(&device->resource->req_lock);
2250
if (m.bio)
2251
complete_master_bio(device, &m);
2252
spin_lock_irq(&device->resource->req_lock);
2253
goto repeat;
2254
}
2255
}
2256
2257
static int handle_write_conflicts(struct drbd_device *device,
2258
struct drbd_peer_request *peer_req)
2259
{
2260
struct drbd_connection *connection = peer_req->peer_device->connection;
2261
bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2262
sector_t sector = peer_req->i.sector;
2263
const unsigned int size = peer_req->i.size;
2264
struct drbd_interval *i;
2265
bool equal;
2266
int err;
2267
2268
/*
2269
* Inserting the peer request into the write_requests tree will prevent
2270
* new conflicting local requests from being added.
2271
*/
2272
drbd_insert_interval(&device->write_requests, &peer_req->i);
2273
2274
repeat:
2275
drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2276
if (i == &peer_req->i)
2277
continue;
2278
if (i->completed)
2279
continue;
2280
2281
if (!i->local) {
2282
/*
2283
* Our peer has sent a conflicting remote request; this
2284
* should not happen in a two-node setup. Wait for the
2285
* earlier peer request to complete.
2286
*/
2287
err = drbd_wait_misc(device, i);
2288
if (err)
2289
goto out;
2290
goto repeat;
2291
}
2292
2293
equal = i->sector == sector && i->size == size;
2294
if (resolve_conflicts) {
2295
/*
2296
* If the peer request is fully contained within the
2297
* overlapping request, it can be considered overwritten
2298
* and thus superseded; otherwise, it will be retried
2299
* once all overlapping requests have completed.
2300
*/
2301
bool superseded = i->sector <= sector && i->sector +
2302
(i->size >> 9) >= sector + (size >> 9);
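/* Example (illustrative numbers): a local write at sector 0 covering
 * 16 sectors fully contains a peer write at sector 8 covering 8
 * sectors, since 0 <= 8 and 0 + 16 >= 8 + 8, so that peer request
 * counts as superseded; a peer write at sector 12 covering 8 sectors
 * would only partially overlap and be retried instead. */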
2303
2304
if (!equal)
2305
drbd_alert(device, "Concurrent writes detected: "
2306
"local=%llus +%u, remote=%llus +%u, "
2307
"assuming %s came first\n",
2308
(unsigned long long)i->sector, i->size,
2309
(unsigned long long)sector, size,
2310
superseded ? "local" : "remote");
2311
2312
peer_req->w.cb = superseded ? e_send_superseded :
2313
e_send_retry_write;
2314
list_add_tail(&peer_req->w.list, &device->done_ee);
2315
/* put is in drbd_send_acks_wf() */
2316
kref_get(&device->kref);
2317
if (!queue_work(connection->ack_sender,
2318
&peer_req->peer_device->send_acks_work))
2319
kref_put(&device->kref, drbd_destroy_device);
2320
2321
err = -ENOENT;
2322
goto out;
2323
} else {
2324
struct drbd_request *req =
2325
container_of(i, struct drbd_request, i);
2326
2327
if (!equal)
2328
drbd_alert(device, "Concurrent writes detected: "
2329
"local=%llus +%u, remote=%llus +%u\n",
2330
(unsigned long long)i->sector, i->size,
2331
(unsigned long long)sector, size);
2332
2333
if (req->rq_state & RQ_LOCAL_PENDING ||
2334
!(req->rq_state & RQ_POSTPONED)) {
2335
/*
2336
* Wait for the node with the discard flag to
2337
* decide if this request has been superseded
2338
* or needs to be retried.
2339
* Requests that have been superseded will
2340
* disappear from the write_requests tree.
2341
*
2342
* In addition, wait for the conflicting
2343
* request to finish locally before submitting
2344
* the conflicting peer request.
2345
*/
2346
err = drbd_wait_misc(device, &req->i);
2347
if (err) {
2348
_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2349
fail_postponed_requests(device, sector, size);
2350
goto out;
2351
}
2352
goto repeat;
2353
}
2354
/*
2355
* Remember to restart the conflicting requests after
2356
* the new peer request has completed.
2357
*/
2358
peer_req->flags |= EE_RESTART_REQUESTS;
2359
}
2360
}
2361
err = 0;
2362
2363
out:
2364
if (err)
2365
drbd_remove_epoch_entry_interval(device, peer_req);
2366
return err;
2367
}
2368
2369
/* mirrored write */
2370
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2371
{
2372
struct drbd_peer_device *peer_device;
2373
struct drbd_device *device;
2374
struct net_conf *nc;
2375
sector_t sector;
2376
struct drbd_peer_request *peer_req;
2377
struct p_data *p = pi->data;
2378
u32 peer_seq = be32_to_cpu(p->seq_num);
2379
u32 dp_flags;
2380
int err, tp;
2381
2382
peer_device = conn_peer_device(connection, pi->vnr);
2383
if (!peer_device)
2384
return -EIO;
2385
device = peer_device->device;
2386
2387
if (!get_ldev(device)) {
2388
int err2;
2389
2390
err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2391
drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2392
atomic_inc(&connection->current_epoch->epoch_size);
2393
err2 = drbd_drain_block(peer_device, pi->size);
2394
if (!err)
2395
err = err2;
2396
return err;
2397
}
2398
2399
/*
2400
* Corresponding put_ldev done either below (on various errors), or in
2401
* drbd_peer_request_endio, if we successfully submit the data at the
2402
* end of this function.
2403
*/
2404
2405
sector = be64_to_cpu(p->sector);
2406
peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2407
if (!peer_req) {
2408
put_ldev(device);
2409
return -EIO;
2410
}
2411
2412
peer_req->w.cb = e_end_block;
2413
peer_req->submit_jif = jiffies;
2414
peer_req->flags |= EE_APPLICATION;
2415
2416
dp_flags = be32_to_cpu(p->dp_flags);
2417
peer_req->opf = wire_flags_to_bio(connection, dp_flags);
2418
if (pi->cmd == P_TRIM) {
2419
D_ASSERT(peer_device, peer_req->i.size > 0);
2420
D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD);
2421
D_ASSERT(peer_device, peer_req->pages == NULL);
2422
/* need to play safe: an older DRBD sender
2423
* may mean zero-out while sending P_TRIM. */
2424
if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2425
peer_req->flags |= EE_ZEROOUT;
2426
} else if (pi->cmd == P_ZEROES) {
2427
D_ASSERT(peer_device, peer_req->i.size > 0);
2428
D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES);
2429
D_ASSERT(peer_device, peer_req->pages == NULL);
2430
/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2431
if (dp_flags & DP_DISCARD)
2432
peer_req->flags |= EE_TRIM;
2433
} else if (peer_req->pages == NULL) {
2434
D_ASSERT(device, peer_req->i.size == 0);
2435
D_ASSERT(device, dp_flags & DP_FLUSH);
2436
}
2437
2438
if (dp_flags & DP_MAY_SET_IN_SYNC)
2439
peer_req->flags |= EE_MAY_SET_IN_SYNC;
2440
2441
spin_lock(&connection->epoch_lock);
2442
peer_req->epoch = connection->current_epoch;
2443
atomic_inc(&peer_req->epoch->epoch_size);
2444
atomic_inc(&peer_req->epoch->active);
2445
spin_unlock(&connection->epoch_lock);
2446
2447
rcu_read_lock();
2448
nc = rcu_dereference(peer_device->connection->net_conf);
2449
tp = nc->two_primaries;
2450
if (peer_device->connection->agreed_pro_version < 100) {
2451
switch (nc->wire_protocol) {
2452
case DRBD_PROT_C:
2453
dp_flags |= DP_SEND_WRITE_ACK;
2454
break;
2455
case DRBD_PROT_B:
2456
dp_flags |= DP_SEND_RECEIVE_ACK;
2457
break;
2458
}
2459
}
2460
rcu_read_unlock();
2461
2462
if (dp_flags & DP_SEND_WRITE_ACK) {
2463
peer_req->flags |= EE_SEND_WRITE_ACK;
2464
inc_unacked(device);
2465
/* corresponding dec_unacked() in e_end_block()
2466
* or in _drbd_clear_done_ee, respectively */
2467
}
2468
2469
if (dp_flags & DP_SEND_RECEIVE_ACK) {
2470
/* I really don't like it that the receiver thread
2471
* sends on the msock, but anyways */
2472
drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2473
}
2474
2475
if (tp) {
2476
/* two primaries implies protocol C */
2477
D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2478
peer_req->flags |= EE_IN_INTERVAL_TREE;
2479
err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2480
if (err)
2481
goto out_interrupted;
2482
spin_lock_irq(&device->resource->req_lock);
2483
err = handle_write_conflicts(device, peer_req);
2484
if (err) {
2485
spin_unlock_irq(&device->resource->req_lock);
2486
if (err == -ENOENT) {
2487
put_ldev(device);
2488
return 0;
2489
}
2490
goto out_interrupted;
2491
}
2492
} else {
2493
update_peer_seq(peer_device, peer_seq);
2494
spin_lock_irq(&device->resource->req_lock);
2495
}
2496
/* TRIM and ZEROES are processed synchronously,
2497
* we wait for all pending requests, respectively wait for
2498
* active_ee to become empty in drbd_submit_peer_request();
2499
* better not add ourselves here. */
2500
if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0)
2501
list_add_tail(&peer_req->w.list, &device->active_ee);
2502
spin_unlock_irq(&device->resource->req_lock);
2503
2504
if (device->state.conn == C_SYNC_TARGET)
2505
wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2506
2507
if (device->state.pdsk < D_INCONSISTENT) {
2508
/* In case we have the only disk of the cluster, */
2509
drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
2510
peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2511
drbd_al_begin_io(device, &peer_req->i);
2512
peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2513
}
2514
2515
err = drbd_submit_peer_request(peer_req);
2516
if (!err)
2517
return 0;
2518
2519
/* don't care for the reason here */
2520
drbd_err(device, "submit failed, triggering re-connect\n");
2521
spin_lock_irq(&device->resource->req_lock);
2522
list_del(&peer_req->w.list);
2523
drbd_remove_epoch_entry_interval(device, peer_req);
2524
spin_unlock_irq(&device->resource->req_lock);
2525
if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2526
peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2527
drbd_al_complete_io(device, &peer_req->i);
2528
}
2529
2530
out_interrupted:
2531
drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2532
put_ldev(device);
2533
drbd_free_peer_req(device, peer_req);
2534
return err;
2535
}
2536
2537
/* We may throttle resync, if the lower device seems to be busy,
2538
* and current sync rate is above c_min_rate.
2539
*
2540
* To decide whether or not the lower device is busy, we use a scheme similar
2541
* to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2542
* (more than 64 sectors) of activity we cannot account for with our own resync
2543
* activity, it obviously is "busy".
2544
*
2545
* The current sync rate used here uses only the most recent two step marks,
2546
* to have a short time average so we can react faster.
2547
*/
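/* To put rough numbers on the above (illustration only, assuming the
 * usual 4 KiB per bitmap bit behind Bit2KB()): if the backing device
 * shows more than 64 sectors of I/O that our own resync did not issue,
 * it counts as busy; and if e.g. 2000 bitmap bits were cleared over the
 * last two seconds, the short-term rate is Bit2KB(2000 / 2) == 4000
 * KiB/s, which would exceed a c-min-rate setting of 250 KiB/s and thus
 * throttle the resync. */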
2548
bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector,
2549
bool throttle_if_app_is_waiting)
2550
{
2551
struct drbd_device *device = peer_device->device;
2552
struct lc_element *tmp;
2553
bool throttle = drbd_rs_c_min_rate_throttle(device);
2554
2555
if (!throttle || throttle_if_app_is_waiting)
2556
return throttle;
2557
2558
spin_lock_irq(&device->al_lock);
2559
tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2560
if (tmp) {
2561
struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2562
if (test_bit(BME_PRIORITY, &bm_ext->flags))
2563
throttle = false;
2564
/* Do not slow down if app IO is already waiting for this extent,
2565
* and our progress is necessary for application IO to complete. */
2566
}
2567
spin_unlock_irq(&device->al_lock);
2568
2569
return throttle;
2570
}
2571
2572
bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2573
{
2574
struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2575
unsigned long db, dt, dbdt;
2576
unsigned int c_min_rate;
2577
int curr_events;
2578
2579
rcu_read_lock();
2580
c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2581
rcu_read_unlock();
2582
2583
/* feature disabled? */
2584
if (c_min_rate == 0)
2585
return false;
2586
2587
curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
2588
atomic_read(&device->rs_sect_ev);
2589
2590
if (atomic_read(&device->ap_actlog_cnt)
2591
|| curr_events - device->rs_last_events > 64) {
2592
unsigned long rs_left;
2593
int i;
2594
2595
device->rs_last_events = curr_events;
2596
2597
/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2598
* approx. */
2599
i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2600
2601
if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2602
rs_left = device->ov_left;
2603
else
2604
rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2605
2606
dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2607
if (!dt)
2608
dt++;
2609
db = device->rs_mark_left[i] - rs_left;
2610
dbdt = Bit2KB(db/dt);
2611
2612
if (dbdt > c_min_rate)
2613
return true;
2614
}
2615
return false;
2616
}
2617
2618
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2619
{
2620
struct drbd_peer_device *peer_device;
2621
struct drbd_device *device;
2622
sector_t sector;
2623
sector_t capacity;
2624
struct drbd_peer_request *peer_req;
2625
struct digest_info *di = NULL;
2626
int size, verb;
2627
struct p_block_req *p = pi->data;
2628
2629
peer_device = conn_peer_device(connection, pi->vnr);
2630
if (!peer_device)
2631
return -EIO;
2632
device = peer_device->device;
2633
capacity = get_capacity(device->vdisk);
2634
2635
sector = be64_to_cpu(p->sector);
2636
size = be32_to_cpu(p->blksize);
2637
2638
if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2639
drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2640
(unsigned long long)sector, size);
2641
return -EINVAL;
2642
}
2643
if (sector + (size>>9) > capacity) {
2644
drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2645
(unsigned long long)sector, size);
2646
return -EINVAL;
2647
}
2648
2649
if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2650
verb = 1;
2651
switch (pi->cmd) {
2652
case P_DATA_REQUEST:
2653
drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2654
break;
2655
case P_RS_THIN_REQ:
2656
case P_RS_DATA_REQUEST:
2657
case P_CSUM_RS_REQUEST:
2658
case P_OV_REQUEST:
2659
drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2660
break;
2661
case P_OV_REPLY:
2662
verb = 0;
2663
dec_rs_pending(peer_device);
2664
drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2665
break;
2666
default:
2667
BUG();
2668
}
2669
if (verb && drbd_ratelimit())
2670
drbd_err(device, "Can not satisfy peer's read request, "
2671
"no local data.\n");
2672
2673
/* drain possible payload */
2674
return drbd_drain_block(peer_device, pi->size);
2675
}
2676
2677
/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2678
* "criss-cross" setup, that might cause write-out on some other DRBD,
2679
* which in turn might block on the other node at this very place. */
2680
peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2681
size, GFP_NOIO);
2682
if (!peer_req) {
2683
put_ldev(device);
2684
return -ENOMEM;
2685
}
2686
peer_req->opf = REQ_OP_READ;
2687
2688
switch (pi->cmd) {
2689
case P_DATA_REQUEST:
2690
peer_req->w.cb = w_e_end_data_req;
2691
/* application IO, don't drbd_rs_begin_io */
2692
peer_req->flags |= EE_APPLICATION;
2693
goto submit;
2694
2695
case P_RS_THIN_REQ:
2696
/* If at some point in the future we have a smart way to
2697
find out if this data block is completely deallocated,
2698
then we would do something smarter here than reading
2699
the block... */
2700
peer_req->flags |= EE_RS_THIN_REQ;
2701
fallthrough;
2702
case P_RS_DATA_REQUEST:
2703
peer_req->w.cb = w_e_end_rsdata_req;
2704
/* used in the sector offset progress display */
2705
device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2706
break;
2707
2708
case P_OV_REPLY:
2709
case P_CSUM_RS_REQUEST:
2710
di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2711
if (!di)
2712
goto out_free_e;
2713
2714
di->digest_size = pi->size;
2715
di->digest = (((char *)di)+sizeof(struct digest_info));
2716
2717
peer_req->digest = di;
2718
peer_req->flags |= EE_HAS_DIGEST;
2719
2720
if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2721
goto out_free_e;
2722
2723
if (pi->cmd == P_CSUM_RS_REQUEST) {
2724
D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2725
peer_req->w.cb = w_e_end_csum_rs_req;
2726
/* used in the sector offset progress display */
2727
device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2728
/* remember to report stats in drbd_resync_finished */
2729
device->use_csums = true;
2730
} else if (pi->cmd == P_OV_REPLY) {
2731
/* track progress, we may need to throttle */
2732
atomic_add(size >> 9, &device->rs_sect_in);
2733
peer_req->w.cb = w_e_end_ov_reply;
2734
dec_rs_pending(peer_device);
2735
/* drbd_rs_begin_io done when we sent this request,
2736
* but accounting still needs to be done. */
2737
goto submit_for_resync;
2738
}
2739
break;
2740
2741
case P_OV_REQUEST:
2742
if (device->ov_start_sector == ~(sector_t)0 &&
2743
peer_device->connection->agreed_pro_version >= 90) {
2744
unsigned long now = jiffies;
2745
int i;
2746
device->ov_start_sector = sector;
2747
device->ov_position = sector;
2748
device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2749
device->rs_total = device->ov_left;
2750
for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2751
device->rs_mark_left[i] = device->ov_left;
2752
device->rs_mark_time[i] = now;
2753
}
2754
drbd_info(device, "Online Verify start sector: %llu\n",
2755
(unsigned long long)sector);
2756
}
2757
peer_req->w.cb = w_e_end_ov_req;
2758
break;
2759
2760
default:
2761
BUG();
2762
}
2763
2764
/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2765
* wrt the receiver, but it is not as straightforward as it may seem.
2766
* Various places in the resync start and stop logic assume resync
2767
* requests are processed in order, requeuing this on the worker thread
2768
* introduces a bunch of new code for synchronization between threads.
2769
*
2770
* Unlimited throttling before drbd_rs_begin_io may stall the resync
2771
* "forever", throttling after drbd_rs_begin_io will lock that extent
2772
* for application writes for the same time. For now, just throttle
2773
* here, where the rest of the code expects the receiver to sleep for
2774
* a while, anyways.
2775
*/
2776
2777
/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2778
* this defers syncer requests for some time, before letting at least
2779
* one request through. The resync controller on the receiving side
2780
* will adapt to the incoming rate accordingly.
2781
*
2782
* We cannot throttle here if remote is Primary/SyncTarget:
2783
* we would also throttle its application reads.
2784
* In that case, throttling is done on the SyncTarget only.
2785
*/
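/* Concretely, the check below skips the HZ/10 nap whenever the peer is
 * Primary: such a peer sends its application reads to us while it is
 * SyncTarget, and those must not be delayed here; pacing then happens
 * on the SyncTarget side only. */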
2786
2787
/* Even though this may be a resync request, we do add to "read_ee";
2788
* "sync_ee" is only used for resync WRITEs.
2789
* Add to list early, so debugfs can find this request
2790
* even if we have to sleep below. */
2791
spin_lock_irq(&device->resource->req_lock);
2792
list_add_tail(&peer_req->w.list, &device->read_ee);
2793
spin_unlock_irq(&device->resource->req_lock);
2794
2795
update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2796
if (device->state.peer != R_PRIMARY
2797
&& drbd_rs_should_slow_down(peer_device, sector, false))
2798
schedule_timeout_uninterruptible(HZ/10);
2799
update_receiver_timing_details(connection, drbd_rs_begin_io);
2800
if (drbd_rs_begin_io(device, sector))
2801
goto out_free_e;
2802
2803
submit_for_resync:
2804
atomic_add(size >> 9, &device->rs_sect_ev);
2805
2806
submit:
2807
update_receiver_timing_details(connection, drbd_submit_peer_request);
2808
inc_unacked(device);
2809
if (drbd_submit_peer_request(peer_req) == 0)
2810
return 0;
2811
2812
/* don't care for the reason here */
2813
drbd_err(device, "submit failed, triggering re-connect\n");
2814
2815
out_free_e:
2816
spin_lock_irq(&device->resource->req_lock);
2817
list_del(&peer_req->w.list);
2818
spin_unlock_irq(&device->resource->req_lock);
2819
/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2820
2821
put_ldev(device);
2822
drbd_free_peer_req(device, peer_req);
2823
return -EIO;
2824
}
2825
2826
/*
2827
* drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2828
*/
2829
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2830
{
2831
struct drbd_device *device = peer_device->device;
2832
int self, peer, rv = -100;
2833
unsigned long ch_self, ch_peer;
2834
enum drbd_after_sb_p after_sb_0p;
2835
2836
self = device->ldev->md.uuid[UI_BITMAP] & 1;
2837
peer = device->p_uuid[UI_BITMAP] & 1;
2838
2839
ch_peer = device->p_uuid[UI_SIZE];
2840
ch_self = device->comm_bm_set;
2841
2842
rcu_read_lock();
2843
after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2844
rcu_read_unlock();
2845
switch (after_sb_0p) {
2846
case ASB_CONSENSUS:
2847
case ASB_DISCARD_SECONDARY:
2848
case ASB_CALL_HELPER:
2849
case ASB_VIOLENTLY:
2850
drbd_err(device, "Configuration error.\n");
2851
break;
2852
case ASB_DISCONNECT:
2853
break;
2854
case ASB_DISCARD_YOUNGER_PRI:
2855
if (self == 0 && peer == 1) {
2856
rv = -1;
2857
break;
2858
}
2859
if (self == 1 && peer == 0) {
2860
rv = 1;
2861
break;
2862
}
2863
fallthrough; /* to one of the other strategies */
2864
case ASB_DISCARD_OLDER_PRI:
2865
if (self == 0 && peer == 1) {
2866
rv = 1;
2867
break;
2868
}
2869
if (self == 1 && peer == 0) {
2870
rv = -1;
2871
break;
2872
}
2873
/* Else fall through to one of the other strategies... */
2874
drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2875
"Using discard-least-changes instead\n");
2876
fallthrough;
2877
case ASB_DISCARD_ZERO_CHG:
2878
if (ch_peer == 0 && ch_self == 0) {
2879
rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2880
? -1 : 1;
2881
break;
2882
} else {
2883
if (ch_peer == 0) { rv = 1; break; }
2884
if (ch_self == 0) { rv = -1; break; }
2885
}
2886
if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2887
break;
2888
fallthrough;
2889
case ASB_DISCARD_LEAST_CHG:
2890
if (ch_self < ch_peer)
2891
rv = -1;
2892
else if (ch_self > ch_peer)
2893
rv = 1;
2894
else /* ( ch_self == ch_peer ) */
2895
/* Well, then use something else. */
2896
rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2897
? -1 : 1;
2898
break;
2899
case ASB_DISCARD_LOCAL:
2900
rv = -1;
2901
break;
2902
case ASB_DISCARD_REMOTE:
2903
rv = 1;
2904
}
2905
2906
return rv;
2907
}
2908
2909
/*
2910
* drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2911
*/
2912
static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2913
{
2914
struct drbd_device *device = peer_device->device;
2915
int hg, rv = -100;
2916
enum drbd_after_sb_p after_sb_1p;
2917
2918
rcu_read_lock();
2919
after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2920
rcu_read_unlock();
2921
switch (after_sb_1p) {
2922
case ASB_DISCARD_YOUNGER_PRI:
2923
case ASB_DISCARD_OLDER_PRI:
2924
case ASB_DISCARD_LEAST_CHG:
2925
case ASB_DISCARD_LOCAL:
2926
case ASB_DISCARD_REMOTE:
2927
case ASB_DISCARD_ZERO_CHG:
2928
drbd_err(device, "Configuration error.\n");
2929
break;
2930
case ASB_DISCONNECT:
2931
break;
2932
case ASB_CONSENSUS:
2933
hg = drbd_asb_recover_0p(peer_device);
2934
if (hg == -1 && device->state.role == R_SECONDARY)
2935
rv = hg;
2936
if (hg == 1 && device->state.role == R_PRIMARY)
2937
rv = hg;
2938
break;
2939
case ASB_VIOLENTLY:
2940
rv = drbd_asb_recover_0p(peer_device);
2941
break;
2942
case ASB_DISCARD_SECONDARY:
2943
return device->state.role == R_PRIMARY ? 1 : -1;
2944
case ASB_CALL_HELPER:
2945
hg = drbd_asb_recover_0p(peer_device);
2946
if (hg == -1 && device->state.role == R_PRIMARY) {
2947
enum drbd_state_rv rv2;
2948
2949
/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2950
* we might be here in C_WF_REPORT_PARAMS which is transient.
2951
* we do not need to wait for the after state change work either. */
2952
rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2953
if (rv2 != SS_SUCCESS) {
2954
drbd_khelper(device, "pri-lost-after-sb");
2955
} else {
2956
drbd_warn(device, "Successfully gave up primary role.\n");
2957
rv = hg;
2958
}
2959
} else
2960
rv = hg;
2961
}
2962
2963
return rv;
2964
}
2965
2966
/*
2967
* drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2968
*/
2969
static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2970
{
2971
struct drbd_device *device = peer_device->device;
2972
int hg, rv = -100;
2973
enum drbd_after_sb_p after_sb_2p;
2974
2975
rcu_read_lock();
2976
after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2977
rcu_read_unlock();
2978
switch (after_sb_2p) {
2979
case ASB_DISCARD_YOUNGER_PRI:
2980
case ASB_DISCARD_OLDER_PRI:
2981
case ASB_DISCARD_LEAST_CHG:
2982
case ASB_DISCARD_LOCAL:
2983
case ASB_DISCARD_REMOTE:
2984
case ASB_CONSENSUS:
2985
case ASB_DISCARD_SECONDARY:
2986
case ASB_DISCARD_ZERO_CHG:
2987
drbd_err(device, "Configuration error.\n");
2988
break;
2989
case ASB_VIOLENTLY:
2990
rv = drbd_asb_recover_0p(peer_device);
2991
break;
2992
case ASB_DISCONNECT:
2993
break;
2994
case ASB_CALL_HELPER:
2995
hg = drbd_asb_recover_0p(peer_device);
2996
if (hg == -1) {
2997
enum drbd_state_rv rv2;
2998
2999
/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3000
* we might be here in C_WF_REPORT_PARAMS which is transient.
3001
* we do not need to wait for the after state change work either. */
3002
rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3003
if (rv2 != SS_SUCCESS) {
3004
drbd_khelper(device, "pri-lost-after-sb");
3005
} else {
3006
drbd_warn(device, "Successfully gave up primary role.\n");
3007
rv = hg;
3008
}
3009
} else
3010
rv = hg;
3011
}
3012
3013
return rv;
3014
}
3015
3016
static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3017
u64 bits, u64 flags)
3018
{
3019
if (!uuid) {
3020
drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3021
return;
3022
}
3023
drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3024
text,
3025
(unsigned long long)uuid[UI_CURRENT],
3026
(unsigned long long)uuid[UI_BITMAP],
3027
(unsigned long long)uuid[UI_HISTORY_START],
3028
(unsigned long long)uuid[UI_HISTORY_END],
3029
(unsigned long long)bits,
3030
(unsigned long long)flags);
3031
}
3032
3033
/*
3034
100 after split brain try auto recover
3035
2 C_SYNC_SOURCE set BitMap
3036
1 C_SYNC_SOURCE use BitMap
3037
0 no Sync
3038
-1 C_SYNC_TARGET use BitMap
3039
-2 C_SYNC_TARGET set BitMap
3040
-100 after split brain, disconnect
3041
-1000 unrelated data
3042
-1091 requires proto 91
3043
-1096 requires proto 96
3044
*/
3045
3046
static int drbd_uuid_compare(struct drbd_peer_device *const peer_device,
3047
enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3048
{
3049
struct drbd_connection *const connection = peer_device->connection;
3050
struct drbd_device *device = peer_device->device;
3051
u64 self, peer;
3052
int i, j;
3053
3054
self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3055
peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3056
3057
*rule_nr = 10;
3058
if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3059
return 0;
3060
3061
*rule_nr = 20;
3062
if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3063
peer != UUID_JUST_CREATED)
3064
return -2;
3065
3066
*rule_nr = 30;
3067
if (self != UUID_JUST_CREATED &&
3068
(peer == UUID_JUST_CREATED || peer == (u64)0))
3069
return 2;
3070
3071
if (self == peer) {
3072
int rct, dc; /* roles at crash time */
3073
3074
if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3075
3076
if (connection->agreed_pro_version < 91)
3077
return -1091;
3078
3079
if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3080
(device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3081
drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3082
drbd_uuid_move_history(device);
3083
device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3084
device->ldev->md.uuid[UI_BITMAP] = 0;
3085
3086
drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3087
device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3088
*rule_nr = 34;
3089
} else {
3090
drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3091
*rule_nr = 36;
3092
}
3093
3094
return 1;
3095
}
3096
3097
if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3098
3099
if (connection->agreed_pro_version < 91)
3100
return -1091;
3101
3102
if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3103
(device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3104
drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3105
3106
device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3107
device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3108
device->p_uuid[UI_BITMAP] = 0UL;
3109
3110
drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3111
*rule_nr = 35;
3112
} else {
3113
drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3114
*rule_nr = 37;
3115
}
3116
3117
return -1;
3118
}
3119
3120
/* Common power [off|failure] */
3121
rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3122
(device->p_uuid[UI_FLAGS] & 2);
3123
/* lowest bit is set when we were primary,
3124
* next bit (weight 2) is set when peer was primary */
3125
*rule_nr = 40;
3126
3127
/* Neither has the "crashed primary" flag set,
3128
* only a replication link hiccup. */
3129
if (rct == 0)
3130
return 0;
3131
3132
/* Current UUID equal and no bitmap uuid; does not necessarily
3133
* mean this was a "simultaneous hard crash", maybe IO was
3134
* frozen, so no UUID-bump happened.
3135
* This is a protocol change, overload DRBD_FF_WSAME as flag
3136
* for "new-enough" peer DRBD version. */
3137
if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3138
*rule_nr = 41;
3139
if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3140
drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3141
return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3142
}
3143
if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3144
/* At least one has the "crashed primary" bit set,
3145
* both are primary now, but neither has rotated its UUIDs?
3146
* "Can not happen." */
3147
drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3148
return -100;
3149
}
3150
if (device->state.role == R_PRIMARY)
3151
return 1;
3152
return -1;
3153
}
3154
3155
/* Both are secondary.
3156
* Really looks like recovery from simultaneous hard crash.
3157
* Check which had been primary before, and arbitrate. */
3158
switch (rct) {
3159
case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3160
case 1: /* self_pri && !peer_pri */ return 1;
3161
case 2: /* !self_pri && peer_pri */ return -1;
3162
case 3: /* self_pri && peer_pri */
3163
dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3164
return dc ? -1 : 1;
3165
}
3166
}
3167
3168
*rule_nr = 50;
3169
peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3170
if (self == peer)
3171
return -1;
3172
3173
*rule_nr = 51;
3174
peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3175
if (self == peer) {
3176
if (connection->agreed_pro_version < 96 ?
3177
(device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3178
(device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3179
peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3180
/* The last P_SYNC_UUID did not get through. Undo the last start of
3181
resync as sync source modifications of the peer's UUIDs. */
3182
3183
if (connection->agreed_pro_version < 91)
3184
return -1091;
3185
3186
device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3187
device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3188
3189
drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3190
drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3191
3192
return -1;
3193
}
3194
}
3195
3196
*rule_nr = 60;
3197
self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3198
for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3199
peer = device->p_uuid[i] & ~((u64)1);
3200
if (self == peer)
3201
return -2;
3202
}
3203
3204
*rule_nr = 70;
3205
self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3206
peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3207
if (self == peer)
3208
return 1;
3209
3210
*rule_nr = 71;
3211
self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3212
if (self == peer) {
3213
if (connection->agreed_pro_version < 96 ?
3214
(device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3215
(device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3216
self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3217
/* The last P_SYNC_UUID did not get through. Undo the last start of
3218
resync as sync source modifications of our UUIDs. */
3219
3220
if (connection->agreed_pro_version < 91)
3221
return -1091;
3222
3223
__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3224
__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3225
3226
drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3227
drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3228
device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3229
3230
return 1;
3231
}
3232
}
3233
3234
3235
*rule_nr = 80;
3236
peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3237
for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3238
self = device->ldev->md.uuid[i] & ~((u64)1);
3239
if (self == peer)
3240
return 2;
3241
}
3242
3243
*rule_nr = 90;
3244
self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3245
peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3246
if (self == peer && self != ((u64)0))
3247
return 100;
3248
3249
*rule_nr = 100;
3250
for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3251
self = device->ldev->md.uuid[i] & ~((u64)1);
3252
for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3253
peer = device->p_uuid[j] & ~((u64)1);
3254
if (self == peer)
3255
return -100;
3256
}
3257
}
3258
3259
return -1000;
3260
}
3261
3262
/* drbd_sync_handshake() returns the new conn state on success, or
3263
CONN_MASK (-1) on failure.
3264
*/
3265
static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3266
enum drbd_role peer_role,
3267
enum drbd_disk_state peer_disk) __must_hold(local)
3268
{
3269
struct drbd_device *device = peer_device->device;
3270
enum drbd_conns rv = C_MASK;
3271
enum drbd_disk_state mydisk;
3272
struct net_conf *nc;
3273
int hg, rule_nr, rr_conflict, tentative, always_asbp;
3274
3275
mydisk = device->state.disk;
3276
if (mydisk == D_NEGOTIATING)
3277
mydisk = device->new_state_tmp.disk;
3278
3279
drbd_info(device, "drbd_sync_handshake:\n");
3280
3281
spin_lock_irq(&device->ldev->md.uuid_lock);
3282
drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3283
drbd_uuid_dump(device, "peer", device->p_uuid,
3284
device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3285
3286
hg = drbd_uuid_compare(peer_device, peer_role, &rule_nr);
3287
spin_unlock_irq(&device->ldev->md.uuid_lock);
3288
3289
drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3290
3291
if (hg == -1000) {
3292
drbd_alert(device, "Unrelated data, aborting!\n");
3293
return C_MASK;
3294
}
3295
if (hg < -0x10000) {
3296
int proto, fflags;
3297
hg = -hg;
3298
proto = hg & 0xff;
3299
fflags = (hg >> 8) & 0xff;
3300
drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3301
proto, fflags);
3302
return C_MASK;
3303
}
3304
if (hg < -1000) {
3305
drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3306
return C_MASK;
3307
}
3308
3309
if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3310
(peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3311
int f = (hg == -100) || abs(hg) == 2;
3312
hg = mydisk > D_INCONSISTENT ? 1 : -1;
3313
if (f)
3314
hg = hg*2;
3315
drbd_info(device, "Becoming sync %s due to disk states.\n",
3316
hg > 0 ? "source" : "target");
3317
}
3318
3319
if (abs(hg) == 100)
3320
drbd_khelper(device, "initial-split-brain");
3321
3322
rcu_read_lock();
3323
nc = rcu_dereference(peer_device->connection->net_conf);
3324
always_asbp = nc->always_asbp;
3325
rr_conflict = nc->rr_conflict;
3326
tentative = nc->tentative;
3327
rcu_read_unlock();
3328
3329
if (hg == 100 || (hg == -100 && always_asbp)) {
3330
int pcount = (device->state.role == R_PRIMARY)
3331
+ (peer_role == R_PRIMARY);
3332
int forced = (hg == -100);
3333
3334
switch (pcount) {
3335
case 0:
3336
hg = drbd_asb_recover_0p(peer_device);
3337
break;
3338
case 1:
3339
hg = drbd_asb_recover_1p(peer_device);
3340
break;
3341
case 2:
3342
hg = drbd_asb_recover_2p(peer_device);
3343
break;
3344
}
3345
if (abs(hg) < 100) {
3346
drbd_warn(device, "Split-Brain detected, %d primaries, "
3347
"automatically solved. Sync from %s node\n",
3348
pcount, (hg < 0) ? "peer" : "this");
3349
if (forced) {
3350
drbd_warn(device, "Doing a full sync, since"
3351
" UUIDs were ambiguous.\n");
3352
hg = hg*2;
3353
}
3354
}
3355
}
3356
3357
if (hg == -100) {
3358
if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3359
hg = -1;
3360
if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3361
hg = 1;
3362
3363
if (abs(hg) < 100)
3364
drbd_warn(device, "Split-Brain detected, manually solved. "
3365
"Sync from %s node\n",
3366
(hg < 0) ? "peer" : "this");
3367
}
3368
3369
if (hg == -100) {
3370
/* FIXME this log message is not correct if we end up here
3371
* after an attempted attach on a diskless node.
3372
* We just refuse to attach -- well, we drop the "connection"
3373
* to that disk, in a way... */
3374
drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3375
drbd_khelper(device, "split-brain");
3376
return C_MASK;
3377
}
3378
3379
if (hg > 0 && mydisk <= D_INCONSISTENT) {
3380
drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3381
return C_MASK;
3382
}
3383
3384
if (hg < 0 && /* by intention we do not use mydisk here. */
3385
device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3386
switch (rr_conflict) {
3387
case ASB_CALL_HELPER:
3388
drbd_khelper(device, "pri-lost");
3389
fallthrough;
3390
case ASB_DISCONNECT:
3391
drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3392
return C_MASK;
3393
case ASB_VIOLENTLY:
3394
drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3395
"assumption\n");
3396
}
3397
}
3398
3399
if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3400
if (hg == 0)
3401
drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3402
else
3403
drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3404
drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3405
abs(hg) >= 2 ? "full" : "bit-map based");
3406
return C_MASK;
3407
}
3408
3409
if (abs(hg) >= 2) {
3410
drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3411
if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3412
BM_LOCKED_SET_ALLOWED, NULL))
3413
return C_MASK;
3414
}
3415
3416
if (hg > 0) { /* become sync source. */
3417
rv = C_WF_BITMAP_S;
3418
} else if (hg < 0) { /* become sync target */
3419
rv = C_WF_BITMAP_T;
3420
} else {
3421
rv = C_CONNECTED;
3422
if (drbd_bm_total_weight(device)) {
3423
drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3424
drbd_bm_total_weight(device));
3425
}
3426
}
3427
3428
return rv;
3429
}
3430
3431
static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3432
{
3433
/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3434
if (peer == ASB_DISCARD_REMOTE)
3435
return ASB_DISCARD_LOCAL;
3436
3437
/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3438
if (peer == ASB_DISCARD_LOCAL)
3439
return ASB_DISCARD_REMOTE;
3440
3441
/* everything else is valid if they are equal on both sides. */
3442
return peer;
3443
}
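/* For example, a peer configured with "discard-remote" means, from our
 * point of view, "discard-local"; receive_protocol() below therefore
 * compares the converted value against our own after-sb-* setting when
 * checking for a compatible configuration. */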
3444
3445
static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3446
{
3447
struct p_protocol *p = pi->data;
3448
enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3449
int p_proto, p_discard_my_data, p_two_primaries, cf;
3450
struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3451
char integrity_alg[SHARED_SECRET_MAX] = "";
3452
struct crypto_shash *peer_integrity_tfm = NULL;
3453
void *int_dig_in = NULL, *int_dig_vv = NULL;
3454
3455
p_proto = be32_to_cpu(p->protocol);
3456
p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3457
p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3458
p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3459
p_two_primaries = be32_to_cpu(p->two_primaries);
3460
cf = be32_to_cpu(p->conn_flags);
3461
p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3462
3463
if (connection->agreed_pro_version >= 87) {
3464
int err;
3465
3466
if (pi->size > sizeof(integrity_alg))
3467
return -EIO;
3468
err = drbd_recv_all(connection, integrity_alg, pi->size);
3469
if (err)
3470
return err;
3471
integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3472
}
3473
3474
if (pi->cmd != P_PROTOCOL_UPDATE) {
3475
clear_bit(CONN_DRY_RUN, &connection->flags);
3476
3477
if (cf & CF_DRY_RUN)
3478
set_bit(CONN_DRY_RUN, &connection->flags);
3479
3480
rcu_read_lock();
3481
nc = rcu_dereference(connection->net_conf);
3482
3483
if (p_proto != nc->wire_protocol) {
3484
drbd_err(connection, "incompatible %s settings\n", "protocol");
3485
goto disconnect_rcu_unlock;
3486
}
3487
3488
if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3489
drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3490
goto disconnect_rcu_unlock;
3491
}
3492
3493
if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3494
drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3495
goto disconnect_rcu_unlock;
3496
}
3497
3498
if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3499
drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3500
goto disconnect_rcu_unlock;
3501
}
3502
3503
if (p_discard_my_data && nc->discard_my_data) {
3504
drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3505
goto disconnect_rcu_unlock;
3506
}
3507
3508
if (p_two_primaries != nc->two_primaries) {
3509
drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3510
goto disconnect_rcu_unlock;
3511
}
3512
3513
if (strcmp(integrity_alg, nc->integrity_alg)) {
3514
drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3515
goto disconnect_rcu_unlock;
3516
}
3517
3518
rcu_read_unlock();
3519
}
3520
3521
if (integrity_alg[0]) {
3522
int hash_size;
3523
3524
/*
3525
* We can only change the peer data integrity algorithm
3526
* here. Changing our own data integrity algorithm
3527
* requires that we send a P_PROTOCOL_UPDATE packet at
3528
* the same time; otherwise, the peer has no way to
3529
* tell between which packets the algorithm should
3530
* change.
3531
*/
3532
3533
peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3534
if (IS_ERR(peer_integrity_tfm)) {
3535
peer_integrity_tfm = NULL;
3536
drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3537
integrity_alg);
3538
goto disconnect;
3539
}
3540
3541
hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3542
int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3543
int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3544
if (!(int_dig_in && int_dig_vv)) {
3545
drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3546
goto disconnect;
3547
}
3548
}
3549
3550
new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3551
if (!new_net_conf)
3552
goto disconnect;
3553
3554
mutex_lock(&connection->data.mutex);
3555
mutex_lock(&connection->resource->conf_update);
3556
old_net_conf = connection->net_conf;
3557
*new_net_conf = *old_net_conf;
3558
3559
new_net_conf->wire_protocol = p_proto;
3560
new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3561
new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3562
new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3563
new_net_conf->two_primaries = p_two_primaries;
3564
3565
rcu_assign_pointer(connection->net_conf, new_net_conf);
3566
mutex_unlock(&connection->resource->conf_update);
3567
mutex_unlock(&connection->data.mutex);
3568
3569
crypto_free_shash(connection->peer_integrity_tfm);
3570
kfree(connection->int_dig_in);
3571
kfree(connection->int_dig_vv);
3572
connection->peer_integrity_tfm = peer_integrity_tfm;
3573
connection->int_dig_in = int_dig_in;
3574
connection->int_dig_vv = int_dig_vv;
3575
3576
if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3577
drbd_info(connection, "peer data-integrity-alg: %s\n",
3578
integrity_alg[0] ? integrity_alg : "(none)");
3579
3580
kvfree_rcu_mightsleep(old_net_conf);
3581
return 0;
3582
3583
disconnect_rcu_unlock:
3584
rcu_read_unlock();
3585
disconnect:
3586
crypto_free_shash(peer_integrity_tfm);
3587
kfree(int_dig_in);
3588
kfree(int_dig_vv);
3589
conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3590
return -EIO;
3591
}
3592
3593
/* helper function
3594
* input: alg name, feature name
3595
* return: NULL (alg name was "")
3596
* ERR_PTR(error) if something goes wrong
3597
* or the crypto hash ptr, if it worked out ok. */
3598
static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3599
const struct drbd_device *device,
3600
const char *alg, const char *name)
3601
{
3602
struct crypto_shash *tfm;
3603
3604
if (!alg[0])
3605
return NULL;
3606
3607
tfm = crypto_alloc_shash(alg, 0, 0);
3608
if (IS_ERR(tfm)) {
3609
drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3610
alg, name, PTR_ERR(tfm));
3611
return tfm;
3612
}
3613
return tfm;
3614
}
3615
3616
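/* Drain and discard the remaining pi->size bytes of the current packet,
* reusing the connection's receive buffer. Returns 0 once everything was
* consumed, or a negative error code if the stream broke early. */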
static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3617
{
3618
void *buffer = connection->data.rbuf;
3619
int size = pi->size;
3620
3621
while (size) {
3622
int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3623
s = drbd_recv(connection, buffer, s);
3624
if (s <= 0) {
3625
if (s < 0)
3626
return s;
3627
break;
3628
}
3629
size -= s;
3630
}
3631
if (size)
3632
return -EIO;
3633
return 0;
3634
}
3635
3636
/*
3637
* config_unknown_volume - device configuration command for unknown volume
3638
*
3639
* When a device is added to an existing connection, the node on which the
3640
* device is added first will send configuration commands to its peer but the
3641
* peer will not know about the device yet. It will warn and ignore these
3642
* commands. Once the device is added on the second node, the second node will
3643
* send the same device configuration commands, but in the other direction.
3644
*
3645
* (We can also end up here if drbd is misconfigured.)
3646
*/
3647
static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3648
{
3649
drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3650
cmdname(pi->cmd), pi->vnr);
3651
return ignore_remaining_packet(connection, pi);
3652
}
3653
3654
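/* P_SYNC_PARAM / P_SYNC_PARAM89: the payload layout depends on the agreed
* protocol version (apv). Besides the resync rate and (apv >= 95) the
* dynamic resync controller settings, it may carry verify-alg / csums-alg
* names; during the initial handshake (C_WF_REPORT_PARAMS) those must match
* our configuration, later on they replace the current algorithms. */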
static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3655
{
3656
struct drbd_peer_device *peer_device;
3657
struct drbd_device *device;
3658
struct p_rs_param_95 *p;
3659
unsigned int header_size, data_size, exp_max_sz;
3660
struct crypto_shash *verify_tfm = NULL;
3661
struct crypto_shash *csums_tfm = NULL;
3662
struct net_conf *old_net_conf, *new_net_conf = NULL;
3663
struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3664
const int apv = connection->agreed_pro_version;
3665
struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3666
unsigned int fifo_size = 0;
3667
int err;
3668
3669
peer_device = conn_peer_device(connection, pi->vnr);
3670
if (!peer_device)
3671
return config_unknown_volume(connection, pi);
3672
device = peer_device->device;
3673
3674
exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3675
: apv == 88 ? sizeof(struct p_rs_param)
3676
+ SHARED_SECRET_MAX
3677
: apv <= 94 ? sizeof(struct p_rs_param_89)
3678
: /* apv >= 95 */ sizeof(struct p_rs_param_95);
3679
3680
if (pi->size > exp_max_sz) {
3681
drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3682
pi->size, exp_max_sz);
3683
return -EIO;
3684
}
3685
3686
if (apv <= 88) {
3687
header_size = sizeof(struct p_rs_param);
3688
data_size = pi->size - header_size;
3689
} else if (apv <= 94) {
3690
header_size = sizeof(struct p_rs_param_89);
3691
data_size = pi->size - header_size;
3692
D_ASSERT(device, data_size == 0);
3693
} else {
3694
header_size = sizeof(struct p_rs_param_95);
3695
data_size = pi->size - header_size;
3696
D_ASSERT(device, data_size == 0);
3697
}
3698
3699
/* initialize verify_alg and csums_alg */
3700
p = pi->data;
3701
BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
3702
memset(&p->algs, 0, sizeof(p->algs));
3703
3704
err = drbd_recv_all(peer_device->connection, p, header_size);
3705
if (err)
3706
return err;
3707
3708
mutex_lock(&connection->resource->conf_update);
3709
old_net_conf = peer_device->connection->net_conf;
3710
if (get_ldev(device)) {
3711
new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3712
if (!new_disk_conf) {
3713
put_ldev(device);
3714
mutex_unlock(&connection->resource->conf_update);
3715
drbd_err(device, "Allocation of new disk_conf failed\n");
3716
return -ENOMEM;
3717
}
3718
3719
old_disk_conf = device->ldev->disk_conf;
3720
*new_disk_conf = *old_disk_conf;
3721
3722
new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3723
}
3724
3725
if (apv >= 88) {
3726
if (apv == 88) {
3727
if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3728
drbd_err(device, "verify-alg of wrong size, "
3729
"peer wants %u, accepting only up to %u byte\n",
3730
data_size, SHARED_SECRET_MAX);
3731
goto reconnect;
3732
}
3733
3734
err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3735
if (err)
3736
goto reconnect;
3737
/* we expect NUL terminated string */
3738
/* but just in case someone tries to be evil */
3739
D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3740
p->verify_alg[data_size-1] = 0;
3741
3742
} else /* apv >= 89 */ {
3743
/* we still expect NUL terminated strings */
3744
/* but just in case someone tries to be evil */
3745
D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3746
D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3747
p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3748
p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3749
}
3750
3751
if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3752
if (device->state.conn == C_WF_REPORT_PARAMS) {
3753
drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3754
old_net_conf->verify_alg, p->verify_alg);
3755
goto disconnect;
3756
}
3757
verify_tfm = drbd_crypto_alloc_digest_safe(device,
3758
p->verify_alg, "verify-alg");
3759
if (IS_ERR(verify_tfm)) {
3760
verify_tfm = NULL;
3761
goto disconnect;
3762
}
3763
}
3764
3765
if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3766
if (device->state.conn == C_WF_REPORT_PARAMS) {
3767
drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3768
old_net_conf->csums_alg, p->csums_alg);
3769
goto disconnect;
3770
}
3771
csums_tfm = drbd_crypto_alloc_digest_safe(device,
3772
p->csums_alg, "csums-alg");
3773
if (IS_ERR(csums_tfm)) {
3774
csums_tfm = NULL;
3775
goto disconnect;
3776
}
3777
}
3778
3779
if (apv > 94 && new_disk_conf) {
3780
new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3781
new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3782
new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3783
new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3784
3785
fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3786
if (fifo_size != device->rs_plan_s->size) {
3787
new_plan = fifo_alloc(fifo_size);
3788
if (!new_plan) {
3789
drbd_err(device, "kmalloc of fifo_buffer failed");
3790
put_ldev(device);
3791
goto disconnect;
3792
}
3793
}
3794
}
3795
3796
if (verify_tfm || csums_tfm) {
3797
new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3798
if (!new_net_conf)
3799
goto disconnect;
3800
3801
*new_net_conf = *old_net_conf;
3802
3803
if (verify_tfm) {
3804
strcpy(new_net_conf->verify_alg, p->verify_alg);
3805
new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3806
crypto_free_shash(peer_device->connection->verify_tfm);
3807
peer_device->connection->verify_tfm = verify_tfm;
3808
drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3809
}
3810
if (csums_tfm) {
3811
strcpy(new_net_conf->csums_alg, p->csums_alg);
3812
new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3813
crypto_free_shash(peer_device->connection->csums_tfm);
3814
peer_device->connection->csums_tfm = csums_tfm;
3815
drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3816
}
3817
rcu_assign_pointer(connection->net_conf, new_net_conf);
3818
}
3819
}
3820
3821
if (new_disk_conf) {
3822
rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3823
put_ldev(device);
3824
}
3825
3826
if (new_plan) {
3827
old_plan = device->rs_plan_s;
3828
rcu_assign_pointer(device->rs_plan_s, new_plan);
3829
}
3830
3831
mutex_unlock(&connection->resource->conf_update);
3832
synchronize_rcu();
3833
if (new_net_conf)
3834
kfree(old_net_conf);
3835
kfree(old_disk_conf);
3836
kfree(old_plan);
3837
3838
return 0;
3839
3840
reconnect:
3841
if (new_disk_conf) {
3842
put_ldev(device);
3843
kfree(new_disk_conf);
3844
}
3845
mutex_unlock(&connection->resource->conf_update);
3846
return -EIO;
3847
3848
disconnect:
3849
kfree(new_plan);
3850
if (new_disk_conf) {
3851
put_ldev(device);
3852
kfree(new_disk_conf);
3853
}
3854
mutex_unlock(&connection->resource->conf_update);
3855
/* just for completeness: actually not needed,
3856
* as this is not reached if csums_tfm was ok. */
3857
crypto_free_shash(csums_tfm);
3858
/* but free the verify_tfm again, if csums_tfm did not work out */
3859
crypto_free_shash(verify_tfm);
3860
conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3861
return -EIO;
3862
}
3863
3864
/* warn if the arguments differ by more than 12.5% */
3865
static void warn_if_differ_considerably(struct drbd_device *device,
3866
const char *s, sector_t a, sector_t b)
3867
{
3868
sector_t d;
3869
if (a == 0 || b == 0)
3870
return;
3871
d = (a > b) ? (a - b) : (b - a);
3872
if (d > (a>>3) || d > (b>>3))
3873
drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3874
(unsigned long long)a, (unsigned long long)b);
3875
}
3876
3877
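/* P_SIZES: the peer reports its backing device size (d_size), its
* configured size limit (u_size) and its current virtual device size
* (c_size). Reconcile those with our own sizes, possibly resize via
* drbd_determine_dev_size(), and trigger a resync if the device grew
* while we were connected (unless --assume-clean was used). */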
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3878
{
3879
struct drbd_peer_device *peer_device;
3880
struct drbd_device *device;
3881
struct p_sizes *p = pi->data;
3882
struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3883
enum determine_dev_size dd = DS_UNCHANGED;
3884
sector_t p_size, p_usize, p_csize, my_usize;
3885
sector_t new_size, cur_size;
3886
int ldsc = 0; /* local disk size changed */
3887
enum dds_flags ddsf;
3888
3889
peer_device = conn_peer_device(connection, pi->vnr);
3890
if (!peer_device)
3891
return config_unknown_volume(connection, pi);
3892
device = peer_device->device;
3893
cur_size = get_capacity(device->vdisk);
3894
3895
p_size = be64_to_cpu(p->d_size);
3896
p_usize = be64_to_cpu(p->u_size);
3897
p_csize = be64_to_cpu(p->c_size);
3898
3899
/* just store the peer's disk size for now.
3900
* we still need to figure out whether we accept that. */
3901
device->p_size = p_size;
3902
3903
if (get_ldev(device)) {
3904
rcu_read_lock();
3905
my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3906
rcu_read_unlock();
3907
3908
warn_if_differ_considerably(device, "lower level device sizes",
3909
p_size, drbd_get_max_capacity(device->ldev));
3910
warn_if_differ_considerably(device, "user requested size",
3911
p_usize, my_usize);
3912
3913
/* if this is the first connect, or an otherwise expected
3914
* param exchange, choose the minimum */
3915
if (device->state.conn == C_WF_REPORT_PARAMS)
3916
p_usize = min_not_zero(my_usize, p_usize);
3917
3918
/* Never shrink a device with usable data during connect,
3919
* or "attach" on the peer.
3920
* But allow online shrinking if we are connected. */
3921
new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3922
if (new_size < cur_size &&
3923
device->state.disk >= D_OUTDATED &&
3924
(device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
3925
drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3926
(unsigned long long)new_size, (unsigned long long)cur_size);
3927
conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3928
put_ldev(device);
3929
return -EIO;
3930
}
3931
3932
if (my_usize != p_usize) {
3933
struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3934
3935
new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3936
if (!new_disk_conf) {
3937
put_ldev(device);
3938
return -ENOMEM;
3939
}
3940
3941
mutex_lock(&connection->resource->conf_update);
3942
old_disk_conf = device->ldev->disk_conf;
3943
*new_disk_conf = *old_disk_conf;
3944
new_disk_conf->disk_size = p_usize;
3945
3946
rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3947
mutex_unlock(&connection->resource->conf_update);
3948
kvfree_rcu_mightsleep(old_disk_conf);
3949
3950
drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
3951
(unsigned long)p_usize, (unsigned long)my_usize);
3952
}
3953
3954
put_ldev(device);
3955
}
3956
3957
device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3958
/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
3959
In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3960
drbd_reconsider_queue_parameters(), we can be sure that after
3961
drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3962
3963
ddsf = be16_to_cpu(p->dds_flags);
3964
if (get_ldev(device)) {
3965
drbd_reconsider_queue_parameters(device, device->ldev, o);
3966
dd = drbd_determine_dev_size(device, ddsf, NULL);
3967
put_ldev(device);
3968
if (dd == DS_ERROR)
3969
return -EIO;
3970
drbd_md_sync(device);
3971
} else {
3972
/*
3973
* I am diskless, need to accept the peer's *current* size.
3974
* I must NOT accept the peer's backing disk size,
3975
* it may have been larger than mine all along...
3976
*
3977
* At this point, the peer knows more about my disk, or at
3978
* least about what we last agreed upon, than myself.
3979
* So if his c_size is less than his d_size, the most likely
3980
* reason is that *my* d_size was smaller last time we checked.
3981
*
3982
* However, if he sends a zero current size,
3983
* take his (user-capped or) backing disk size anyways.
3984
*
3985
* Unless of course he does not have a disk himself.
3986
* In which case we ignore this completely.
3987
*/
3988
sector_t new_size = p_csize ?: p_usize ?: p_size;
3989
drbd_reconsider_queue_parameters(device, NULL, o);
3990
if (new_size == 0) {
3991
/* Ignore, peer does not know anything. */
3992
} else if (new_size == cur_size) {
3993
/* nothing to do */
3994
} else if (cur_size != 0 && p_size == 0) {
3995
drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
3996
(unsigned long long)new_size, (unsigned long long)cur_size);
3997
} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
3998
drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
3999
(unsigned long long)new_size, (unsigned long long)cur_size);
4000
conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4001
return -EIO;
4002
} else {
4003
/* I believe the peer, if
4004
* - I don't have a current size myself
4005
* - we agree on the size anyways
4006
* - I do have a current size, am Secondary,
4007
* and he has the only disk
4008
* - I do have a current size, am Primary,
4009
* and he has the only disk,
4010
* which is larger than my current size
4011
*/
4012
drbd_set_my_capacity(device, new_size);
4013
}
4014
}
4015
4016
if (get_ldev(device)) {
4017
if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4018
device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4019
ldsc = 1;
4020
}
4021
4022
put_ldev(device);
4023
}
4024
4025
if (device->state.conn > C_WF_REPORT_PARAMS) {
4026
if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4027
ldsc) {
4028
/* we have different sizes, probably peer
4029
* needs to know my new size... */
4030
drbd_send_sizes(peer_device, 0, ddsf);
4031
}
4032
if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4033
(dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4034
if (device->state.pdsk >= D_INCONSISTENT &&
4035
device->state.disk >= D_INCONSISTENT) {
4036
if (ddsf & DDSF_NO_RESYNC)
4037
drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4038
else
4039
resync_after_online_grow(device);
4040
} else
4041
set_bit(RESYNC_AFTER_NEG, &device->flags);
4042
}
4043
}
4044
4045
return 0;
4046
}
4047
4048
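/* P_UUIDS: remember the peer's UUID set in device->p_uuid for the sync
* handshake. A diskless primary refuses data with a foreign current UUID;
* a device whose current UUID is still UUID_JUST_CREATED may skip the
* initial full sync if the peer indicates the same in its UI_FLAGS. */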
static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4049
{
4050
struct drbd_peer_device *peer_device;
4051
struct drbd_device *device;
4052
struct p_uuids *p = pi->data;
4053
u64 *p_uuid;
4054
int i, updated_uuids = 0;
4055
4056
peer_device = conn_peer_device(connection, pi->vnr);
4057
if (!peer_device)
4058
return config_unknown_volume(connection, pi);
4059
device = peer_device->device;
4060
4061
p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4062
if (!p_uuid)
4063
return false;
4064
4065
for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4066
p_uuid[i] = be64_to_cpu(p->uuid[i]);
4067
4068
kfree(device->p_uuid);
4069
device->p_uuid = p_uuid;
4070
4071
if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4072
device->state.disk < D_INCONSISTENT &&
4073
device->state.role == R_PRIMARY &&
4074
(device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4075
drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4076
(unsigned long long)device->ed_uuid);
4077
conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4078
return -EIO;
4079
}
4080
4081
if (get_ldev(device)) {
4082
int skip_initial_sync =
4083
device->state.conn == C_CONNECTED &&
4084
peer_device->connection->agreed_pro_version >= 90 &&
4085
device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4086
(p_uuid[UI_FLAGS] & 8);
4087
if (skip_initial_sync) {
4088
drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4089
drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4090
"clear_n_write from receive_uuids",
4091
BM_LOCKED_TEST_ALLOWED, NULL);
4092
_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4093
_drbd_uuid_set(device, UI_BITMAP, 0);
4094
_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4095
CS_VERBOSE, NULL);
4096
drbd_md_sync(device);
4097
updated_uuids = 1;
4098
}
4099
put_ldev(device);
4100
} else if (device->state.disk < D_INCONSISTENT &&
4101
device->state.role == R_PRIMARY) {
4102
/* I am a diskless primary, the peer just created a new current UUID
4103
for me. */
4104
updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4105
}
4106
4107
/* Before we test for the disk state, we should wait until a possibly
4108
ongoing cluster wide state change has finished. That is important if
4109
we are primary and are detaching from our disk. We need to see the
4110
new disk state... */
4111
mutex_lock(device->state_mutex);
4112
mutex_unlock(device->state_mutex);
4113
if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4114
updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4115
4116
if (updated_uuids)
4117
drbd_print_uuids(device, "receiver updated UUIDs to");
4118
4119
return 0;
4120
}
4121
4122
/**
4123
* convert_state() - Converts the peer's view of the cluster state to our point of view
4124
* @ps: The state as seen by the peer.
4125
*/
4126
static union drbd_state convert_state(union drbd_state ps)
4127
{
4128
union drbd_state ms;
4129
4130
static enum drbd_conns c_tab[] = {
4131
[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4132
[C_CONNECTED] = C_CONNECTED,
4133
4134
[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4135
[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4136
[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4137
[C_VERIFY_S] = C_VERIFY_T,
4138
[C_MASK] = C_MASK,
4139
};
4140
4141
ms.i = ps.i;
4142
4143
ms.conn = c_tab[ps.conn];
4144
ms.peer = ps.role;
4145
ms.role = ps.peer;
4146
ms.pdsk = ps.disk;
4147
ms.disk = ps.pdsk;
4148
ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4149
4150
return ms;
4151
}
4152
4153
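/* P_STATE_CHG_REQ: the peer asks us to carry out a state change on its
* behalf. Convert the request from the peer's point of view into ours,
* attempt the change, and report the result back to the peer. */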
static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4154
{
4155
struct drbd_peer_device *peer_device;
4156
struct drbd_device *device;
4157
struct p_req_state *p = pi->data;
4158
union drbd_state mask, val;
4159
enum drbd_state_rv rv;
4160
4161
peer_device = conn_peer_device(connection, pi->vnr);
4162
if (!peer_device)
4163
return -EIO;
4164
device = peer_device->device;
4165
4166
mask.i = be32_to_cpu(p->mask);
4167
val.i = be32_to_cpu(p->val);
4168
4169
if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4170
mutex_is_locked(device->state_mutex)) {
4171
drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4172
return 0;
4173
}
4174
4175
mask = convert_state(mask);
4176
val = convert_state(val);
4177
4178
rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4179
drbd_send_sr_reply(peer_device, rv);
4180
4181
drbd_md_sync(device);
4182
4183
return 0;
4184
}
4185
4186
static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4187
{
4188
struct p_req_state *p = pi->data;
4189
union drbd_state mask, val;
4190
enum drbd_state_rv rv;
4191
4192
mask.i = be32_to_cpu(p->mask);
4193
val.i = be32_to_cpu(p->val);
4194
4195
if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4196
mutex_is_locked(&connection->cstate_mutex)) {
4197
conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4198
return 0;
4199
}
4200
4201
mask = convert_state(mask);
4202
val = convert_state(val);
4203
4204
rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4205
conn_send_sr_reply(connection, rv);
4206
4207
return 0;
4208
}
4209
4210
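/* P_STATE: fold the peer's view of the cluster state into our own state.
* This is where resync decisions driven by the peer are made (via
* drbd_sync_handshake()), and where "resync finished" and "verify
* finished" notifications from the peer are recognized. */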
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4211
{
4212
struct drbd_peer_device *peer_device;
4213
struct drbd_device *device;
4214
struct p_state *p = pi->data;
4215
union drbd_state os, ns, peer_state;
4216
enum drbd_disk_state real_peer_disk;
4217
enum chg_state_flags cs_flags;
4218
int rv;
4219
4220
peer_device = conn_peer_device(connection, pi->vnr);
4221
if (!peer_device)
4222
return config_unknown_volume(connection, pi);
4223
device = peer_device->device;
4224
4225
peer_state.i = be32_to_cpu(p->state);
4226
4227
real_peer_disk = peer_state.disk;
4228
if (peer_state.disk == D_NEGOTIATING) {
4229
real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4230
drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4231
}
4232
4233
spin_lock_irq(&device->resource->req_lock);
4234
retry:
4235
os = ns = drbd_read_state(device);
4236
spin_unlock_irq(&device->resource->req_lock);
4237
4238
/* If some other part of the code (ack_receiver thread, timeout)
4239
* already decided to close the connection again,
4240
* we must not "re-establish" it here. */
4241
if (os.conn <= C_TEAR_DOWN)
4242
return -ECONNRESET;
4243
4244
/* If this is the "end of sync" confirmation, usually the peer disk
4245
* transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4246
* set) resync started in PausedSyncT, or if the timing of pause-/
4247
* unpause-sync events has been "just right", the peer disk may
4248
* transition from D_CONSISTENT to D_UP_TO_DATE as well.
4249
*/
4250
if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4251
real_peer_disk == D_UP_TO_DATE &&
4252
os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4253
/* If we are (becoming) SyncSource, but peer is still in sync
4254
* preparation, ignore its uptodate-ness to avoid flapping, it
4255
* will change to inconsistent once the peer reaches active
4256
* syncing states.
4257
* It may have changed syncer-paused flags, however, so we
4258
* cannot ignore this completely. */
4259
if (peer_state.conn > C_CONNECTED &&
4260
peer_state.conn < C_SYNC_SOURCE)
4261
real_peer_disk = D_INCONSISTENT;
4262
4263
/* if peer_state changes to connected at the same time,
4264
* it explicitly notifies us that it finished resync.
4265
* Maybe we should finish it up, too? */
4266
else if (os.conn >= C_SYNC_SOURCE &&
4267
peer_state.conn == C_CONNECTED) {
4268
if (drbd_bm_total_weight(device) <= device->rs_failed)
4269
drbd_resync_finished(peer_device);
4270
return 0;
4271
}
4272
}
4273
4274
/* explicit verify finished notification, stop sector reached. */
4275
if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4276
peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4277
ov_out_of_sync_print(peer_device);
4278
drbd_resync_finished(peer_device);
4279
return 0;
4280
}
4281
4282
/* peer says his disk is inconsistent, while we think it is uptodate,
4283
* and this happens while the peer still thinks we have a sync going on,
4284
* but we think we are already done with the sync.
4285
* We ignore this to avoid flapping pdsk.
4286
* This should not happen, if the peer is a recent version of drbd. */
4287
if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4288
os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4289
real_peer_disk = D_UP_TO_DATE;
4290
4291
if (ns.conn == C_WF_REPORT_PARAMS)
4292
ns.conn = C_CONNECTED;
4293
4294
if (peer_state.conn == C_AHEAD)
4295
ns.conn = C_BEHIND;
4296
4297
/* TODO:
4298
* if (primary and diskless and peer uuid != effective uuid)
4299
* abort attach on peer;
4300
*
4301
* If this node does not have good data, was already connected, but
4302
* the peer did a late attach only now, trying to "negotiate" with me,
4303
* AND I am currently Primary, possibly frozen, with some specific
4304
* "effective" uuid, this should never be reached, really, because
4305
* we first send the uuids, then the current state.
4306
*
4307
* In this scenario, we already dropped the connection hard
4308
* when we received the unsuitable uuids (receive_uuids()).
4309
*
4310
* Should we want to change this, that is: not drop the connection in
4311
* receive_uuids() already, then we would need to add a branch here
4312
* that aborts the attach of "unsuitable uuids" on the peer in case
4313
* this node is currently Diskless Primary.
4314
*/
4315
4316
if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4317
get_ldev_if_state(device, D_NEGOTIATING)) {
4318
int cr; /* consider resync */
4319
4320
/* if we established a new connection */
4321
cr = (os.conn < C_CONNECTED);
4322
/* if we had an established connection
4323
* and one of the nodes newly attaches a disk */
4324
cr |= (os.conn == C_CONNECTED &&
4325
(peer_state.disk == D_NEGOTIATING ||
4326
os.disk == D_NEGOTIATING));
4327
/* if we have both been inconsistent, and the peer has been
4328
* forced to be UpToDate with --force */
4329
cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4330
/* if we had been plain connected, and the admin requested to
4331
* start a sync by "invalidate" or "invalidate-remote" */
4332
cr |= (os.conn == C_CONNECTED &&
4333
(peer_state.conn >= C_STARTING_SYNC_S &&
4334
peer_state.conn <= C_WF_BITMAP_T));
4335
4336
if (cr)
4337
ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4338
4339
put_ldev(device);
4340
if (ns.conn == C_MASK) {
4341
ns.conn = C_CONNECTED;
4342
if (device->state.disk == D_NEGOTIATING) {
4343
drbd_force_state(device, NS(disk, D_FAILED));
4344
} else if (peer_state.disk == D_NEGOTIATING) {
4345
drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4346
peer_state.disk = D_DISKLESS;
4347
real_peer_disk = D_DISKLESS;
4348
} else {
4349
if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4350
return -EIO;
4351
D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4352
conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4353
return -EIO;
4354
}
4355
}
4356
}
4357
4358
spin_lock_irq(&device->resource->req_lock);
4359
if (os.i != drbd_read_state(device).i)
4360
goto retry;
4361
clear_bit(CONSIDER_RESYNC, &device->flags);
4362
ns.peer = peer_state.role;
4363
ns.pdsk = real_peer_disk;
4364
ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4365
if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4366
ns.disk = device->new_state_tmp.disk;
4367
cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4368
if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4369
test_bit(NEW_CUR_UUID, &device->flags)) {
4370
/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4371
for temporary network outages! */
4372
spin_unlock_irq(&device->resource->req_lock);
4373
drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4374
tl_clear(peer_device->connection);
4375
drbd_uuid_new_current(device);
4376
clear_bit(NEW_CUR_UUID, &device->flags);
4377
conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4378
return -EIO;
4379
}
4380
rv = _drbd_set_state(device, ns, cs_flags, NULL);
4381
ns = drbd_read_state(device);
4382
spin_unlock_irq(&device->resource->req_lock);
4383
4384
if (rv < SS_SUCCESS) {
4385
conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4386
return -EIO;
4387
}
4388
4389
if (os.conn > C_WF_REPORT_PARAMS) {
4390
if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4391
peer_state.disk != D_NEGOTIATING ) {
4392
/* we want resync, peer has not yet decided to sync... */
4393
/* Nowadays only used when forcing a node into primary role and
4394
setting its disk to UpToDate with that */
4395
drbd_send_uuids(peer_device);
4396
drbd_send_current_state(peer_device);
4397
}
4398
}
4399
4400
clear_bit(DISCARD_MY_DATA, &device->flags);
4401
4402
drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4403
4404
return 0;
4405
}
4406
4407
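/* P_SYNC_UUID: the SyncSource announces the UUID to use for the resync
* about to start. Wait until we actually reached C_WF_SYNC_UUID (or gave
* up), set the new current UUID without rotating the history, and start
* the resync as SyncTarget. */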
static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4408
{
4409
struct drbd_peer_device *peer_device;
4410
struct drbd_device *device;
4411
struct p_rs_uuid *p = pi->data;
4412
4413
peer_device = conn_peer_device(connection, pi->vnr);
4414
if (!peer_device)
4415
return -EIO;
4416
device = peer_device->device;
4417
4418
wait_event(device->misc_wait,
4419
device->state.conn == C_WF_SYNC_UUID ||
4420
device->state.conn == C_BEHIND ||
4421
device->state.conn < C_CONNECTED ||
4422
device->state.disk < D_NEGOTIATING);
4423
4424
/* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4425
4426
/* Here the _drbd_uuid_ functions are right, current should
4427
_not_ be rotated into the history */
4428
if (get_ldev_if_state(device, D_NEGOTIATING)) {
4429
_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4430
_drbd_uuid_set(device, UI_BITMAP, 0UL);
4431
4432
drbd_print_uuids(device, "updated sync uuid");
4433
drbd_start_resync(device, C_SYNC_TARGET);
4434
4435
put_ldev(device);
4436
} else
4437
drbd_err(device, "Ignoring SyncUUID packet!\n");
4438
4439
return 0;
4440
}
4441
4442
/*
4443
* receive_bitmap_plain
4444
*
4445
* Return 0 when done, 1 when another iteration is needed, and a negative error
4446
* code upon failure.
4447
*/
4448
static int
4449
receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4450
unsigned long *p, struct bm_xfer_ctx *c)
4451
{
4452
unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4453
drbd_header_size(peer_device->connection);
4454
unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4455
c->bm_words - c->word_offset);
4456
unsigned int want = num_words * sizeof(*p);
4457
int err;
4458
4459
if (want != size) {
4460
drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4461
return -EIO;
4462
}
4463
if (want == 0)
4464
return 0;
4465
err = drbd_recv_all(peer_device->connection, p, want);
4466
if (err)
4467
return err;
4468
4469
drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4470
4471
c->word_offset += num_words;
4472
c->bit_offset = c->word_offset * BITS_PER_LONG;
4473
if (c->bit_offset > c->bm_bits)
4474
c->bit_offset = c->bm_bits;
4475
4476
return 1;
4477
}
4478
4479
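/* The single "encoding" byte of p_compressed_bm is split up as follows:
* bits 0-3: drbd_bitmap_code (only RLE_VLI_Bits is handled here)
* bits 4-6: number of unused padding bits at the end of the bit stream
* bit 7: whether the first run of the RLE stream describes set bits */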
static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4480
{
4481
return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4482
}
4483
4484
static int dcbp_get_start(struct p_compressed_bm *p)
4485
{
4486
return (p->encoding & 0x80) != 0;
4487
}
4488
4489
static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4490
{
4491
return (p->encoding >> 4) & 0x7;
4492
}
4493
4494
/*
4495
* recv_bm_rle_bits
4496
*
4497
* Return 0 when done, 1 when another iteration is needed, and a negative error
4498
* code upon failure.
4499
*/
4500
static int
4501
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4502
struct p_compressed_bm *p,
4503
struct bm_xfer_ctx *c,
4504
unsigned int len)
4505
{
4506
struct bitstream bs;
4507
u64 look_ahead;
4508
u64 rl;
4509
u64 tmp;
4510
unsigned long s = c->bit_offset;
4511
unsigned long e;
4512
int toggle = dcbp_get_start(p);
4513
int have;
4514
int bits;
4515
4516
bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4517
4518
bits = bitstream_get_bits(&bs, &look_ahead, 64);
4519
if (bits < 0)
4520
return -EIO;
4521
4522
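/* Decode one variable-length code per iteration: rl is the run length,
* toggle says whether this run describes set or clear bits. Consumed
* bits are shifted out of look_ahead, which is then refilled from the
* bit stream so that it always holds up to 64 valid bits. */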
for (have = bits; have > 0; s += rl, toggle = !toggle) {
4523
bits = vli_decode_bits(&rl, look_ahead);
4524
if (bits <= 0)
4525
return -EIO;
4526
4527
if (toggle) {
4528
e = s + rl -1;
4529
if (e >= c->bm_bits) {
4530
drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4531
return -EIO;
4532
}
4533
_drbd_bm_set_bits(peer_device->device, s, e);
4534
}
4535
4536
if (have < bits) {
4537
drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4538
have, bits, look_ahead,
4539
(unsigned int)(bs.cur.b - p->code),
4540
(unsigned int)bs.buf_len);
4541
return -EIO;
4542
}
4543
/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4544
if (likely(bits < 64))
4545
look_ahead >>= bits;
4546
else
4547
look_ahead = 0;
4548
have -= bits;
4549
4550
bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4551
if (bits < 0)
4552
return -EIO;
4553
look_ahead |= tmp << have;
4554
have += bits;
4555
}
4556
4557
c->bit_offset = s;
4558
bm_xfer_ctx_bit_to_word_offset(c);
4559
4560
return (s != c->bm_bits);
4561
}
4562
4563
/*
4564
* decode_bitmap_c
4565
*
4566
* Return 0 when done, 1 when another iteration is needed, and a negative error
4567
* code upon failure.
4568
*/
4569
static int
4570
decode_bitmap_c(struct drbd_peer_device *peer_device,
4571
struct p_compressed_bm *p,
4572
struct bm_xfer_ctx *c,
4573
unsigned int len)
4574
{
4575
if (dcbp_get_code(p) == RLE_VLI_Bits)
4576
return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4577
4578
/* other variants had been implemented for evaluation,
4579
* but have been dropped as this one turned out to be "best"
4580
* during all our tests. */
4581
4582
drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4583
conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4584
return -EIO;
4585
}
4586
4587
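/* Log how well the bitmap transfer compressed: compare the bytes actually
* transferred (plain + RLE packets) against what a plain transfer of the
* whole bitmap would have cost, and print the saving with one decimal.
* For example (made-up numbers), plain = 100000 and total = 12345 bytes
* give r = 1000 - 123 = 877, reported as "compression: 87.7%".
* Nothing is logged if the transfer did not actually compress. */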
void INFO_bm_xfer_stats(struct drbd_peer_device *peer_device,
4588
const char *direction, struct bm_xfer_ctx *c)
4589
{
4590
/* what would it take to transfer it "plaintext" */
4591
unsigned int header_size = drbd_header_size(peer_device->connection);
4592
unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4593
unsigned int plain =
4594
header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4595
c->bm_words * sizeof(unsigned long);
4596
unsigned int total = c->bytes[0] + c->bytes[1];
4597
unsigned int r;
4598
4599
/* total can not be zero. but just in case: */
4600
if (total == 0)
4601
return;
4602
4603
/* don't report if not compressed */
4604
if (total >= plain)
4605
return;
4606
4607
/* total < plain. check for overflow, still */
4608
r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4609
: (1000 * total / plain);
4610
4611
if (r > 1000)
4612
r = 1000;
4613
4614
r = 1000 - r;
4615
drbd_info(peer_device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4616
"total %u; compression: %u.%u%%\n",
4617
direction,
4618
c->bytes[1], c->packets[1],
4619
c->bytes[0], c->packets[0],
4620
total, r/10, r % 10);
4621
}
4622
4623
/* Since we are processing the bitfield from lower addresses to higher,
4624
it does not matter whether we process it in 32 bit chunks or 64 bit
4625
chunks, as long as it is little endian. (Understand it as a byte stream,
4626
beginning with the lowest byte...) If we used big endian
4627
we would need to process it from the highest address to the lowest,
4628
in order to be agnostic to the 32 vs 64 bits issue.
4629
4630
Returns 0 on success, or a negative error code on failure. */
4631
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4632
{
4633
struct drbd_peer_device *peer_device;
4634
struct drbd_device *device;
4635
struct bm_xfer_ctx c;
4636
int err;
4637
4638
peer_device = conn_peer_device(connection, pi->vnr);
4639
if (!peer_device)
4640
return -EIO;
4641
device = peer_device->device;
4642
4643
drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4644
/* you are supposed to send additional out-of-sync information
4645
* if you actually set bits during this phase */
4646
4647
c = (struct bm_xfer_ctx) {
4648
.bm_bits = drbd_bm_bits(device),
4649
.bm_words = drbd_bm_words(device),
4650
};
4651
4652
for(;;) {
4653
if (pi->cmd == P_BITMAP)
4654
err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4655
else if (pi->cmd == P_COMPRESSED_BITMAP) {
4656
/* MAYBE: sanity check that we speak proto >= 90,
4657
* and the feature is enabled! */
4658
struct p_compressed_bm *p = pi->data;
4659
4660
if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4661
drbd_err(device, "ReportCBitmap packet too large\n");
4662
err = -EIO;
4663
goto out;
4664
}
4665
if (pi->size <= sizeof(*p)) {
4666
drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4667
err = -EIO;
4668
goto out;
4669
}
4670
err = drbd_recv_all(peer_device->connection, p, pi->size);
4671
if (err)
4672
goto out;
4673
err = decode_bitmap_c(peer_device, p, &c, pi->size);
4674
} else {
4675
drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4676
err = -EIO;
4677
goto out;
4678
}
4679
4680
c.packets[pi->cmd == P_BITMAP]++;
4681
c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4682
4683
if (err <= 0) {
4684
if (err < 0)
4685
goto out;
4686
break;
4687
}
4688
err = drbd_recv_header(peer_device->connection, pi);
4689
if (err)
4690
goto out;
4691
}
4692
4693
INFO_bm_xfer_stats(peer_device, "receive", &c);
4694
4695
if (device->state.conn == C_WF_BITMAP_T) {
4696
enum drbd_state_rv rv;
4697
4698
err = drbd_send_bitmap(device, peer_device);
4699
if (err)
4700
goto out;
4701
/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4702
rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4703
D_ASSERT(device, rv == SS_SUCCESS);
4704
} else if (device->state.conn != C_WF_BITMAP_S) {
4705
/* admin may have requested C_DISCONNECTING,
4706
* other threads may have noticed network errors */
4707
drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4708
drbd_conn_str(device->state.conn));
4709
}
4710
err = 0;
4711
4712
out:
4713
drbd_bm_unlock(device);
4714
if (!err && device->state.conn == C_WF_BITMAP_S)
4715
drbd_start_resync(device, C_SYNC_SOURCE);
4716
return err;
4717
}
4718
4719
static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4720
{
4721
drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4722
pi->cmd, pi->size);
4723
4724
return ignore_remaining_packet(connection, pi);
4725
}
4726
4727
static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4728
{
4729
/* Make sure we've acked all the TCP data associated
4730
* with the data requests being unplugged */
4731
tcp_sock_set_quickack(connection->data.socket->sk, 2);
4732
return 0;
4733
}
4734
4735
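/* P_OUT_OF_SYNC: the peer asks us to mark a range as out of sync locally
* so that it is covered by the upcoming (or ongoing) resync. Only expected
* while we are in WFSyncUUID, WFBitMapT or Behind. */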
static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4736
{
4737
struct drbd_peer_device *peer_device;
4738
struct drbd_device *device;
4739
struct p_block_desc *p = pi->data;
4740
4741
peer_device = conn_peer_device(connection, pi->vnr);
4742
if (!peer_device)
4743
return -EIO;
4744
device = peer_device->device;
4745
4746
switch (device->state.conn) {
4747
case C_WF_SYNC_UUID:
4748
case C_WF_BITMAP_T:
4749
case C_BEHIND:
4750
break;
4751
default:
4752
drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4753
drbd_conn_str(device->state.conn));
4754
}
4755
4756
drbd_set_out_of_sync(peer_device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4757
4758
return 0;
4759
}
4760
4761
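/* P_RS_DEALLOCATED: the sync source reports that this range is deallocated
* (thinly provisioned) on its side. Instead of receiving the data we issue
* a local discard for the range and account/ack it like a resync write;
* without local disk we simply negative-ack the request. */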
static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4762
{
4763
struct drbd_peer_device *peer_device;
4764
struct p_block_desc *p = pi->data;
4765
struct drbd_device *device;
4766
sector_t sector;
4767
int size, err = 0;
4768
4769
peer_device = conn_peer_device(connection, pi->vnr);
4770
if (!peer_device)
4771
return -EIO;
4772
device = peer_device->device;
4773
4774
sector = be64_to_cpu(p->sector);
4775
size = be32_to_cpu(p->blksize);
4776
4777
dec_rs_pending(peer_device);
4778
4779
if (get_ldev(device)) {
4780
struct drbd_peer_request *peer_req;
4781
4782
peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4783
size, 0, GFP_NOIO);
4784
if (!peer_req) {
4785
put_ldev(device);
4786
return -ENOMEM;
4787
}
4788
4789
peer_req->w.cb = e_end_resync_block;
4790
peer_req->opf = REQ_OP_DISCARD;
4791
peer_req->submit_jif = jiffies;
4792
peer_req->flags |= EE_TRIM;
4793
4794
spin_lock_irq(&device->resource->req_lock);
4795
list_add_tail(&peer_req->w.list, &device->sync_ee);
4796
spin_unlock_irq(&device->resource->req_lock);
4797
4798
atomic_add(pi->size >> 9, &device->rs_sect_ev);
4799
err = drbd_submit_peer_request(peer_req);
4800
4801
if (err) {
4802
spin_lock_irq(&device->resource->req_lock);
4803
list_del(&peer_req->w.list);
4804
spin_unlock_irq(&device->resource->req_lock);
4805
4806
drbd_free_peer_req(device, peer_req);
4807
put_ldev(device);
4808
err = 0;
4809
goto fail;
4810
}
4811
4812
inc_unacked(device);
4813
4814
/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4815
as well as drbd_rs_complete_io() */
4816
} else {
4817
fail:
4818
drbd_rs_complete_io(device, sector);
4819
drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4820
}
4821
4822
atomic_add(size >> 9, &device->rs_sect_in);
4823
4824
return err;
4825
}
4826
4827
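/* Dispatch table for the data socket: one entry per packet type, giving
* the size of the fixed sub header that must be present, whether an
* additional payload beyond that is allowed, and the handler to call.
* drbdd() below validates the announced sizes against this table before
* invoking the handler. */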
struct data_cmd {
4828
int expect_payload;
4829
unsigned int pkt_size;
4830
int (*fn)(struct drbd_connection *, struct packet_info *);
4831
};
4832
4833
static struct data_cmd drbd_cmd_handler[] = {
4834
[P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4835
[P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4836
[P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4837
[P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4838
[P_BITMAP] = { 1, 0, receive_bitmap } ,
4839
[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4840
[P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4841
[P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4842
[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4843
[P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4844
[P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4845
[P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4846
[P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4847
[P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4848
[P_STATE] = { 0, sizeof(struct p_state), receive_state },
4849
[P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4850
[P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4851
[P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4852
[P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4853
[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4854
[P_RS_THIN_REQ] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4855
[P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4856
[P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4857
[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4858
[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4859
[P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
4860
[P_ZEROES] = { 0, sizeof(struct p_trim), receive_Data },
4861
[P_RS_DEALLOCATED] = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4862
};
4863
4864
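/* Main loop of the receiver thread: read one packet header after the
* other, validate the announced size against drbd_cmd_handler[], pull in
* the fixed-size sub header, and hand the packet to its handler. Any
* error tears the connection down via C_PROTOCOL_ERROR. */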
static void drbdd(struct drbd_connection *connection)
4865
{
4866
struct packet_info pi;
4867
size_t shs; /* sub header size */
4868
int err;
4869
4870
while (get_t_state(&connection->receiver) == RUNNING) {
4871
struct data_cmd const *cmd;
4872
4873
drbd_thread_current_set_cpu(&connection->receiver);
4874
update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4875
if (drbd_recv_header_maybe_unplug(connection, &pi))
4876
goto err_out;
4877
4878
cmd = &drbd_cmd_handler[pi.cmd];
4879
if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4880
drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4881
cmdname(pi.cmd), pi.cmd);
4882
goto err_out;
4883
}
4884
4885
shs = cmd->pkt_size;
4886
if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4887
shs += sizeof(struct o_qlim);
4888
if (pi.size > shs && !cmd->expect_payload) {
4889
drbd_err(connection, "No payload expected %s l:%d\n",
4890
cmdname(pi.cmd), pi.size);
4891
goto err_out;
4892
}
4893
if (pi.size < shs) {
4894
drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4895
cmdname(pi.cmd), (int)shs, pi.size);
4896
goto err_out;
4897
}
4898
4899
if (shs) {
4900
update_receiver_timing_details(connection, drbd_recv_all_warn);
4901
err = drbd_recv_all_warn(connection, pi.data, shs);
4902
if (err)
4903
goto err_out;
4904
pi.size -= shs;
4905
}
4906
4907
update_receiver_timing_details(connection, cmd->fn);
4908
err = cmd->fn(connection, &pi);
4909
if (err) {
4910
drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4911
cmdname(pi.cmd), err, pi.size);
4912
goto err_out;
4913
}
4914
}
4915
return;
4916
4917
err_out:
4918
conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4919
}
4920
4921
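/* Tear down a lost (or administratively closed) connection: stop the ack
* receiver, close the sockets, run per-volume cleanup via
* drbd_disconnected(), possibly try to outdate the peer, and finally move
* the connection to C_UNCONNECTED (or C_STANDALONE if we were
* disconnecting on purpose). */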
static void conn_disconnect(struct drbd_connection *connection)
4922
{
4923
struct drbd_peer_device *peer_device;
4924
enum drbd_conns oc;
4925
int vnr;
4926
4927
if (connection->cstate == C_STANDALONE)
4928
return;
4929
4930
/* We are about to start the cleanup after connection loss.
4931
* Make sure drbd_make_request knows about that.
4932
* Usually we should be in some network failure state already,
4933
* but just in case we are not, we fix it up here.
4934
*/
4935
conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4936
4937
/* ack_receiver does not clean up anything. it must not interfere, either */
4938
drbd_thread_stop(&connection->ack_receiver);
4939
if (connection->ack_sender) {
4940
destroy_workqueue(connection->ack_sender);
4941
connection->ack_sender = NULL;
4942
}
4943
drbd_free_sock(connection);
4944
4945
rcu_read_lock();
4946
idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4947
struct drbd_device *device = peer_device->device;
4948
kref_get(&device->kref);
4949
rcu_read_unlock();
4950
drbd_disconnected(peer_device);
4951
kref_put(&device->kref, drbd_destroy_device);
4952
rcu_read_lock();
4953
}
4954
rcu_read_unlock();
4955
4956
if (!list_empty(&connection->current_epoch->list))
4957
drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4958
/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4959
atomic_set(&connection->current_epoch->epoch_size, 0);
4960
connection->send.seen_any_write_yet = false;
4961
4962
drbd_info(connection, "Connection closed\n");
4963
4964
if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4965
conn_try_outdate_peer_async(connection);
4966
4967
spin_lock_irq(&connection->resource->req_lock);
4968
oc = connection->cstate;
4969
if (oc >= C_UNCONNECTED)
4970
_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4971
4972
spin_unlock_irq(&connection->resource->req_lock);
4973
4974
if (oc == C_DISCONNECTING)
4975
conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4976
}
4977
4978
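/* Per-volume part of the disconnect cleanup: wait for in-flight peer
* requests to drain, cancel all resync bookkeeping, flush the sender work
* queue, forget the peer's UUIDs, and write out bitmap and meta data. */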
static int drbd_disconnected(struct drbd_peer_device *peer_device)
4979
{
4980
struct drbd_device *device = peer_device->device;
4981
unsigned int i;
4982
4983
/* wait for current activity to cease. */
4984
spin_lock_irq(&device->resource->req_lock);
4985
_drbd_wait_ee_list_empty(device, &device->active_ee);
4986
_drbd_wait_ee_list_empty(device, &device->sync_ee);
4987
_drbd_wait_ee_list_empty(device, &device->read_ee);
4988
spin_unlock_irq(&device->resource->req_lock);
4989
4990
/* We do not have data structures that would allow us to
4991
* get the rs_pending_cnt down to 0 again.
4992
* * On C_SYNC_TARGET we do not have any data structures describing
4993
* the pending RSDataRequest's we have sent.
4994
* * On C_SYNC_SOURCE there is no data structure that tracks
4995
* the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4996
* And no, it is not the sum of the reference counts in the
4997
* resync_LRU. The resync_LRU tracks the whole operation including
4998
* the disk-IO, while the rs_pending_cnt only tracks the blocks
4999
* on the fly. */
5000
drbd_rs_cancel_all(device);
5001
device->rs_total = 0;
5002
device->rs_failed = 0;
5003
atomic_set(&device->rs_pending_cnt, 0);
5004
wake_up(&device->misc_wait);
5005
5006
timer_delete_sync(&device->resync_timer);
5007
resync_timer_fn(&device->resync_timer);
5008
5009
/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5010
* w_make_resync_request etc. which may still be on the worker queue
5011
* to be "canceled" */
5012
drbd_flush_workqueue(&peer_device->connection->sender_work);
5013
5014
drbd_finish_peer_reqs(device);
5015
5016
/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5017
might have queued more work. The one before drbd_finish_peer_reqs() is
5018
necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5019
drbd_flush_workqueue(&peer_device->connection->sender_work);
5020
5021
/* need to do it again, drbd_finish_peer_reqs() may have populated it
5022
* again via drbd_try_clear_on_disk_bm(). */
5023
drbd_rs_cancel_all(device);
5024
5025
kfree(device->p_uuid);
5026
device->p_uuid = NULL;
5027
5028
if (!drbd_suspended(device))
5029
tl_clear(peer_device->connection);
5030
5031
drbd_md_sync(device);
5032
5033
if (get_ldev(device)) {
5034
drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5035
"write from disconnected", BM_LOCKED_CHANGE_ALLOWED, NULL);
5036
put_ldev(device);
5037
}
5038
5039
i = atomic_read(&device->pp_in_use_by_net);
5040
if (i)
5041
drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5042
i = atomic_read(&device->pp_in_use);
5043
if (i)
5044
drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5045
5046
D_ASSERT(device, list_empty(&device->read_ee));
5047
D_ASSERT(device, list_empty(&device->active_ee));
5048
D_ASSERT(device, list_empty(&device->sync_ee));
5049
D_ASSERT(device, list_empty(&device->done_ee));
5050
5051
return 0;
5052
}
5053
5054
/*
5055
* We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5056
* we can agree on is stored in agreed_pro_version.
5057
*
5058
* feature flags and the reserved array should leave enough room for future
5059
* enhancements of the handshake protocol, and possible plugins...
5060
*
5061
* for now, they are expected to be zero, but ignored.
5062
*/
5063
static int drbd_send_features(struct drbd_connection *connection)
5064
{
5065
struct drbd_socket *sock;
5066
struct p_connection_features *p;
5067
5068
sock = &connection->data;
5069
p = conn_prepare_command(connection, sock);
5070
if (!p)
5071
return -EIO;
5072
memset(p, 0, sizeof(*p));
5073
p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5074
p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5075
p->feature_flags = cpu_to_be32(PRO_FEATURES);
5076
return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5077
}
5078
5079
/*
 * return values:
 *  1 yes, we have a valid connection
 *  0 oops, did not work out, please try again
 * -1 peer talks different language,
 *    no point in trying again, please go standalone.
 */
static int drbd_do_features(struct drbd_connection *connection)
{
	/* ASSERT current == connection->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;

	err = drbd_send_features(connection);
	if (err)
		return 0;

	err = drbd_recv_header(connection, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_CONNECTION_FEATURES) {
		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
			 expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(connection, p, expect);
	if (err)
		return 0;

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);

	drbd_info(connection, "Handshake successful: "
		  "Agreed network protocol version %d\n", connection->agreed_pro_version);

	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
		  connection->agreed_features,
		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
		  connection->agreed_features ? "" : " none");

	return 1;

 incompat:
	drbd_err(connection, "incompatible DRBD dialects: "
		 "I support %d-%d, peer supports %d-%d\n",
		 PRO_VERSION_MIN, PRO_VERSION_MAX,
		 p->protocol_min, p->protocol_max);
	return -1;
}

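/*
 * Optional CRAM-HMAC peer authentication follows.  It is only available
 * when the kernel provides the HMAC shash transform; otherwise
 * drbd_do_auth() is reduced to a stub that refuses a configuration with
 * 'cram-hmac-alg' set.
 */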
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

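/*
 * Mutual challenge/response: we send a random challenge, receive the
 * peer's challenge, and each side answers with an HMAC (keyed with the
 * shared secret) over the challenge it received.  Authentication
 * succeeds only if the peer's answer matches the HMAC we compute over
 * our own challenge.
 */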
static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	struct shash_desc *desc;
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer. */

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc = kmalloc(sizeof(struct shash_desc) +
		       crypto_shash_descsize(connection->cram_hmac_tfm),
		       GFP_KERNEL);
	if (!desc) {
		rv = -1;
		goto fail;
	}
	desc->tfm = connection->cram_hmac_tfm;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = -1;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

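	/* pi.size is now known to be within [CHALLENGE_LEN, 2 * CHALLENGE_LEN],
	 * so even a hostile peer cannot make us allocate an oversized buffer. */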
	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (!peers_ch) {
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (!response) {
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

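	/* 'response' now holds the HMAC over the peer's challenge; send it
	 * back and expect the peer's HMAC over our challenge in return. */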
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, response, resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

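	/* Verify the peer: recompute the expected HMAC over our own
	 * challenge and compare it with what the peer sent back. */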
	right_response = kmalloc(resp_size, GFP_NOIO);
	if (!right_response) {
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
			  resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	if (desc) {
		shash_desc_zero(desc);
		kfree(desc);
	}

	return rv;
}
#endif

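/*
 * Receiver thread main loop: conn_connect()'s result drives the retry
 * logic below.  A positive return means the connection is up and drbdd()
 * takes over; 0 means "try again" (disconnect, sleep a second, retry);
 * -1 means the peer is incompatible, so the network configuration is
 * discarded and the connection goes standalone.
 */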
int drbd_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	int h;

	drbd_info(connection, "receiver (re)started\n");

	do {
		h = conn_connect(connection);
		if (h == 0) {
			conn_disconnect(connection);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			drbd_warn(connection, "Discarding network configuration.\n");
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0) {
		blk_start_plug(&connection->receiver_plug);
		drbdd(connection);
		blk_finish_plug(&connection->receiver_plug);
	}

	conn_disconnect(connection);

	drbd_info(connection, "receiver terminated\n");
	return 0;
}

/* ********* acknowledge sender ******** */

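/*
 * The got_*() handlers below run in the ack receiver thread and process
 * packets arriving on the meta socket; they are dispatched from
 * drbd_ack_receiver() through the ack_receiver_tbl[] table further down.
 */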
static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&connection->ping_wait);

	return 0;
}

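/*
 * For older peers (the D_ASSERT below documents agreed_pro_version < 100)
 * a connection-wide state change request is answered with this per-volume
 * reply packet, so the reply is forwarded to got_conn_RqSReply() above.
 */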
static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
		D_ASSERT(device, connection->agreed_pro_version < 100);
		return got_conn_RqSReply(connection, pi);
	}

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &device->flags);
		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&device->state_wait);

	return 0;
}

static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}

static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}

static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(peer_device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(peer_device);
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}

static int
validate_req_change_req_state(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&device->resource->req_lock);
		return -EIO;
	}
	__req_mod(req, what, peer_device, &m);
	spin_unlock_irq(&device->resource->req_lock);

	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}

static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(peer_device, sector, blksize);
		dec_rs_pending(peer_device);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_SUPERSEDED:
		what = CONFLICT_RESOLVED;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(peer_device, p->block_id, sector,
					     &device->write_requests, __func__,
					     what, false);
}

static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(peer_device);
		drbd_rs_failed_io(peer_device, sector, size);
		return 0;
	}

	err = validate_req_change_req_state(peer_device, p->block_id, sector,
					    &device->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(peer_device, sector, size);
	}
	return 0;
}

static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
		 (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(peer_device, p->block_id, sector,
					     &device->read_requests, __func__,
					     NEG_ACKED, false);
}

static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	dec_rs_pending(peer_device);

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(peer_device, sector, size);
			break;
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}

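/*
 * Besides completing a write barrier (tl_release), a barrier ack is also
 * the point where a device that temporarily went Ahead switches back
 * towards SyncSource once no application writes are in flight any more;
 * the actual resync start is deferred via start_resync_timer.
 */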
static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}

static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	struct drbd_device_work *dw;
	sector_t sector;
	int size;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(peer_device, sector, size);
	else
		ov_out_of_sync_print(peer_device);

	if (!get_ldev(device))
		return 0;

	drbd_rs_complete_io(device, sector);
	dec_rs_pending(peer_device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(peer_device, device->ov_left);

	if (device->ov_left == 0) {
		dw = kmalloc(sizeof(*dw), GFP_NOIO);
		if (dw) {
			dw->w.cb = w_ov_finished;
			dw->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
		} else {
			drbd_err(device, "kmalloc(dw) failed.");
			ov_out_of_sync_print(peer_device);
			drbd_resync_finished(peer_device);
		}
	}
	put_ldev(device);
	return 0;
}

static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

struct meta_sock_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
{
	long t;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
	rcu_read_unlock();

	t *= HZ;
	if (ping_timeout)
		t /= 10;

	connection->meta.socket->sk->sk_rcvtimeo = t;
}

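/*
 * ping_int is configured in seconds, while ping_timeo is in tenths of a
 * second -- hence the divide by ten above.  Either way the result is a
 * receive timeout in jiffies on the meta socket.
 */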
static void set_ping_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 1);
}

static void set_idle_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 0);
}

static struct meta_sock_cmd ack_receiver_tbl[] = {
	[P_PING] = { 0, got_Ping },
	[P_PING_ACK] = { 0, got_PingAck },
	[P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
};

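/*
 * The table is indexed by packet command; pkt_size is the payload size
 * the ack receiver below expects (and verifies) before it calls fn().
 */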
int drbd_ack_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct meta_sock_cmd *cmd = NULL;
	struct packet_info pi;
	unsigned long pre_recv_jif;
	int rv;
	void *buf = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect = header_size;
	bool ping_timeout_active = false;

	sched_set_fifo_low(current);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			set_ping_timeout(connection);
			ping_timeout_active = true;
		}

		pre_recv_jif = jiffies;
		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;
				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received, pre_recv_jif))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			/* maybe drbd_thread_stop(): the while condition will notice.
			 * maybe woken for send_ping: we'll send a ping above,
			 * and change the rcvtimeo */
			flush_signals(current);
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

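		/* A complete header has arrived: decode it, look up the
		 * handler and learn how much payload to expect before the
		 * command can be dispatched. */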
		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			cmd = &ack_receiver_tbl[pi.cmd];
			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%ps failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
				set_idle_timeout(connection);
				ping_timeout_active = false;
			}

			buf = connection->meta.rbuf;
			received = 0;
			expect = header_size;
			cmd = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	drbd_info(connection, "ack_receiver terminated\n");

	return 0;
}

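/*
 * Work function that flushes the acks queued for one device over the meta
 * socket.  If tcp_cork is configured, the socket is corked around
 * drbd_finish_peer_reqs() so the individual acks coalesce into fewer
 * segments on the wire.
 */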
void drbd_send_acks_wf(struct work_struct *ws)
{
	struct drbd_peer_device *peer_device =
		container_of(ws, struct drbd_peer_device, send_acks_work);
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_device *device = peer_device->device;
	struct net_conf *nc;
	int tcp_cork, err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	tcp_cork = nc->tcp_cork;
	rcu_read_unlock();

	if (tcp_cork)
		tcp_sock_set_cork(connection->meta.socket->sk, true);

	err = drbd_finish_peer_reqs(device);
	kref_put(&device->kref, drbd_destroy_device);
	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
	   struct work_struct send_acks_work alive, which is in the peer_device object */

	if (err) {
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		return;
	}

	if (tcp_cork)
		tcp_sock_set_cork(connection->meta.socket->sk, false);

	return;
}
