Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
awilliam
GitHub Repository: awilliam/linux-vfio
Path: blob/master/drivers/block/drbd/drbd_receiver.c
15180 views
1
/*
2
drbd_receiver.c
3
4
This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6
Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7
Copyright (C) 1999-2008, Philipp Reisner <[email protected]>.
8
Copyright (C) 2002-2008, Lars Ellenberg <[email protected]>.
9
10
drbd is free software; you can redistribute it and/or modify
11
it under the terms of the GNU General Public License as published by
12
the Free Software Foundation; either version 2, or (at your option)
13
any later version.
14
15
drbd is distributed in the hope that it will be useful,
16
but WITHOUT ANY WARRANTY; without even the implied warranty of
17
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18
GNU General Public License for more details.
19
20
You should have received a copy of the GNU General Public License
21
along with drbd; see the file COPYING. If not, write to
22
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
*/
24
25
26
#include <linux/module.h>
27
28
#include <asm/uaccess.h>
29
#include <net/sock.h>
30
31
#include <linux/drbd.h>
32
#include <linux/fs.h>
33
#include <linux/file.h>
34
#include <linux/in.h>
35
#include <linux/mm.h>
36
#include <linux/memcontrol.h>
37
#include <linux/mm_inline.h>
38
#include <linux/slab.h>
39
#include <linux/pkt_sched.h>
40
#define __KERNEL_SYSCALLS__
41
#include <linux/unistd.h>
42
#include <linux/vmalloc.h>
43
#include <linux/random.h>
44
#include <linux/string.h>
45
#include <linux/scatterlist.h>
46
#include "drbd_int.h"
47
#include "drbd_req.h"
48
49
#include "drbd_vli.h"
50
51
/* Result of drbd_may_finish_epoch() for the epoch it was called on. */
enum finish_epoch {
	FE_STILL_LIVE = 0,	/* epoch was not finished */
	FE_DESTROYED  = 1,	/* epoch was finished, unlinked and freed */
	FE_RECYCLED   = 2,	/* epoch was finished and reset for reuse */
};
56
57
static int drbd_do_handshake(struct drbd_conf *mdev);
58
static int drbd_do_auth(struct drbd_conf *mdev);
59
60
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66
/*
67
* some helper functions to deal with singly linked page lists,
68
* page->private being our "next" pointer.
69
*/
70
71
/* If at least n pages are linked at head, get n pages off.
72
* Otherwise, don't modify head, and return NULL.
73
* Locking is the responsibility of the caller.
74
*/
75
static struct page *page_chain_del(struct page **head, int n)
76
{
77
struct page *page;
78
struct page *tmp;
79
80
BUG_ON(!n);
81
BUG_ON(!head);
82
83
page = *head;
84
85
if (!page)
86
return NULL;
87
88
while (page) {
89
tmp = page_chain_next(page);
90
if (--n == 0)
91
break; /* found sufficient pages */
92
if (tmp == NULL)
93
/* insufficient pages, don't use any of them. */
94
return NULL;
95
page = tmp;
96
}
97
98
/* add end of list marker for the returned list */
99
set_page_private(page, 0);
100
/* actual return value, and adjustment of head */
101
page = *head;
102
*head = tmp;
103
return page;
104
}
105
106
/* may be used outside of locks to find the tail of a (usually short)
107
* "private" page chain, before adding it back to a global chain head
108
* with page_chain_add() under a spinlock. */
109
static struct page *page_chain_tail(struct page *page, int *len)
110
{
111
struct page *tmp;
112
int i = 1;
113
while ((tmp = page_chain_next(page)))
114
++i, page = tmp;
115
if (len)
116
*len = i;
117
return page;
118
}
119
120
static int page_chain_free(struct page *page)
121
{
122
struct page *tmp;
123
int i = 0;
124
page_chain_for_each_safe(page, tmp) {
125
put_page(page);
126
++i;
127
}
128
return i;
129
}
130
131
static void page_chain_add(struct page **head,
132
struct page *chain_first, struct page *chain_last)
133
{
134
#if 1
135
struct page *tmp;
136
tmp = page_chain_tail(chain_first, NULL);
137
BUG_ON(tmp != chain_last);
138
#endif
139
140
/* add chain to head */
141
set_page_private(chain_last, (unsigned long)*head);
142
*head = chain_first;
143
}
144
145
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146
{
147
struct page *page = NULL;
148
struct page *tmp = NULL;
149
int i = 0;
150
151
/* Yes, testing drbd_pp_vacant outside the lock is racy.
152
* So what. It saves a spin_lock. */
153
if (drbd_pp_vacant >= number) {
154
spin_lock(&drbd_pp_lock);
155
page = page_chain_del(&drbd_pp_pool, number);
156
if (page)
157
drbd_pp_vacant -= number;
158
spin_unlock(&drbd_pp_lock);
159
if (page)
160
return page;
161
}
162
163
/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164
* "criss-cross" setup, that might cause write-out on some other DRBD,
165
* which in turn might block on the other node at this very place. */
166
for (i = 0; i < number; i++) {
167
tmp = alloc_page(GFP_TRY);
168
if (!tmp)
169
break;
170
set_page_private(tmp, (unsigned long)page);
171
page = tmp;
172
}
173
174
if (i == number)
175
return page;
176
177
/* Not enough pages immediately available this time.
178
* No need to jump around here, drbd_pp_alloc will retry this
179
* function "soon". */
180
if (page) {
181
tmp = page_chain_tail(page, NULL);
182
spin_lock(&drbd_pp_lock);
183
page_chain_add(&drbd_pp_pool, page, tmp);
184
drbd_pp_vacant += i;
185
spin_unlock(&drbd_pp_lock);
186
}
187
return NULL;
188
}
189
190
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191
{
192
struct drbd_epoch_entry *e;
193
struct list_head *le, *tle;
194
195
/* The EEs are always appended to the end of the list. Since
196
they are sent in order over the wire, they have to finish
197
in order. As soon as we see the first not finished we can
198
stop to examine the list... */
199
200
list_for_each_safe(le, tle, &mdev->net_ee) {
201
e = list_entry(le, struct drbd_epoch_entry, w.list);
202
if (drbd_ee_has_active_page(e))
203
break;
204
list_move(le, to_be_freed);
205
}
206
}
207
208
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209
{
210
LIST_HEAD(reclaimed);
211
struct drbd_epoch_entry *e, *t;
212
213
spin_lock_irq(&mdev->req_lock);
214
reclaim_net_ee(mdev, &reclaimed);
215
spin_unlock_irq(&mdev->req_lock);
216
217
list_for_each_entry_safe(e, t, &reclaimed, w.list)
218
drbd_free_net_ee(mdev, e);
219
}
220
221
/**
222
* drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223
* @mdev: DRBD device.
224
* @number: number of pages requested
225
* @retry: whether to retry, if not enough pages are available right now
226
*
227
* Tries to allocate number pages, first from our own page pool, then from
228
* the kernel, unless this allocation would exceed the max_buffers setting.
229
* Possibly retry until DRBD frees sufficient pages somewhere else.
230
*
231
* Returns a page chain linked via page->private.
232
*/
233
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234
{
235
struct page *page = NULL;
236
DEFINE_WAIT(wait);
237
238
/* Yes, we may run up to @number over max_buffers. If we
239
* follow it strictly, the admin will get it wrong anyways. */
240
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241
page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243
while (page == NULL) {
244
prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246
drbd_kick_lo_and_reclaim_net(mdev);
247
248
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249
page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250
if (page)
251
break;
252
}
253
254
if (!retry)
255
break;
256
257
if (signal_pending(current)) {
258
dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259
break;
260
}
261
262
schedule();
263
}
264
finish_wait(&drbd_pp_wait, &wait);
265
266
if (page)
267
atomic_add(number, &mdev->pp_in_use);
268
return page;
269
}
270
271
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272
* Is also used from inside an other spin_lock_irq(&mdev->req_lock);
273
* Either links the page chain back to the global pool,
274
* or returns all pages to the system. */
275
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276
{
277
atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278
int i;
279
280
if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281
i = page_chain_free(page);
282
else {
283
struct page *tmp;
284
tmp = page_chain_tail(page, &i);
285
spin_lock(&drbd_pp_lock);
286
page_chain_add(&drbd_pp_pool, page, tmp);
287
drbd_pp_vacant += i;
288
spin_unlock(&drbd_pp_lock);
289
}
290
i = atomic_sub_return(i, a);
291
if (i < 0)
292
dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293
is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294
wake_up(&drbd_pp_wait);
295
}
296
297
/*
298
You need to hold the req_lock:
299
_drbd_wait_ee_list_empty()
300
301
You must not have the req_lock:
302
drbd_free_ee()
303
drbd_alloc_ee()
304
drbd_init_ee()
305
drbd_release_ee()
306
drbd_ee_fix_bhs()
307
drbd_process_done_ee()
308
drbd_clear_done_ee()
309
drbd_wait_ee_list_empty()
310
*/
311
312
struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313
u64 id,
314
sector_t sector,
315
unsigned int data_size,
316
gfp_t gfp_mask) __must_hold(local)
317
{
318
struct drbd_epoch_entry *e;
319
struct page *page;
320
unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321
322
if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323
return NULL;
324
325
e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326
if (!e) {
327
if (!(gfp_mask & __GFP_NOWARN))
328
dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329
return NULL;
330
}
331
332
page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333
if (!page)
334
goto fail;
335
336
INIT_HLIST_NODE(&e->collision);
337
e->epoch = NULL;
338
e->mdev = mdev;
339
e->pages = page;
340
atomic_set(&e->pending_bios, 0);
341
e->size = data_size;
342
e->flags = 0;
343
e->sector = sector;
344
e->block_id = id;
345
346
return e;
347
348
fail:
349
mempool_free(e, drbd_ee_mempool);
350
return NULL;
351
}
352
353
void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
354
{
355
if (e->flags & EE_HAS_DIGEST)
356
kfree(e->digest);
357
drbd_pp_free(mdev, e->pages, is_net);
358
D_ASSERT(atomic_read(&e->pending_bios) == 0);
359
D_ASSERT(hlist_unhashed(&e->collision));
360
mempool_free(e, drbd_ee_mempool);
361
}
362
363
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
364
{
365
LIST_HEAD(work_list);
366
struct drbd_epoch_entry *e, *t;
367
int count = 0;
368
int is_net = list == &mdev->net_ee;
369
370
spin_lock_irq(&mdev->req_lock);
371
list_splice_init(list, &work_list);
372
spin_unlock_irq(&mdev->req_lock);
373
374
list_for_each_entry_safe(e, t, &work_list, w.list) {
375
drbd_free_some_ee(mdev, e, is_net);
376
count++;
377
}
378
return count;
379
}
380
381
382
/*
383
* This function is called from _asender only_
384
* but see also comments in _req_mod(,barrier_acked)
385
* and receive_Barrier.
386
*
387
* Move entries from net_ee to done_ee, if ready.
388
* Grab done_ee, call all callbacks, free the entries.
389
* The callbacks typically send out ACKs.
390
*/
391
static int drbd_process_done_ee(struct drbd_conf *mdev)
392
{
393
LIST_HEAD(work_list);
394
LIST_HEAD(reclaimed);
395
struct drbd_epoch_entry *e, *t;
396
int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
397
398
spin_lock_irq(&mdev->req_lock);
399
reclaim_net_ee(mdev, &reclaimed);
400
list_splice_init(&mdev->done_ee, &work_list);
401
spin_unlock_irq(&mdev->req_lock);
402
403
list_for_each_entry_safe(e, t, &reclaimed, w.list)
404
drbd_free_net_ee(mdev, e);
405
406
/* possible callbacks here:
407
* e_end_block, and e_end_resync_block, e_send_discard_ack.
408
* all ignore the last argument.
409
*/
410
list_for_each_entry_safe(e, t, &work_list, w.list) {
411
/* list_del not necessary, next/prev members not touched */
412
ok = e->w.cb(mdev, &e->w, !ok) && ok;
413
drbd_free_ee(mdev, e);
414
}
415
wake_up(&mdev->ee_wait);
416
417
return ok;
418
}
419
420
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
421
{
422
DEFINE_WAIT(wait);
423
424
/* avoids spin_lock/unlock
425
* and calling prepare_to_wait in the fast path */
426
while (!list_empty(head)) {
427
prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
428
spin_unlock_irq(&mdev->req_lock);
429
io_schedule();
430
finish_wait(&mdev->ee_wait, &wait);
431
spin_lock_irq(&mdev->req_lock);
432
}
433
}
434
435
void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436
{
437
spin_lock_irq(&mdev->req_lock);
438
_drbd_wait_ee_list_empty(mdev, head);
439
spin_unlock_irq(&mdev->req_lock);
440
}
441
442
/* see also kernel_accept; which is only present since 2.6.18.
443
* also we want to log which part of it failed, exactly */
444
static int drbd_accept(struct drbd_conf *mdev, const char **what,
445
struct socket *sock, struct socket **newsock)
446
{
447
struct sock *sk = sock->sk;
448
int err = 0;
449
450
*what = "listen";
451
err = sock->ops->listen(sock, 5);
452
if (err < 0)
453
goto out;
454
455
*what = "sock_create_lite";
456
err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
457
newsock);
458
if (err < 0)
459
goto out;
460
461
*what = "accept";
462
err = sock->ops->accept(sock, *newsock, 0);
463
if (err < 0) {
464
sock_release(*newsock);
465
*newsock = NULL;
466
goto out;
467
}
468
(*newsock)->ops = sock->ops;
469
470
out:
471
return err;
472
}
473
474
/* Single sock_recvmsg() call of up to @size bytes from @sock into the
 * kernel buffer @buf. If @flags is 0, MSG_WAITALL | MSG_NOSIGNAL is used.
 * Returns whatever sock_recvmsg() returned. */
static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
			   void *buf, size_t size, int flags)
{
	const int msg_flags = flags ? flags : (MSG_WAITALL | MSG_NOSIGNAL);
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = msg_flags
	};
	mm_segment_t saved_fs;
	int rv;

	/* buf is a kernel address: lift the user space address limit */
	saved_fs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(saved_fs);

	return rv;
}
496
497
/* Receive exactly @size bytes from the data socket into @buf
 * (MSG_WAITALL | MSG_NOSIGNAL).
 * Returns the result of sock_recvmsg(). If that is anything other than
 * @size, the connection is forced to C_BROKEN_PIPE. */
static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	mm_segment_t saved_fs;
	int rv;

	/* buf is a kernel address: lift the user space address limit */
	saved_fs = get_fs();
	set_fs(KERNEL_DS);

	rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
	if (rv != size) {
		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on  sock) we got a signal
		 */
		if (rv < 0) {
			if (rv == -ECONNRESET)
				dev_info(DEV, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
		} else if (rv == 0) {
			dev_info(DEV, "sock was shut down by peer\n");
		}
		/* else: signal came in, or peer/link went down,
		 * after we read a partial message
		 */
		/* D_ASSERT(signal_pending(current)); */
	}

	set_fs(saved_fs);

	if (rv != size)
		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));

	return rv;
}
549
550
/* quoting tcp(7):
551
* On individual connections, the socket buffer size must be set prior to the
552
* listen(2) or connect(2) calls in order to have it take effect.
553
* This is our wrapper to do so.
554
*/
555
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
556
unsigned int rcv)
557
{
558
/* open coded SO_SNDBUF, SO_RCVBUF */
559
if (snd) {
560
sock->sk->sk_sndbuf = snd;
561
sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
562
}
563
if (rcv) {
564
sock->sk->sk_rcvbuf = rcv;
565
sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
566
}
567
}
568
569
/* Make one attempt to actively connect to the peer.
 * Creates a TCP socket, binds it to the configured local address (with
 * port 0) and connects to the configured peer address.
 * Returns the connected socket, or NULL on failure. For errors other than
 * timeout/signal/"peer not available" that happen before the connect step,
 * the connection state is forced to C_DISCONNECTING. */
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int my_addr_len;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(mdev))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	/* Clamp once and use the clamped length for both the copy and the
	 * bind: src_in6 holds at most sizeof(src_in6) bytes, so bind() must
	 * not be told to read more than that from it. */
	my_addr_len = min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, mdev->net_conf->my_addr, my_addr_len);
	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			dev_err(DEV, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	}
	put_net_conf(mdev);
	return sock;
}
646
647
static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
648
{
649
int timeo, err;
650
struct socket *s_estab = NULL, *s_listen;
651
const char *what;
652
653
if (!get_net_conf(mdev))
654
return NULL;
655
656
what = "sock_create_kern";
657
err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
658
SOCK_STREAM, IPPROTO_TCP, &s_listen);
659
if (err) {
660
s_listen = NULL;
661
goto out;
662
}
663
664
timeo = mdev->net_conf->try_connect_int * HZ;
665
timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
666
667
s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
668
s_listen->sk->sk_rcvtimeo = timeo;
669
s_listen->sk->sk_sndtimeo = timeo;
670
drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
671
mdev->net_conf->rcvbuf_size);
672
673
what = "bind before listen";
674
err = s_listen->ops->bind(s_listen,
675
(struct sockaddr *) mdev->net_conf->my_addr,
676
mdev->net_conf->my_addr_len);
677
if (err < 0)
678
goto out;
679
680
err = drbd_accept(mdev, &what, s_listen, &s_estab);
681
682
out:
683
if (s_listen)
684
sock_release(s_listen);
685
if (err < 0) {
686
if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
687
dev_err(DEV, "%s failed, err = %d\n", what, err);
688
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
689
}
690
}
691
put_net_conf(mdev);
692
693
return s_estab;
694
}
695
696
static int drbd_send_fp(struct drbd_conf *mdev,
697
struct socket *sock, enum drbd_packets cmd)
698
{
699
struct p_header80 *h = &mdev->data.sbuf.header.h80;
700
701
return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
702
}
703
704
static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
705
{
706
struct p_header80 *h = &mdev->data.rbuf.header.h80;
707
int rr;
708
709
rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
710
711
if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
712
return be16_to_cpu(h->command);
713
714
return 0xffff;
715
}
716
717
/**
718
* drbd_socket_okay() - Free the socket if its connection is not okay
719
* @mdev: DRBD device.
720
* @sock: pointer to the pointer to the socket.
721
*/
722
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
723
{
724
int rr;
725
char tb[4];
726
727
if (!*sock)
728
return false;
729
730
rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
731
732
if (rr > 0 || rr == -EAGAIN) {
733
return true;
734
} else {
735
sock_release(*sock);
736
*sock = NULL;
737
return false;
738
}
739
}
740
741
/*
742
* return values:
743
* 1 yes, we have a valid connection
744
* 0 oops, did not work out, please try again
745
* -1 peer talks different language,
746
* no point in trying again, please go standalone.
747
* -2 We do not have a network config...
748
*/
749
static int drbd_connect(struct drbd_conf *mdev)
750
{
751
struct socket *s, *sock, *msock;
752
int try, h, ok;
753
754
D_ASSERT(!mdev->data.socket);
755
756
if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757
return -2;
758
759
clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760
761
sock = NULL;
762
msock = NULL;
763
764
do {
765
for (try = 0;;) {
766
/* 3 tries, this should take less than a second! */
767
s = drbd_try_connect(mdev);
768
if (s || ++try >= 3)
769
break;
770
/* give the other side time to call bind() & listen() */
771
schedule_timeout_interruptible(HZ / 10);
772
}
773
774
if (s) {
775
if (!sock) {
776
drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
777
sock = s;
778
s = NULL;
779
} else if (!msock) {
780
drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
781
msock = s;
782
s = NULL;
783
} else {
784
dev_err(DEV, "Logic error in drbd_connect()\n");
785
goto out_release_sockets;
786
}
787
}
788
789
if (sock && msock) {
790
schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
791
ok = drbd_socket_okay(mdev, &sock);
792
ok = drbd_socket_okay(mdev, &msock) && ok;
793
if (ok)
794
break;
795
}
796
797
retry:
798
s = drbd_wait_for_connect(mdev);
799
if (s) {
800
try = drbd_recv_fp(mdev, s);
801
drbd_socket_okay(mdev, &sock);
802
drbd_socket_okay(mdev, &msock);
803
switch (try) {
804
case P_HAND_SHAKE_S:
805
if (sock) {
806
dev_warn(DEV, "initial packet S crossed\n");
807
sock_release(sock);
808
}
809
sock = s;
810
break;
811
case P_HAND_SHAKE_M:
812
if (msock) {
813
dev_warn(DEV, "initial packet M crossed\n");
814
sock_release(msock);
815
}
816
msock = s;
817
set_bit(DISCARD_CONCURRENT, &mdev->flags);
818
break;
819
default:
820
dev_warn(DEV, "Error receiving initial packet\n");
821
sock_release(s);
822
if (random32() & 1)
823
goto retry;
824
}
825
}
826
827
if (mdev->state.conn <= C_DISCONNECTING)
828
goto out_release_sockets;
829
if (signal_pending(current)) {
830
flush_signals(current);
831
smp_rmb();
832
if (get_t_state(&mdev->receiver) == Exiting)
833
goto out_release_sockets;
834
}
835
836
if (sock && msock) {
837
ok = drbd_socket_okay(mdev, &sock);
838
ok = drbd_socket_okay(mdev, &msock) && ok;
839
if (ok)
840
break;
841
}
842
} while (1);
843
844
msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
845
sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
846
847
sock->sk->sk_allocation = GFP_NOIO;
848
msock->sk->sk_allocation = GFP_NOIO;
849
850
sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
851
msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
852
853
/* NOT YET ...
854
* sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855
* sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856
* first set it to the P_HAND_SHAKE timeout,
857
* which we set to 4x the configured ping_timeout. */
858
sock->sk->sk_sndtimeo =
859
sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861
msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862
msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864
/* we don't want delays.
865
* we use TCP_CORK where appropriate, though */
866
drbd_tcp_nodelay(sock);
867
drbd_tcp_nodelay(msock);
868
869
mdev->data.socket = sock;
870
mdev->meta.socket = msock;
871
mdev->last_received = jiffies;
872
873
D_ASSERT(mdev->asender.task == NULL);
874
875
h = drbd_do_handshake(mdev);
876
if (h <= 0)
877
return h;
878
879
if (mdev->cram_hmac_tfm) {
880
/* drbd_request_state(mdev, NS(conn, WFAuth)); */
881
switch (drbd_do_auth(mdev)) {
882
case -1:
883
dev_err(DEV, "Authentication of peer failed\n");
884
return -1;
885
case 0:
886
dev_err(DEV, "Authentication of peer failed, trying again.\n");
887
return 0;
888
}
889
}
890
891
if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892
return 0;
893
894
sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895
sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896
897
atomic_set(&mdev->packet_seq, 0);
898
mdev->peer_seq = 0;
899
900
drbd_thread_start(&mdev->asender);
901
902
if (drbd_send_protocol(mdev) == -1)
903
return -1;
904
drbd_send_sync_param(mdev, &mdev->sync_conf);
905
drbd_send_sizes(mdev, 0, 0);
906
drbd_send_uuids(mdev);
907
drbd_send_state(mdev);
908
clear_bit(USE_DEGR_WFC_T, &mdev->flags);
909
clear_bit(RESIZE_PENDING, &mdev->flags);
910
mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
911
912
return 1;
913
914
out_release_sockets:
915
if (sock)
916
sock_release(sock);
917
if (msock)
918
sock_release(msock);
919
return -1;
920
}
921
922
static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
923
{
924
union p_header *h = &mdev->data.rbuf.header;
925
int r;
926
927
r = drbd_recv(mdev, h, sizeof(*h));
928
if (unlikely(r != sizeof(*h))) {
929
if (!signal_pending(current))
930
dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
931
return false;
932
}
933
934
if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
935
*cmd = be16_to_cpu(h->h80.command);
936
*packet_size = be16_to_cpu(h->h80.length);
937
} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
938
*cmd = be16_to_cpu(h->h95.command);
939
*packet_size = be32_to_cpu(h->h95.length);
940
} else {
941
dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
942
be32_to_cpu(h->h80.magic),
943
be16_to_cpu(h->h80.command),
944
be16_to_cpu(h->h80.length));
945
return false;
946
}
947
mdev->last_received = jiffies;
948
949
return true;
950
}
951
952
static void drbd_flush(struct drbd_conf *mdev)
953
{
954
int rv;
955
956
if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
957
rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
958
NULL);
959
if (rv) {
960
dev_err(DEV, "local disk flush failed with status %d\n", rv);
961
/* would rather check on EOPNOTSUPP, but that is not reliable.
962
* don't try again for ANY return value != 0
963
* if (rv == -EOPNOTSUPP) */
964
drbd_bump_write_ordering(mdev, WO_drain_io);
965
}
966
put_ldev(mdev);
967
}
968
}
969
970
/**
971
* drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
972
* @mdev: DRBD device.
973
* @epoch: Epoch object.
974
* @ev: Epoch event.
975
*/
976
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
977
struct drbd_epoch *epoch,
978
enum epoch_event ev)
979
{
980
int epoch_size;
981
struct drbd_epoch *next_epoch;
982
enum finish_epoch rv = FE_STILL_LIVE;
983
984
spin_lock(&mdev->epoch_lock);
985
do {
986
next_epoch = NULL;
987
988
epoch_size = atomic_read(&epoch->epoch_size);
989
990
switch (ev & ~EV_CLEANUP) {
991
case EV_PUT:
992
atomic_dec(&epoch->active);
993
break;
994
case EV_GOT_BARRIER_NR:
995
set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
996
break;
997
case EV_BECAME_LAST:
998
/* nothing to do*/
999
break;
1000
}
1001
1002
if (epoch_size != 0 &&
1003
atomic_read(&epoch->active) == 0 &&
1004
test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1005
if (!(ev & EV_CLEANUP)) {
1006
spin_unlock(&mdev->epoch_lock);
1007
drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1008
spin_lock(&mdev->epoch_lock);
1009
}
1010
dec_unacked(mdev);
1011
1012
if (mdev->current_epoch != epoch) {
1013
next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1014
list_del(&epoch->list);
1015
ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1016
mdev->epochs--;
1017
kfree(epoch);
1018
1019
if (rv == FE_STILL_LIVE)
1020
rv = FE_DESTROYED;
1021
} else {
1022
epoch->flags = 0;
1023
atomic_set(&epoch->epoch_size, 0);
1024
/* atomic_set(&epoch->active, 0); is already zero */
1025
if (rv == FE_STILL_LIVE)
1026
rv = FE_RECYCLED;
1027
wake_up(&mdev->ee_wait);
1028
}
1029
}
1030
1031
if (!next_epoch)
1032
break;
1033
1034
epoch = next_epoch;
1035
} while (1);
1036
1037
spin_unlock(&mdev->epoch_lock);
1038
1039
return rv;
1040
}
1041
1042
/**
1043
* drbd_bump_write_ordering() - Fall back to an other write ordering method
1044
* @mdev: DRBD device.
1045
* @wo: Write ordering method to try.
1046
*/
1047
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1048
{
1049
enum write_ordering_e pwo;
1050
static char *write_ordering_str[] = {
1051
[WO_none] = "none",
1052
[WO_drain_io] = "drain",
1053
[WO_bdev_flush] = "flush",
1054
};
1055
1056
pwo = mdev->write_ordering;
1057
wo = min(pwo, wo);
1058
if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1059
wo = WO_drain_io;
1060
if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1061
wo = WO_none;
1062
mdev->write_ordering = wo;
1063
if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1064
dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1065
}
1066
1067
/**
1068
* drbd_submit_ee()
1069
* @mdev: DRBD device.
1070
* @e: epoch entry
1071
* @rw: flag field, see bio->bi_rw
1072
*
1073
* May spread the pages to multiple bios,
1074
* depending on bio_add_page restrictions.
1075
*
1076
* Returns 0 if all bios have been submitted,
1077
* -ENOMEM if we could not allocate enough bios,
1078
* -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1079
* single page to an empty bio (which should never happen and likely indicates
1080
* that the lower level IO stack is in some way broken). This has been observed
1081
* on certain Xen deployments.
1082
*/
1083
/* TODO allocate from our own bio_set. */
1084
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1085
const unsigned rw, const int fault_type)
1086
{
1087
struct bio *bios = NULL;
1088
struct bio *bio;
1089
struct page *page = e->pages;
1090
sector_t sector = e->sector;
1091
unsigned ds = e->size;
1092
unsigned n_bios = 0;
1093
unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1094
int err = -ENOMEM;
1095
1096
/* In most cases, we will only need one bio. But in case the lower
1097
* level restrictions happen to be different at this offset on this
1098
* side than those of the sending peer, we may need to submit the
1099
* request in more than one bio. */
1100
next_bio:
1101
bio = bio_alloc(GFP_NOIO, nr_pages);
1102
if (!bio) {
1103
dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1104
goto fail;
1105
}
1106
/* > e->sector, unless this is the first bio */
1107
bio->bi_sector = sector;
1108
bio->bi_bdev = mdev->ldev->backing_bdev;
1109
bio->bi_rw = rw;
1110
bio->bi_private = e;
1111
bio->bi_end_io = drbd_endio_sec;
1112
1113
bio->bi_next = bios;
1114
bios = bio;
1115
++n_bios;
1116
1117
page_chain_for_each(page) {
1118
unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1119
if (!bio_add_page(bio, page, len, 0)) {
1120
/* A single page must always be possible!
1121
* But in case it fails anyways,
1122
* we deal with it, and complain (below). */
1123
if (bio->bi_vcnt == 0) {
1124
dev_err(DEV,
1125
"bio_add_page failed for len=%u, "
1126
"bi_vcnt=0 (bi_sector=%llu)\n",
1127
len, (unsigned long long)bio->bi_sector);
1128
err = -ENOSPC;
1129
goto fail;
1130
}
1131
goto next_bio;
1132
}
1133
ds -= len;
1134
sector += len >> 9;
1135
--nr_pages;
1136
}
1137
D_ASSERT(page == NULL);
1138
D_ASSERT(ds == 0);
1139
1140
atomic_set(&e->pending_bios, n_bios);
1141
do {
1142
bio = bios;
1143
bios = bios->bi_next;
1144
bio->bi_next = NULL;
1145
1146
drbd_generic_make_request(mdev, fault_type, bio);
1147
} while (bios);
1148
return 0;
1149
1150
fail:
1151
while (bios) {
1152
bio = bios;
1153
bios = bios->bi_next;
1154
bio_put(bio);
1155
}
1156
return err;
1157
}
1158
1159
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1160
{
1161
int rv;
1162
struct p_barrier *p = &mdev->data.rbuf.barrier;
1163
struct drbd_epoch *epoch;
1164
1165
inc_unacked(mdev);
1166
1167
mdev->current_epoch->barrier_nr = p->barrier;
1168
rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1169
1170
/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1171
* the activity log, which means it would not be resynced in case the
1172
* R_PRIMARY crashes now.
1173
* Therefore we must send the barrier_ack after the barrier request was
1174
* completed. */
1175
switch (mdev->write_ordering) {
1176
case WO_none:
1177
if (rv == FE_RECYCLED)
1178
return true;
1179
1180
/* receiver context, in the writeout path of the other node.
1181
* avoid potential distributed deadlock */
1182
epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1183
if (epoch)
1184
break;
1185
else
1186
dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1187
/* Fall through */
1188
1189
case WO_bdev_flush:
1190
case WO_drain_io:
1191
drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1192
drbd_flush(mdev);
1193
1194
if (atomic_read(&mdev->current_epoch->epoch_size)) {
1195
epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1196
if (epoch)
1197
break;
1198
}
1199
1200
epoch = mdev->current_epoch;
1201
wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1202
1203
D_ASSERT(atomic_read(&epoch->active) == 0);
1204
D_ASSERT(epoch->flags == 0);
1205
1206
return true;
1207
default:
1208
dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1209
return false;
1210
}
1211
1212
epoch->flags = 0;
1213
atomic_set(&epoch->epoch_size, 0);
1214
atomic_set(&epoch->active, 0);
1215
1216
spin_lock(&mdev->epoch_lock);
1217
if (atomic_read(&mdev->current_epoch->epoch_size)) {
1218
list_add(&epoch->list, &mdev->current_epoch->list);
1219
mdev->current_epoch = epoch;
1220
mdev->epochs++;
1221
} else {
1222
/* The current_epoch got recycled while we allocated this one... */
1223
kfree(epoch);
1224
}
1225
spin_unlock(&mdev->epoch_lock);
1226
1227
return true;
1228
}
1229
1230
/* used from receive_RSDataReply (recv_resync_read)
1231
* and from receive_Data */
1232
static struct drbd_epoch_entry *
1233
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1234
{
1235
const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1236
struct drbd_epoch_entry *e;
1237
struct page *page;
1238
int dgs, ds, rr;
1239
void *dig_in = mdev->int_dig_in;
1240
void *dig_vv = mdev->int_dig_vv;
1241
unsigned long *data;
1242
1243
dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1244
crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1245
1246
if (dgs) {
1247
rr = drbd_recv(mdev, dig_in, dgs);
1248
if (rr != dgs) {
1249
if (!signal_pending(current))
1250
dev_warn(DEV,
1251
"short read receiving data digest: read %d expected %d\n",
1252
rr, dgs);
1253
return NULL;
1254
}
1255
}
1256
1257
data_size -= dgs;
1258
1259
ERR_IF(data_size == 0) return NULL;
1260
ERR_IF(data_size & 0x1ff) return NULL;
1261
ERR_IF(data_size > DRBD_MAX_BIO_SIZE) return NULL;
1262
1263
/* even though we trust out peer,
1264
* we sometimes have to double check. */
1265
if (sector + (data_size>>9) > capacity) {
1266
dev_err(DEV, "request from peer beyond end of local disk: "
1267
"capacity: %llus < sector: %llus + size: %u\n",
1268
(unsigned long long)capacity,
1269
(unsigned long long)sector, data_size);
1270
return NULL;
1271
}
1272
1273
/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1274
* "criss-cross" setup, that might cause write-out on some other DRBD,
1275
* which in turn might block on the other node at this very place. */
1276
e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1277
if (!e)
1278
return NULL;
1279
1280
ds = data_size;
1281
page = e->pages;
1282
page_chain_for_each(page) {
1283
unsigned len = min_t(int, ds, PAGE_SIZE);
1284
data = kmap(page);
1285
rr = drbd_recv(mdev, data, len);
1286
if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1287
dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1288
data[0] = data[0] ^ (unsigned long)-1;
1289
}
1290
kunmap(page);
1291
if (rr != len) {
1292
drbd_free_ee(mdev, e);
1293
if (!signal_pending(current))
1294
dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1295
rr, len);
1296
return NULL;
1297
}
1298
ds -= rr;
1299
}
1300
1301
if (dgs) {
1302
drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1303
if (memcmp(dig_in, dig_vv, dgs)) {
1304
dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1305
(unsigned long long)sector, data_size);
1306
drbd_bcast_ee(mdev, "digest failed",
1307
dgs, dig_in, dig_vv, e);
1308
drbd_free_ee(mdev, e);
1309
return NULL;
1310
}
1311
}
1312
mdev->recv_cnt += data_size>>9;
1313
return e;
1314
}
1315
1316
/* drbd_drain_block() just takes a data block
1317
* out of the socket input buffer, and discards it.
1318
*/
1319
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1320
{
1321
struct page *page;
1322
int rr, rv = 1;
1323
void *data;
1324
1325
if (!data_size)
1326
return true;
1327
1328
page = drbd_pp_alloc(mdev, 1, 1);
1329
1330
data = kmap(page);
1331
while (data_size) {
1332
rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1333
if (rr != min_t(int, data_size, PAGE_SIZE)) {
1334
rv = 0;
1335
if (!signal_pending(current))
1336
dev_warn(DEV,
1337
"short read receiving data: read %d expected %d\n",
1338
rr, min_t(int, data_size, PAGE_SIZE));
1339
break;
1340
}
1341
data_size -= rr;
1342
}
1343
kunmap(page);
1344
drbd_pp_free(mdev, page, 0);
1345
return rv;
1346
}
1347
1348
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1349
sector_t sector, int data_size)
1350
{
1351
struct bio_vec *bvec;
1352
struct bio *bio;
1353
int dgs, rr, i, expect;
1354
void *dig_in = mdev->int_dig_in;
1355
void *dig_vv = mdev->int_dig_vv;
1356
1357
dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1358
crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1359
1360
if (dgs) {
1361
rr = drbd_recv(mdev, dig_in, dgs);
1362
if (rr != dgs) {
1363
if (!signal_pending(current))
1364
dev_warn(DEV,
1365
"short read receiving data reply digest: read %d expected %d\n",
1366
rr, dgs);
1367
return 0;
1368
}
1369
}
1370
1371
data_size -= dgs;
1372
1373
/* optimistically update recv_cnt. if receiving fails below,
1374
* we disconnect anyways, and counters will be reset. */
1375
mdev->recv_cnt += data_size>>9;
1376
1377
bio = req->master_bio;
1378
D_ASSERT(sector == bio->bi_sector);
1379
1380
bio_for_each_segment(bvec, bio, i) {
1381
expect = min_t(int, data_size, bvec->bv_len);
1382
rr = drbd_recv(mdev,
1383
kmap(bvec->bv_page)+bvec->bv_offset,
1384
expect);
1385
kunmap(bvec->bv_page);
1386
if (rr != expect) {
1387
if (!signal_pending(current))
1388
dev_warn(DEV, "short read receiving data reply: "
1389
"read %d expected %d\n",
1390
rr, expect);
1391
return 0;
1392
}
1393
data_size -= rr;
1394
}
1395
1396
if (dgs) {
1397
drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1398
if (memcmp(dig_in, dig_vv, dgs)) {
1399
dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1400
return 0;
1401
}
1402
}
1403
1404
D_ASSERT(data_size == 0);
1405
return 1;
1406
}
1407
1408
/* e_end_resync_block() is called via
1409
* drbd_process_done_ee() by asender only */
1410
static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1411
{
1412
struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1413
sector_t sector = e->sector;
1414
int ok;
1415
1416
D_ASSERT(hlist_unhashed(&e->collision));
1417
1418
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1419
drbd_set_in_sync(mdev, sector, e->size);
1420
ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1421
} else {
1422
/* Record failure to sync */
1423
drbd_rs_failed_io(mdev, sector, e->size);
1424
1425
ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1426
}
1427
dec_unacked(mdev);
1428
1429
return ok;
1430
}
1431
1432
/*
 * recv_resync_read() - receive a resync data block and submit it as a write.
 *
 * The entry is queued on mdev->sync_ee and completed by e_end_resync_block().
 * On every failure path the local-disk reference is dropped via put_ldev().
 *
 * Returns true if the write was submitted, false otherwise.
 */
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_epoch_entry *e = read_in_block(mdev, ID_SYNCER, sector, data_size);

	if (!e) {
		put_ldev(mdev);
		return false;
	}

	dec_rs_pending(mdev);

	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */
	inc_unacked(mdev);

	e->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");

	/* take the entry off sync_ee again before freeing it */
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	spin_unlock_irq(&mdev->req_lock);

	drbd_free_ee(mdev, e);
	put_ldev(mdev);
	return false;
}
1467
1468
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1469
{
1470
struct drbd_request *req;
1471
sector_t sector;
1472
int ok;
1473
struct p_data *p = &mdev->data.rbuf.data;
1474
1475
sector = be64_to_cpu(p->sector);
1476
1477
spin_lock_irq(&mdev->req_lock);
1478
req = _ar_id_to_req(mdev, p->block_id, sector);
1479
spin_unlock_irq(&mdev->req_lock);
1480
if (unlikely(!req)) {
1481
dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1482
return false;
1483
}
1484
1485
/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1486
* special casing it there for the various failure cases.
1487
* still no race with drbd_fail_pending_reads */
1488
ok = recv_dless_read(mdev, req, sector, data_size);
1489
1490
if (ok)
1491
req_mod(req, data_received);
1492
/* else: nothing. handled from drbd_disconnect...
1493
* I don't think we may complete this just yet
1494
* in case we are "on-disconnect: freeze" */
1495
1496
return ok;
1497
}
1498
1499
static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1500
{
1501
sector_t sector;
1502
int ok;
1503
struct p_data *p = &mdev->data.rbuf.data;
1504
1505
sector = be64_to_cpu(p->sector);
1506
D_ASSERT(p->block_id == ID_SYNCER);
1507
1508
if (get_ldev(mdev)) {
1509
/* data is submitted to disk within recv_resync_read.
1510
* corresponding put_ldev done below on error,
1511
* or in drbd_endio_write_sec. */
1512
ok = recv_resync_read(mdev, sector, data_size);
1513
} else {
1514
if (__ratelimit(&drbd_ratelimit_state))
1515
dev_err(DEV, "Can not write resync data to local disk.\n");
1516
1517
ok = drbd_drain_block(mdev, data_size);
1518
1519
drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1520
}
1521
1522
atomic_add(data_size >> 9, &mdev->rs_sect_in);
1523
1524
return ok;
1525
}
1526
1527
/* e_end_block() is called via drbd_process_done_ee().
1528
* this means this function only runs in the asender thread
1529
*/
1530
static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1531
{
1532
struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1533
sector_t sector = e->sector;
1534
int ok = 1, pcmd;
1535
1536
if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1537
if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1538
pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1539
mdev->state.conn <= C_PAUSED_SYNC_T &&
1540
e->flags & EE_MAY_SET_IN_SYNC) ?
1541
P_RS_WRITE_ACK : P_WRITE_ACK;
1542
ok &= drbd_send_ack(mdev, pcmd, e);
1543
if (pcmd == P_RS_WRITE_ACK)
1544
drbd_set_in_sync(mdev, sector, e->size);
1545
} else {
1546
ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1547
/* we expect it to be marked out of sync anyways...
1548
* maybe assert this? */
1549
}
1550
dec_unacked(mdev);
1551
}
1552
/* we delete from the conflict detection hash _after_ we sent out the
1553
* P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1554
if (mdev->net_conf->two_primaries) {
1555
spin_lock_irq(&mdev->req_lock);
1556
D_ASSERT(!hlist_unhashed(&e->collision));
1557
hlist_del_init(&e->collision);
1558
spin_unlock_irq(&mdev->req_lock);
1559
} else {
1560
D_ASSERT(hlist_unhashed(&e->collision));
1561
}
1562
1563
drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1564
1565
return ok;
1566
}
1567
1568
static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1569
{
1570
struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1571
int ok = 1;
1572
1573
D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1574
ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1575
1576
spin_lock_irq(&mdev->req_lock);
1577
D_ASSERT(!hlist_unhashed(&e->collision));
1578
hlist_del_init(&e->collision);
1579
spin_unlock_irq(&mdev->req_lock);
1580
1581
dec_unacked(mdev);
1582
1583
return ok;
1584
}
1585
1586
/* Called from receive_Data.
1587
* Synchronize packets on sock with packets on msock.
1588
*
1589
* This is here so even when a P_DATA packet traveling via sock overtook an Ack
1590
* packet traveling on msock, they are still processed in the order they have
1591
* been sent.
1592
*
1593
* Note: we don't care for Ack packets overtaking P_DATA packets.
1594
*
1595
* In case packet_seq is larger than mdev->peer_seq number, there are
1596
* outstanding packets on the msock. We wait for them to arrive.
1597
* In case we are the logically next packet, we update mdev->peer_seq
1598
* ourselves. Correctly handles 32bit wrap around.
1599
*
1600
* Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1601
* about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1602
* for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1603
* 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1604
*
1605
* returns 0 if we may process the packet,
1606
* -ERESTARTSYS if we were interrupted (by disconnect signal). */
1607
static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1608
{
1609
DEFINE_WAIT(wait);
1610
unsigned int p_seq;
1611
long timeout;
1612
int ret = 0;
1613
spin_lock(&mdev->peer_seq_lock);
1614
for (;;) {
1615
prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1616
if (seq_le(packet_seq, mdev->peer_seq+1))
1617
break;
1618
if (signal_pending(current)) {
1619
ret = -ERESTARTSYS;
1620
break;
1621
}
1622
p_seq = mdev->peer_seq;
1623
spin_unlock(&mdev->peer_seq_lock);
1624
timeout = schedule_timeout(30*HZ);
1625
spin_lock(&mdev->peer_seq_lock);
1626
if (timeout == 0 && p_seq == mdev->peer_seq) {
1627
ret = -ETIMEDOUT;
1628
dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1629
break;
1630
}
1631
}
1632
finish_wait(&mdev->seq_wait, &wait);
1633
if (mdev->peer_seq+1 == packet_seq)
1634
mdev->peer_seq++;
1635
spin_unlock(&mdev->peer_seq_lock);
1636
return ret;
1637
}
1638
1639
/* see also bio_flags_to_wire()
1640
* DRBD_REQ_*, because we need to semantically map the flags to data packet
1641
* flags and back. We may replicate to other kernel versions. */
1642
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1643
{
1644
return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1645
(dpf & DP_FUA ? REQ_FUA : 0) |
1646
(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1647
(dpf & DP_DISCARD ? REQ_DISCARD : 0);
1648
}
1649
1650
/* mirrored write */
1651
static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1652
{
1653
sector_t sector;
1654
struct drbd_epoch_entry *e;
1655
struct p_data *p = &mdev->data.rbuf.data;
1656
int rw = WRITE;
1657
u32 dp_flags;
1658
1659
if (!get_ldev(mdev)) {
1660
spin_lock(&mdev->peer_seq_lock);
1661
if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1662
mdev->peer_seq++;
1663
spin_unlock(&mdev->peer_seq_lock);
1664
1665
drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1666
atomic_inc(&mdev->current_epoch->epoch_size);
1667
return drbd_drain_block(mdev, data_size);
1668
}
1669
1670
/* get_ldev(mdev) successful.
1671
* Corresponding put_ldev done either below (on various errors),
1672
* or in drbd_endio_write_sec, if we successfully submit the data at
1673
* the end of this function. */
1674
1675
sector = be64_to_cpu(p->sector);
1676
e = read_in_block(mdev, p->block_id, sector, data_size);
1677
if (!e) {
1678
put_ldev(mdev);
1679
return false;
1680
}
1681
1682
e->w.cb = e_end_block;
1683
1684
dp_flags = be32_to_cpu(p->dp_flags);
1685
rw |= wire_flags_to_bio(mdev, dp_flags);
1686
1687
if (dp_flags & DP_MAY_SET_IN_SYNC)
1688
e->flags |= EE_MAY_SET_IN_SYNC;
1689
1690
spin_lock(&mdev->epoch_lock);
1691
e->epoch = mdev->current_epoch;
1692
atomic_inc(&e->epoch->epoch_size);
1693
atomic_inc(&e->epoch->active);
1694
spin_unlock(&mdev->epoch_lock);
1695
1696
/* I'm the receiver, I do hold a net_cnt reference. */
1697
if (!mdev->net_conf->two_primaries) {
1698
spin_lock_irq(&mdev->req_lock);
1699
} else {
1700
/* don't get the req_lock yet,
1701
* we may sleep in drbd_wait_peer_seq */
1702
const int size = e->size;
1703
const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1704
DEFINE_WAIT(wait);
1705
struct drbd_request *i;
1706
struct hlist_node *n;
1707
struct hlist_head *slot;
1708
int first;
1709
1710
D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1711
BUG_ON(mdev->ee_hash == NULL);
1712
BUG_ON(mdev->tl_hash == NULL);
1713
1714
/* conflict detection and handling:
1715
* 1. wait on the sequence number,
1716
* in case this data packet overtook ACK packets.
1717
* 2. check our hash tables for conflicting requests.
1718
* we only need to walk the tl_hash, since an ee can not
1719
* have a conflict with an other ee: on the submitting
1720
* node, the corresponding req had already been conflicting,
1721
* and a conflicting req is never sent.
1722
*
1723
* Note: for two_primaries, we are protocol C,
1724
* so there cannot be any request that is DONE
1725
* but still on the transfer log.
1726
*
1727
* unconditionally add to the ee_hash.
1728
*
1729
* if no conflicting request is found:
1730
* submit.
1731
*
1732
* if any conflicting request is found
1733
* that has not yet been acked,
1734
* AND I have the "discard concurrent writes" flag:
1735
* queue (via done_ee) the P_DISCARD_ACK; OUT.
1736
*
1737
* if any conflicting request is found:
1738
* block the receiver, waiting on misc_wait
1739
* until no more conflicting requests are there,
1740
* or we get interrupted (disconnect).
1741
*
1742
* we do not just write after local io completion of those
1743
* requests, but only after req is done completely, i.e.
1744
* we wait for the P_DISCARD_ACK to arrive!
1745
*
1746
* then proceed normally, i.e. submit.
1747
*/
1748
if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1749
goto out_interrupted;
1750
1751
spin_lock_irq(&mdev->req_lock);
1752
1753
hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1754
1755
#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1756
slot = tl_hash_slot(mdev, sector);
1757
first = 1;
1758
for (;;) {
1759
int have_unacked = 0;
1760
int have_conflict = 0;
1761
prepare_to_wait(&mdev->misc_wait, &wait,
1762
TASK_INTERRUPTIBLE);
1763
hlist_for_each_entry(i, n, slot, collision) {
1764
if (OVERLAPS) {
1765
/* only ALERT on first iteration,
1766
* we may be woken up early... */
1767
if (first)
1768
dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1769
" new: %llus +%u; pending: %llus +%u\n",
1770
current->comm, current->pid,
1771
(unsigned long long)sector, size,
1772
(unsigned long long)i->sector, i->size);
1773
if (i->rq_state & RQ_NET_PENDING)
1774
++have_unacked;
1775
++have_conflict;
1776
}
1777
}
1778
#undef OVERLAPS
1779
if (!have_conflict)
1780
break;
1781
1782
/* Discard Ack only for the _first_ iteration */
1783
if (first && discard && have_unacked) {
1784
dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1785
(unsigned long long)sector);
1786
inc_unacked(mdev);
1787
e->w.cb = e_send_discard_ack;
1788
list_add_tail(&e->w.list, &mdev->done_ee);
1789
1790
spin_unlock_irq(&mdev->req_lock);
1791
1792
/* we could probably send that P_DISCARD_ACK ourselves,
1793
* but I don't like the receiver using the msock */
1794
1795
put_ldev(mdev);
1796
wake_asender(mdev);
1797
finish_wait(&mdev->misc_wait, &wait);
1798
return true;
1799
}
1800
1801
if (signal_pending(current)) {
1802
hlist_del_init(&e->collision);
1803
1804
spin_unlock_irq(&mdev->req_lock);
1805
1806
finish_wait(&mdev->misc_wait, &wait);
1807
goto out_interrupted;
1808
}
1809
1810
spin_unlock_irq(&mdev->req_lock);
1811
if (first) {
1812
first = 0;
1813
dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1814
"sec=%llus\n", (unsigned long long)sector);
1815
} else if (discard) {
1816
/* we had none on the first iteration.
1817
* there must be none now. */
1818
D_ASSERT(have_unacked == 0);
1819
}
1820
schedule();
1821
spin_lock_irq(&mdev->req_lock);
1822
}
1823
finish_wait(&mdev->misc_wait, &wait);
1824
}
1825
1826
list_add(&e->w.list, &mdev->active_ee);
1827
spin_unlock_irq(&mdev->req_lock);
1828
1829
switch (mdev->net_conf->wire_protocol) {
1830
case DRBD_PROT_C:
1831
inc_unacked(mdev);
1832
/* corresponding dec_unacked() in e_end_block()
1833
* respective _drbd_clear_done_ee */
1834
break;
1835
case DRBD_PROT_B:
1836
/* I really don't like it that the receiver thread
1837
* sends on the msock, but anyways */
1838
drbd_send_ack(mdev, P_RECV_ACK, e);
1839
break;
1840
case DRBD_PROT_A:
1841
/* nothing to do */
1842
break;
1843
}
1844
1845
if (mdev->state.pdsk < D_INCONSISTENT) {
1846
/* In case we have the only disk of the cluster, */
1847
drbd_set_out_of_sync(mdev, e->sector, e->size);
1848
e->flags |= EE_CALL_AL_COMPLETE_IO;
1849
e->flags &= ~EE_MAY_SET_IN_SYNC;
1850
drbd_al_begin_io(mdev, e->sector);
1851
}
1852
1853
if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1854
return true;
1855
1856
/* don't care for the reason here */
1857
dev_err(DEV, "submit failed, triggering re-connect\n");
1858
spin_lock_irq(&mdev->req_lock);
1859
list_del(&e->w.list);
1860
hlist_del_init(&e->collision);
1861
spin_unlock_irq(&mdev->req_lock);
1862
if (e->flags & EE_CALL_AL_COMPLETE_IO)
1863
drbd_al_complete_io(mdev, e->sector);
1864
1865
out_interrupted:
1866
drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1867
put_ldev(mdev);
1868
drbd_free_ee(mdev, e);
1869
return false;
1870
}
1871
1872
/* We may throttle resync, if the lower device seems to be busy,
1873
* and current sync rate is above c_min_rate.
1874
*
1875
* To decide whether or not the lower device is busy, we use a scheme similar
1876
* to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1877
* (more than 64 sectors) of activity we cannot account for with our own resync
1878
* activity, it obviously is "busy".
1879
*
1880
* The current sync rate used here uses only the most recent two step marks,
1881
* to have a short time average so we can react faster.
1882
*/
1883
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1884
{
1885
struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1886
unsigned long db, dt, dbdt;
1887
struct lc_element *tmp;
1888
int curr_events;
1889
int throttle = 0;
1890
1891
/* feature disabled? */
1892
if (mdev->sync_conf.c_min_rate == 0)
1893
return 0;
1894
1895
spin_lock_irq(&mdev->al_lock);
1896
tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1897
if (tmp) {
1898
struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1899
if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1900
spin_unlock_irq(&mdev->al_lock);
1901
return 0;
1902
}
1903
/* Do not slow down if app IO is already waiting for this extent */
1904
}
1905
spin_unlock_irq(&mdev->al_lock);
1906
1907
curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1908
(int)part_stat_read(&disk->part0, sectors[1]) -
1909
atomic_read(&mdev->rs_sect_ev);
1910
1911
if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1912
unsigned long rs_left;
1913
int i;
1914
1915
mdev->rs_last_events = curr_events;
1916
1917
/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1918
* approx. */
1919
i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1920
1921
if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1922
rs_left = mdev->ov_left;
1923
else
1924
rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1925
1926
dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1927
if (!dt)
1928
dt++;
1929
db = mdev->rs_mark_left[i] - rs_left;
1930
dbdt = Bit2KB(db/dt);
1931
1932
if (dbdt > mdev->sync_conf.c_min_rate)
1933
throttle = 1;
1934
}
1935
return throttle;
1936
}
1937
1938
1939
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1940
{
1941
sector_t sector;
1942
const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1943
struct drbd_epoch_entry *e;
1944
struct digest_info *di = NULL;
1945
int size, verb;
1946
unsigned int fault_type;
1947
struct p_block_req *p = &mdev->data.rbuf.block_req;
1948
1949
sector = be64_to_cpu(p->sector);
1950
size = be32_to_cpu(p->blksize);
1951
1952
if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1953
dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1954
(unsigned long long)sector, size);
1955
return false;
1956
}
1957
if (sector + (size>>9) > capacity) {
1958
dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1959
(unsigned long long)sector, size);
1960
return false;
1961
}
1962
1963
if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1964
verb = 1;
1965
switch (cmd) {
1966
case P_DATA_REQUEST:
1967
drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1968
break;
1969
case P_RS_DATA_REQUEST:
1970
case P_CSUM_RS_REQUEST:
1971
case P_OV_REQUEST:
1972
drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1973
break;
1974
case P_OV_REPLY:
1975
verb = 0;
1976
dec_rs_pending(mdev);
1977
drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1978
break;
1979
default:
1980
dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1981
cmdname(cmd));
1982
}
1983
if (verb && __ratelimit(&drbd_ratelimit_state))
1984
dev_err(DEV, "Can not satisfy peer's read request, "
1985
"no local data.\n");
1986
1987
/* drain possibly payload */
1988
return drbd_drain_block(mdev, digest_size);
1989
}
1990
1991
/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1992
* "criss-cross" setup, that might cause write-out on some other DRBD,
1993
* which in turn might block on the other node at this very place. */
1994
e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1995
if (!e) {
1996
put_ldev(mdev);
1997
return false;
1998
}
1999
2000
switch (cmd) {
2001
case P_DATA_REQUEST:
2002
e->w.cb = w_e_end_data_req;
2003
fault_type = DRBD_FAULT_DT_RD;
2004
/* application IO, don't drbd_rs_begin_io */
2005
goto submit;
2006
2007
case P_RS_DATA_REQUEST:
2008
e->w.cb = w_e_end_rsdata_req;
2009
fault_type = DRBD_FAULT_RS_RD;
2010
/* used in the sector offset progress display */
2011
mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2012
break;
2013
2014
case P_OV_REPLY:
2015
case P_CSUM_RS_REQUEST:
2016
fault_type = DRBD_FAULT_RS_RD;
2017
di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2018
if (!di)
2019
goto out_free_e;
2020
2021
di->digest_size = digest_size;
2022
di->digest = (((char *)di)+sizeof(struct digest_info));
2023
2024
e->digest = di;
2025
e->flags |= EE_HAS_DIGEST;
2026
2027
if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2028
goto out_free_e;
2029
2030
if (cmd == P_CSUM_RS_REQUEST) {
2031
D_ASSERT(mdev->agreed_pro_version >= 89);
2032
e->w.cb = w_e_end_csum_rs_req;
2033
/* used in the sector offset progress display */
2034
mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2035
} else if (cmd == P_OV_REPLY) {
2036
/* track progress, we may need to throttle */
2037
atomic_add(size >> 9, &mdev->rs_sect_in);
2038
e->w.cb = w_e_end_ov_reply;
2039
dec_rs_pending(mdev);
2040
/* drbd_rs_begin_io done when we sent this request,
2041
* but accounting still needs to be done. */
2042
goto submit_for_resync;
2043
}
2044
break;
2045
2046
case P_OV_REQUEST:
2047
if (mdev->ov_start_sector == ~(sector_t)0 &&
2048
mdev->agreed_pro_version >= 90) {
2049
unsigned long now = jiffies;
2050
int i;
2051
mdev->ov_start_sector = sector;
2052
mdev->ov_position = sector;
2053
mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2054
mdev->rs_total = mdev->ov_left;
2055
for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2056
mdev->rs_mark_left[i] = mdev->ov_left;
2057
mdev->rs_mark_time[i] = now;
2058
}
2059
dev_info(DEV, "Online Verify start sector: %llu\n",
2060
(unsigned long long)sector);
2061
}
2062
e->w.cb = w_e_end_ov_req;
2063
fault_type = DRBD_FAULT_RS_RD;
2064
break;
2065
2066
default:
2067
dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2068
cmdname(cmd));
2069
fault_type = DRBD_FAULT_MAX;
2070
goto out_free_e;
2071
}
2072
2073
/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2074
* wrt the receiver, but it is not as straightforward as it may seem.
2075
* Various places in the resync start and stop logic assume resync
2076
* requests are processed in order, requeuing this on the worker thread
2077
* introduces a bunch of new code for synchronization between threads.
2078
*
2079
* Unlimited throttling before drbd_rs_begin_io may stall the resync
2080
* "forever", throttling after drbd_rs_begin_io will lock that extent
2081
* for application writes for the same time. For now, just throttle
2082
* here, where the rest of the code expects the receiver to sleep for
2083
* a while, anyways.
2084
*/
2085
2086
/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2087
* this defers syncer requests for some time, before letting at least
2088
* on request through. The resync controller on the receiving side
2089
* will adapt to the incoming rate accordingly.
2090
*
2091
* We cannot throttle here if remote is Primary/SyncTarget:
2092
* we would also throttle its application reads.
2093
* In that case, throttling is done on the SyncTarget only.
2094
*/
2095
if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2096
schedule_timeout_uninterruptible(HZ/10);
2097
if (drbd_rs_begin_io(mdev, sector))
2098
goto out_free_e;
2099
2100
submit_for_resync:
2101
atomic_add(size >> 9, &mdev->rs_sect_ev);
2102
2103
submit:
2104
inc_unacked(mdev);
2105
spin_lock_irq(&mdev->req_lock);
2106
list_add_tail(&e->w.list, &mdev->read_ee);
2107
spin_unlock_irq(&mdev->req_lock);
2108
2109
if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2110
return true;
2111
2112
/* don't care for the reason here */
2113
dev_err(DEV, "submit failed, triggering re-connect\n");
2114
spin_lock_irq(&mdev->req_lock);
2115
list_del(&e->w.list);
2116
spin_unlock_irq(&mdev->req_lock);
2117
/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2118
2119
out_free_e:
2120
put_ldev(mdev);
2121
drbd_free_ee(mdev, e);
2122
return false;
2123
}
2124
2125
/*
 * drbd_asb_recover_0p() - apply the configured after_sb_0p policy.
 *
 * Inputs are the lowest bit of the local and the peer UI_BITMAP uuid, and the
 * change counts taken from mdev->comm_bm_set (local) and
 * mdev->p_uuid[UI_SIZE] (peer).
 *
 * Returns -1 or 1 when the policy reaches a decision (-1 is what
 * ASB_DISCARD_LOCAL yields, 1 what ASB_DISCARD_REMOTE yields), and -100 when
 * no decision was made.
 */
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
{
	const int self_bit = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	const int peer_bit = mdev->p_uuid[UI_BITMAP] & 1;
	const unsigned long peer_changes = mdev->p_uuid[UI_SIZE];
	const unsigned long self_changes = mdev->comm_bm_set;
	int verdict = -100;

	switch (mdev->net_conf->after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		dev_err(DEV, "Configuration error.\n");
		break;

	case ASB_DISCONNECT:
		break;

	case ASB_DISCARD_YOUNGER_PRI:
		if (self_bit == 0 && peer_bit == 1) {
			verdict = -1;
			break;
		}
		if (self_bit == 1 && peer_bit == 0) {
			verdict = 1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self_bit == 0 && peer_bit == 1) {
			verdict = 1;
			break;
		}
		if (self_bit == 1 && peer_bit == 0) {
			verdict = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (peer_changes == 0 && self_changes == 0) {
			/* neither side changed: decide by the flag */
			verdict = test_bit(DISCARD_CONCURRENT, &mdev->flags)
				? -1 : 1;
			break;
		}
		if (peer_changes == 0) {
			verdict = 1;
			break;
		}
		if (self_changes == 0) {
			verdict = -1;
			break;
		}
		/* only continue when we got here by falling through */
		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		/* Fall through */
	case ASB_DISCARD_LEAST_CHG:
		if (self_changes < peer_changes) {
			verdict = -1;
		} else if (self_changes > peer_changes) {
			verdict = 1;
		} else {
			/* ( self_changes == peer_changes ) */
			/* Well, then use something else. */
			verdict = test_bit(DISCARD_CONCURRENT, &mdev->flags)
				? -1 : 1;
		}
		break;

	case ASB_DISCARD_LOCAL:
		verdict = -1;
		break;

	case ASB_DISCARD_REMOTE:
		verdict = 1;
		break;
	}

	return verdict;
}
2196
2197
/*
 * drbd_asb_recover_1p() - evaluate the after_sb_1p policy
 *
 * Returns 1 or -1 when the configured policy reaches a decision, -100
 * otherwise.  Several policies build on the result of
 * drbd_asb_recover_0p().
 */
static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
{
	enum drbd_state_rv change_rv;
	int zero_pri, verdict = -100;

	switch (mdev->net_conf->after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
		/* accept the 0p result only if it sides with the current role */
		zero_pri = drbd_asb_recover_0p(mdev);
		if ((zero_pri == -1 && mdev->state.role == R_SECONDARY) ||
		    (zero_pri == 1 && mdev->state.role == R_PRIMARY))
			verdict = zero_pri;
		break;
	case ASB_VIOLENTLY:
		verdict = drbd_asb_recover_0p(mdev);
		break;
	case ASB_DISCARD_SECONDARY:
		return mdev->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		zero_pri = drbd_asb_recover_0p(mdev);
		if (zero_pri != -1 || mdev->state.role != R_PRIMARY) {
			verdict = zero_pri;
			break;
		}

		/* the 0p result is -1 while we are primary: try to step down */
		drbd_set_role(mdev, R_SECONDARY, 0);
		/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
		 * we might be here in C_WF_REPORT_PARAMS which is transient.
		 * we do not need to wait for the after state change work either. */
		change_rv = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
		if (change_rv == SS_SUCCESS) {
			dev_warn(DEV, "Successfully gave up primary role.\n");
			verdict = zero_pri;
		} else {
			drbd_khelper(mdev, "pri-lost-after-sb");
		}
		break;
	}

	return verdict;
}
2245
2246
/*
 * drbd_asb_recover_2p() - evaluate the after_sb_2p policy
 *
 * Returns 1 or -1 when the configured policy reaches a decision, -100
 * otherwise.
 */
static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
{
	enum drbd_state_rv change_rv;
	int zero_pri, verdict = -100;

	switch (mdev->net_conf->after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_VIOLENTLY:
		verdict = drbd_asb_recover_0p(mdev);
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CALL_HELPER:
		zero_pri = drbd_asb_recover_0p(mdev);
		if (zero_pri != -1) {
			verdict = zero_pri;
			break;
		}

		/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
		 * we might be here in C_WF_REPORT_PARAMS which is transient.
		 * we do not need to wait for the after state change work either. */
		change_rv = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
		if (change_rv == SS_SUCCESS) {
			dev_warn(DEV, "Successfully gave up primary role.\n");
			verdict = zero_pri;
		} else {
			drbd_khelper(mdev, "pri-lost-after-sb");
		}
		break;
	}

	return verdict;
}
2286
2287
/*
 * drbd_uuid_dump() - log one set of UUIDs
 * @text:  prefix for the log line (the callers pass "self" or "peer")
 * @uuid:  UUID array to print; may be NULL, which is reported instead
 * @bits:  value printed after "bits:"
 * @flags: value printed after "flags:"
 */
static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
			   u64 bits, u64 flags)
{
	if (uuid) {
		dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
			 text,
			 (unsigned long long)uuid[UI_CURRENT],
			 (unsigned long long)uuid[UI_BITMAP],
			 (unsigned long long)uuid[UI_HISTORY_START],
			 (unsigned long long)uuid[UI_HISTORY_END],
			 (unsigned long long)bits,
			 (unsigned long long)flags);
		return;
	}

	dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
}
2303
2304
/*
2305
100 after split brain try auto recover
2306
2 C_SYNC_SOURCE set BitMap
2307
1 C_SYNC_SOURCE use BitMap
2308
0 no Sync
2309
-1 C_SYNC_TARGET use BitMap
2310
-2 C_SYNC_TARGET set BitMap
2311
-100 after split brain, disconnect
2312
-1000 unrelated data
2313
-1091 requires proto 91
2314
-1096 requires proto 96
2315
*/
2316
static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2317
{
2318
u64 self, peer;
2319
int i, j;
2320
2321
self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2322
peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2323
2324
*rule_nr = 10;
2325
if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2326
return 0;
2327
2328
*rule_nr = 20;
2329
if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2330
peer != UUID_JUST_CREATED)
2331
return -2;
2332
2333
*rule_nr = 30;
2334
if (self != UUID_JUST_CREATED &&
2335
(peer == UUID_JUST_CREATED || peer == (u64)0))
2336
return 2;
2337
2338
if (self == peer) {
2339
int rct, dc; /* roles at crash time */
2340
2341
if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2342
2343
if (mdev->agreed_pro_version < 91)
2344
return -1091;
2345
2346
if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2347
(mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2348
dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2349
drbd_uuid_set_bm(mdev, 0UL);
2350
2351
drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2352
mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2353
*rule_nr = 34;
2354
} else {
2355
dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2356
*rule_nr = 36;
2357
}
2358
2359
return 1;
2360
}
2361
2362
if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2363
2364
if (mdev->agreed_pro_version < 91)
2365
return -1091;
2366
2367
if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2368
(mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2369
dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2370
2371
mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2372
mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2373
mdev->p_uuid[UI_BITMAP] = 0UL;
2374
2375
drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2376
*rule_nr = 35;
2377
} else {
2378
dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2379
*rule_nr = 37;
2380
}
2381
2382
return -1;
2383
}
2384
2385
/* Common power [off|failure] */
2386
rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2387
(mdev->p_uuid[UI_FLAGS] & 2);
2388
/* lowest bit is set when we were primary,
2389
* next bit (weight 2) is set when peer was primary */
2390
*rule_nr = 40;
2391
2392
switch (rct) {
2393
case 0: /* !self_pri && !peer_pri */ return 0;
2394
case 1: /* self_pri && !peer_pri */ return 1;
2395
case 2: /* !self_pri && peer_pri */ return -1;
2396
case 3: /* self_pri && peer_pri */
2397
dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2398
return dc ? -1 : 1;
2399
}
2400
}
2401
2402
*rule_nr = 50;
2403
peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2404
if (self == peer)
2405
return -1;
2406
2407
*rule_nr = 51;
2408
peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2409
if (self == peer) {
2410
if (mdev->agreed_pro_version < 96 ?
2411
(mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2412
(mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2413
peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2414
/* The last P_SYNC_UUID did not get though. Undo the last start of
2415
resync as sync source modifications of the peer's UUIDs. */
2416
2417
if (mdev->agreed_pro_version < 91)
2418
return -1091;
2419
2420
mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2421
mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2422
2423
dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2424
drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2425
2426
return -1;
2427
}
2428
}
2429
2430
*rule_nr = 60;
2431
self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2432
for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2433
peer = mdev->p_uuid[i] & ~((u64)1);
2434
if (self == peer)
2435
return -2;
2436
}
2437
2438
*rule_nr = 70;
2439
self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2440
peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2441
if (self == peer)
2442
return 1;
2443
2444
*rule_nr = 71;
2445
self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2446
if (self == peer) {
2447
if (mdev->agreed_pro_version < 96 ?
2448
(mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2449
(mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2450
self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2451
/* The last P_SYNC_UUID did not get though. Undo the last start of
2452
resync as sync source modifications of our UUIDs. */
2453
2454
if (mdev->agreed_pro_version < 91)
2455
return -1091;
2456
2457
_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2458
_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2459
2460
dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2461
drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2462
mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2463
2464
return 1;
2465
}
2466
}
2467
2468
2469
*rule_nr = 80;
2470
peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2471
for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2472
self = mdev->ldev->md.uuid[i] & ~((u64)1);
2473
if (self == peer)
2474
return 2;
2475
}
2476
2477
*rule_nr = 90;
2478
self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2479
peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2480
if (self == peer && self != ((u64)0))
2481
return 100;
2482
2483
*rule_nr = 100;
2484
for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2485
self = mdev->ldev->md.uuid[i] & ~((u64)1);
2486
for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2487
peer = mdev->p_uuid[j] & ~((u64)1);
2488
if (self == peer)
2489
return -100;
2490
}
2491
}
2492
2493
return -1000;
2494
}
2495
2496
/* drbd_sync_handshake() returns the new conn state on success, or
2497
CONN_MASK (-1) on failure.
2498
*/
2499
static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2500
enum drbd_disk_state peer_disk) __must_hold(local)
2501
{
2502
int hg, rule_nr;
2503
enum drbd_conns rv = C_MASK;
2504
enum drbd_disk_state mydisk;
2505
2506
mydisk = mdev->state.disk;
2507
if (mydisk == D_NEGOTIATING)
2508
mydisk = mdev->new_state_tmp.disk;
2509
2510
dev_info(DEV, "drbd_sync_handshake:\n");
2511
drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2512
drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2513
mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2514
2515
hg = drbd_uuid_compare(mdev, &rule_nr);
2516
2517
dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2518
2519
if (hg == -1000) {
2520
dev_alert(DEV, "Unrelated data, aborting!\n");
2521
return C_MASK;
2522
}
2523
if (hg < -1000) {
2524
dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2525
return C_MASK;
2526
}
2527
2528
if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2529
(peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2530
int f = (hg == -100) || abs(hg) == 2;
2531
hg = mydisk > D_INCONSISTENT ? 1 : -1;
2532
if (f)
2533
hg = hg*2;
2534
dev_info(DEV, "Becoming sync %s due to disk states.\n",
2535
hg > 0 ? "source" : "target");
2536
}
2537
2538
if (abs(hg) == 100)
2539
drbd_khelper(mdev, "initial-split-brain");
2540
2541
if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2542
int pcount = (mdev->state.role == R_PRIMARY)
2543
+ (peer_role == R_PRIMARY);
2544
int forced = (hg == -100);
2545
2546
switch (pcount) {
2547
case 0:
2548
hg = drbd_asb_recover_0p(mdev);
2549
break;
2550
case 1:
2551
hg = drbd_asb_recover_1p(mdev);
2552
break;
2553
case 2:
2554
hg = drbd_asb_recover_2p(mdev);
2555
break;
2556
}
2557
if (abs(hg) < 100) {
2558
dev_warn(DEV, "Split-Brain detected, %d primaries, "
2559
"automatically solved. Sync from %s node\n",
2560
pcount, (hg < 0) ? "peer" : "this");
2561
if (forced) {
2562
dev_warn(DEV, "Doing a full sync, since"
2563
" UUIDs where ambiguous.\n");
2564
hg = hg*2;
2565
}
2566
}
2567
}
2568
2569
if (hg == -100) {
2570
if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2571
hg = -1;
2572
if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2573
hg = 1;
2574
2575
if (abs(hg) < 100)
2576
dev_warn(DEV, "Split-Brain detected, manually solved. "
2577
"Sync from %s node\n",
2578
(hg < 0) ? "peer" : "this");
2579
}
2580
2581
if (hg == -100) {
2582
/* FIXME this log message is not correct if we end up here
2583
* after an attempted attach on a diskless node.
2584
* We just refuse to attach -- well, we drop the "connection"
2585
* to that disk, in a way... */
2586
dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2587
drbd_khelper(mdev, "split-brain");
2588
return C_MASK;
2589
}
2590
2591
if (hg > 0 && mydisk <= D_INCONSISTENT) {
2592
dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2593
return C_MASK;
2594
}
2595
2596
if (hg < 0 && /* by intention we do not use mydisk here. */
2597
mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2598
switch (mdev->net_conf->rr_conflict) {
2599
case ASB_CALL_HELPER:
2600
drbd_khelper(mdev, "pri-lost");
2601
/* fall through */
2602
case ASB_DISCONNECT:
2603
dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2604
return C_MASK;
2605
case ASB_VIOLENTLY:
2606
dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2607
"assumption\n");
2608
}
2609
}
2610
2611
if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2612
if (hg == 0)
2613
dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2614
else
2615
dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2616
drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2617
abs(hg) >= 2 ? "full" : "bit-map based");
2618
return C_MASK;
2619
}
2620
2621
if (abs(hg) >= 2) {
2622
dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2623
if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2624
BM_LOCKED_SET_ALLOWED))
2625
return C_MASK;
2626
}
2627
2628
if (hg > 0) { /* become sync source. */
2629
rv = C_WF_BITMAP_S;
2630
} else if (hg < 0) { /* become sync target */
2631
rv = C_WF_BITMAP_T;
2632
} else {
2633
rv = C_CONNECTED;
2634
if (drbd_bm_total_weight(mdev)) {
2635
dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2636
drbd_bm_total_weight(mdev));
2637
}
2638
}
2639
2640
return rv;
2641
}
2642
2643
/* returns 1 if invalid */
2644
static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2645
{
2646
/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2647
if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2648
(self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2649
return 0;
2650
2651
/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2652
if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2653
self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2654
return 1;
2655
2656
/* everything else is valid if they are equal on both sides. */
2657
if (peer == self)
2658
return 0;
2659
2660
/* everything es is invalid. */
2661
return 1;
2662
}
2663
2664
static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2665
{
2666
struct p_protocol *p = &mdev->data.rbuf.protocol;
2667
int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2668
int p_want_lose, p_two_primaries, cf;
2669
char p_integrity_alg[SHARED_SECRET_MAX] = "";
2670
2671
p_proto = be32_to_cpu(p->protocol);
2672
p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2673
p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2674
p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2675
p_two_primaries = be32_to_cpu(p->two_primaries);
2676
cf = be32_to_cpu(p->conn_flags);
2677
p_want_lose = cf & CF_WANT_LOSE;
2678
2679
clear_bit(CONN_DRY_RUN, &mdev->flags);
2680
2681
if (cf & CF_DRY_RUN)
2682
set_bit(CONN_DRY_RUN, &mdev->flags);
2683
2684
if (p_proto != mdev->net_conf->wire_protocol) {
2685
dev_err(DEV, "incompatible communication protocols\n");
2686
goto disconnect;
2687
}
2688
2689
if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2690
dev_err(DEV, "incompatible after-sb-0pri settings\n");
2691
goto disconnect;
2692
}
2693
2694
if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2695
dev_err(DEV, "incompatible after-sb-1pri settings\n");
2696
goto disconnect;
2697
}
2698
2699
if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2700
dev_err(DEV, "incompatible after-sb-2pri settings\n");
2701
goto disconnect;
2702
}
2703
2704
if (p_want_lose && mdev->net_conf->want_lose) {
2705
dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2706
goto disconnect;
2707
}
2708
2709
if (p_two_primaries != mdev->net_conf->two_primaries) {
2710
dev_err(DEV, "incompatible setting of the two-primaries options\n");
2711
goto disconnect;
2712
}
2713
2714
if (mdev->agreed_pro_version >= 87) {
2715
unsigned char *my_alg = mdev->net_conf->integrity_alg;
2716
2717
if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2718
return false;
2719
2720
p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2721
if (strcmp(p_integrity_alg, my_alg)) {
2722
dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2723
goto disconnect;
2724
}
2725
dev_info(DEV, "data-integrity-alg: %s\n",
2726
my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2727
}
2728
2729
return true;
2730
2731
disconnect:
2732
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2733
return false;
2734
}
2735
2736
/* helper function
2737
* input: alg name, feature name
2738
* return: NULL (alg name was "")
2739
* ERR_PTR(error) if something goes wrong
2740
* or the crypto hash ptr, if it worked out ok. */
2741
struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2742
const char *alg, const char *name)
2743
{
2744
struct crypto_hash *tfm;
2745
2746
if (!alg[0])
2747
return NULL;
2748
2749
tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2750
if (IS_ERR(tfm)) {
2751
dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2752
alg, name, PTR_ERR(tfm));
2753
return tfm;
2754
}
2755
if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2756
crypto_free_hash(tfm);
2757
dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2758
return ERR_PTR(-EINVAL);
2759
}
2760
return tfm;
2761
}
2762
2763
static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2764
{
2765
int ok = true;
2766
struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2767
unsigned int header_size, data_size, exp_max_sz;
2768
struct crypto_hash *verify_tfm = NULL;
2769
struct crypto_hash *csums_tfm = NULL;
2770
const int apv = mdev->agreed_pro_version;
2771
int *rs_plan_s = NULL;
2772
int fifo_size = 0;
2773
2774
exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2775
: apv == 88 ? sizeof(struct p_rs_param)
2776
+ SHARED_SECRET_MAX
2777
: apv <= 94 ? sizeof(struct p_rs_param_89)
2778
: /* apv >= 95 */ sizeof(struct p_rs_param_95);
2779
2780
if (packet_size > exp_max_sz) {
2781
dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2782
packet_size, exp_max_sz);
2783
return false;
2784
}
2785
2786
if (apv <= 88) {
2787
header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2788
data_size = packet_size - header_size;
2789
} else if (apv <= 94) {
2790
header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2791
data_size = packet_size - header_size;
2792
D_ASSERT(data_size == 0);
2793
} else {
2794
header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2795
data_size = packet_size - header_size;
2796
D_ASSERT(data_size == 0);
2797
}
2798
2799
/* initialize verify_alg and csums_alg */
2800
memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2801
2802
if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2803
return false;
2804
2805
mdev->sync_conf.rate = be32_to_cpu(p->rate);
2806
2807
if (apv >= 88) {
2808
if (apv == 88) {
2809
if (data_size > SHARED_SECRET_MAX) {
2810
dev_err(DEV, "verify-alg too long, "
2811
"peer wants %u, accepting only %u byte\n",
2812
data_size, SHARED_SECRET_MAX);
2813
return false;
2814
}
2815
2816
if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2817
return false;
2818
2819
/* we expect NUL terminated string */
2820
/* but just in case someone tries to be evil */
2821
D_ASSERT(p->verify_alg[data_size-1] == 0);
2822
p->verify_alg[data_size-1] = 0;
2823
2824
} else /* apv >= 89 */ {
2825
/* we still expect NUL terminated strings */
2826
/* but just in case someone tries to be evil */
2827
D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2828
D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2829
p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2830
p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2831
}
2832
2833
if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2834
if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2835
dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2836
mdev->sync_conf.verify_alg, p->verify_alg);
2837
goto disconnect;
2838
}
2839
verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2840
p->verify_alg, "verify-alg");
2841
if (IS_ERR(verify_tfm)) {
2842
verify_tfm = NULL;
2843
goto disconnect;
2844
}
2845
}
2846
2847
if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2848
if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2849
dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2850
mdev->sync_conf.csums_alg, p->csums_alg);
2851
goto disconnect;
2852
}
2853
csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2854
p->csums_alg, "csums-alg");
2855
if (IS_ERR(csums_tfm)) {
2856
csums_tfm = NULL;
2857
goto disconnect;
2858
}
2859
}
2860
2861
if (apv > 94) {
2862
mdev->sync_conf.rate = be32_to_cpu(p->rate);
2863
mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2864
mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2865
mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2866
mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2867
2868
fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2869
if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2870
rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2871
if (!rs_plan_s) {
2872
dev_err(DEV, "kmalloc of fifo_buffer failed");
2873
goto disconnect;
2874
}
2875
}
2876
}
2877
2878
spin_lock(&mdev->peer_seq_lock);
2879
/* lock against drbd_nl_syncer_conf() */
2880
if (verify_tfm) {
2881
strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2882
mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2883
crypto_free_hash(mdev->verify_tfm);
2884
mdev->verify_tfm = verify_tfm;
2885
dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2886
}
2887
if (csums_tfm) {
2888
strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2889
mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2890
crypto_free_hash(mdev->csums_tfm);
2891
mdev->csums_tfm = csums_tfm;
2892
dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2893
}
2894
if (fifo_size != mdev->rs_plan_s.size) {
2895
kfree(mdev->rs_plan_s.values);
2896
mdev->rs_plan_s.values = rs_plan_s;
2897
mdev->rs_plan_s.size = fifo_size;
2898
mdev->rs_planed = 0;
2899
}
2900
spin_unlock(&mdev->peer_seq_lock);
2901
}
2902
2903
return ok;
2904
disconnect:
2905
/* just for completeness: actually not needed,
2906
* as this is not reached if csums_tfm was ok. */
2907
crypto_free_hash(csums_tfm);
2908
/* but free the verify_tfm again, if csums_tfm did not work out */
2909
crypto_free_hash(verify_tfm);
2910
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2911
return false;
2912
}
2913
2914
/* warn if the arguments differ by more than 12.5% */
2915
static void warn_if_differ_considerably(struct drbd_conf *mdev,
2916
const char *s, sector_t a, sector_t b)
2917
{
2918
sector_t d;
2919
if (a == 0 || b == 0)
2920
return;
2921
d = (a > b) ? (a - b) : (b - a);
2922
if (d > (a>>3) || d > (b>>3))
2923
dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2924
(unsigned long long)a, (unsigned long long)b);
2925
}
2926
2927
static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2928
{
2929
struct p_sizes *p = &mdev->data.rbuf.sizes;
2930
enum determine_dev_size dd = unchanged;
2931
sector_t p_size, p_usize, my_usize;
2932
int ldsc = 0; /* local disk size changed */
2933
enum dds_flags ddsf;
2934
2935
p_size = be64_to_cpu(p->d_size);
2936
p_usize = be64_to_cpu(p->u_size);
2937
2938
if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2939
dev_err(DEV, "some backing storage is needed\n");
2940
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2941
return false;
2942
}
2943
2944
/* just store the peer's disk size for now.
2945
* we still need to figure out whether we accept that. */
2946
mdev->p_size = p_size;
2947
2948
if (get_ldev(mdev)) {
2949
warn_if_differ_considerably(mdev, "lower level device sizes",
2950
p_size, drbd_get_max_capacity(mdev->ldev));
2951
warn_if_differ_considerably(mdev, "user requested size",
2952
p_usize, mdev->ldev->dc.disk_size);
2953
2954
/* if this is the first connect, or an otherwise expected
2955
* param exchange, choose the minimum */
2956
if (mdev->state.conn == C_WF_REPORT_PARAMS)
2957
p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2958
p_usize);
2959
2960
my_usize = mdev->ldev->dc.disk_size;
2961
2962
if (mdev->ldev->dc.disk_size != p_usize) {
2963
mdev->ldev->dc.disk_size = p_usize;
2964
dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2965
(unsigned long)mdev->ldev->dc.disk_size);
2966
}
2967
2968
/* Never shrink a device with usable data during connect.
2969
But allow online shrinking if we are connected. */
2970
if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2971
drbd_get_capacity(mdev->this_bdev) &&
2972
mdev->state.disk >= D_OUTDATED &&
2973
mdev->state.conn < C_CONNECTED) {
2974
dev_err(DEV, "The peer's disk size is too small!\n");
2975
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2976
mdev->ldev->dc.disk_size = my_usize;
2977
put_ldev(mdev);
2978
return false;
2979
}
2980
put_ldev(mdev);
2981
}
2982
2983
ddsf = be16_to_cpu(p->dds_flags);
2984
if (get_ldev(mdev)) {
2985
dd = drbd_determine_dev_size(mdev, ddsf);
2986
put_ldev(mdev);
2987
if (dd == dev_size_error)
2988
return false;
2989
drbd_md_sync(mdev);
2990
} else {
2991
/* I am diskless, need to accept the peer's size. */
2992
drbd_set_my_capacity(mdev, p_size);
2993
}
2994
2995
mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
2996
drbd_reconsider_max_bio_size(mdev);
2997
2998
if (get_ldev(mdev)) {
2999
if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3000
mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3001
ldsc = 1;
3002
}
3003
3004
put_ldev(mdev);
3005
}
3006
3007
if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3008
if (be64_to_cpu(p->c_size) !=
3009
drbd_get_capacity(mdev->this_bdev) || ldsc) {
3010
/* we have different sizes, probably peer
3011
* needs to know my new size... */
3012
drbd_send_sizes(mdev, 0, ddsf);
3013
}
3014
if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3015
(dd == grew && mdev->state.conn == C_CONNECTED)) {
3016
if (mdev->state.pdsk >= D_INCONSISTENT &&
3017
mdev->state.disk >= D_INCONSISTENT) {
3018
if (ddsf & DDSF_NO_RESYNC)
3019
dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3020
else
3021
resync_after_online_grow(mdev);
3022
} else
3023
set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3024
}
3025
}
3026
3027
return true;
3028
}
3029
3030
static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3031
{
3032
struct p_uuids *p = &mdev->data.rbuf.uuids;
3033
u64 *p_uuid;
3034
int i, updated_uuids = 0;
3035
3036
p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3037
3038
for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3039
p_uuid[i] = be64_to_cpu(p->uuid[i]);
3040
3041
kfree(mdev->p_uuid);
3042
mdev->p_uuid = p_uuid;
3043
3044
if (mdev->state.conn < C_CONNECTED &&
3045
mdev->state.disk < D_INCONSISTENT &&
3046
mdev->state.role == R_PRIMARY &&
3047
(mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3048
dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3049
(unsigned long long)mdev->ed_uuid);
3050
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3051
return false;
3052
}
3053
3054
if (get_ldev(mdev)) {
3055
int skip_initial_sync =
3056
mdev->state.conn == C_CONNECTED &&
3057
mdev->agreed_pro_version >= 90 &&
3058
mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3059
(p_uuid[UI_FLAGS] & 8);
3060
if (skip_initial_sync) {
3061
dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3062
drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3063
"clear_n_write from receive_uuids",
3064
BM_LOCKED_TEST_ALLOWED);
3065
_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3066
_drbd_uuid_set(mdev, UI_BITMAP, 0);
3067
_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3068
CS_VERBOSE, NULL);
3069
drbd_md_sync(mdev);
3070
updated_uuids = 1;
3071
}
3072
put_ldev(mdev);
3073
} else if (mdev->state.disk < D_INCONSISTENT &&
3074
mdev->state.role == R_PRIMARY) {
3075
/* I am a diskless primary, the peer just created a new current UUID
3076
for me. */
3077
updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3078
}
3079
3080
/* Before we test for the disk state, we should wait until an eventually
3081
ongoing cluster wide state change is finished. That is important if
3082
we are primary and are detaching from our disk. We need to see the
3083
new disk state... */
3084
wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3085
if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3086
updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3087
3088
if (updated_uuids)
3089
drbd_print_uuids(mdev, "receiver updated UUIDs to");
3090
3091
return true;
3092
}
3093
3094
/**
3095
* convert_state() - Converts the peer's view of the cluster state to our point of view
3096
* @ps: The state as seen by the peer.
3097
*/
3098
static union drbd_state convert_state(union drbd_state ps)
3099
{
3100
union drbd_state ms;
3101
3102
static enum drbd_conns c_tab[] = {
3103
[C_CONNECTED] = C_CONNECTED,
3104
3105
[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3106
[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3107
[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3108
[C_VERIFY_S] = C_VERIFY_T,
3109
[C_MASK] = C_MASK,
3110
};
3111
3112
ms.i = ps.i;
3113
3114
ms.conn = c_tab[ps.conn];
3115
ms.peer = ps.role;
3116
ms.role = ps.peer;
3117
ms.pdsk = ps.disk;
3118
ms.disk = ps.pdsk;
3119
ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3120
3121
return ms;
3122
}
3123
3124
static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3125
{
3126
struct p_req_state *p = &mdev->data.rbuf.req_state;
3127
union drbd_state mask, val;
3128
enum drbd_state_rv rv;
3129
3130
mask.i = be32_to_cpu(p->mask);
3131
val.i = be32_to_cpu(p->val);
3132
3133
if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3134
test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3135
drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3136
return true;
3137
}
3138
3139
mask = convert_state(mask);
3140
val = convert_state(val);
3141
3142
rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3143
3144
drbd_send_sr_reply(mdev, rv);
3145
drbd_md_sync(mdev);
3146
3147
return true;
3148
}
3149
3150
static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3151
{
3152
struct p_state *p = &mdev->data.rbuf.state;
3153
union drbd_state os, ns, peer_state;
3154
enum drbd_disk_state real_peer_disk;
3155
enum chg_state_flags cs_flags;
3156
int rv;
3157
3158
peer_state.i = be32_to_cpu(p->state);
3159
3160
real_peer_disk = peer_state.disk;
3161
if (peer_state.disk == D_NEGOTIATING) {
3162
real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3163
dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3164
}
3165
3166
spin_lock_irq(&mdev->req_lock);
3167
retry:
3168
os = ns = mdev->state;
3169
spin_unlock_irq(&mdev->req_lock);
3170
3171
/* peer says his disk is uptodate, while we think it is inconsistent,
3172
* and this happens while we think we have a sync going on. */
3173
if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3174
os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3175
/* If we are (becoming) SyncSource, but peer is still in sync
3176
* preparation, ignore its uptodate-ness to avoid flapping, it
3177
* will change to inconsistent once the peer reaches active
3178
* syncing states.
3179
* It may have changed syncer-paused flags, however, so we
3180
* cannot ignore this completely. */
3181
if (peer_state.conn > C_CONNECTED &&
3182
peer_state.conn < C_SYNC_SOURCE)
3183
real_peer_disk = D_INCONSISTENT;
3184
3185
/* if peer_state changes to connected at the same time,
3186
* it explicitly notifies us that it finished resync.
3187
* Maybe we should finish it up, too? */
3188
else if (os.conn >= C_SYNC_SOURCE &&
3189
peer_state.conn == C_CONNECTED) {
3190
if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3191
drbd_resync_finished(mdev);
3192
return true;
3193
}
3194
}
3195
3196
/* peer says his disk is inconsistent, while we think it is uptodate,
3197
* and this happens while the peer still thinks we have a sync going on,
3198
* but we think we are already done with the sync.
3199
* We ignore this to avoid flapping pdsk.
3200
* This should not happen, if the peer is a recent version of drbd. */
3201
if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3202
os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3203
real_peer_disk = D_UP_TO_DATE;
3204
3205
if (ns.conn == C_WF_REPORT_PARAMS)
3206
ns.conn = C_CONNECTED;
3207
3208
if (peer_state.conn == C_AHEAD)
3209
ns.conn = C_BEHIND;
3210
3211
if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3212
get_ldev_if_state(mdev, D_NEGOTIATING)) {
3213
int cr; /* consider resync */
3214
3215
/* if we established a new connection */
3216
cr = (os.conn < C_CONNECTED);
3217
/* if we had an established connection
3218
* and one of the nodes newly attaches a disk */
3219
cr |= (os.conn == C_CONNECTED &&
3220
(peer_state.disk == D_NEGOTIATING ||
3221
os.disk == D_NEGOTIATING));
3222
/* if we have both been inconsistent, and the peer has been
3223
* forced to be UpToDate with --overwrite-data */
3224
cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3225
/* if we had been plain connected, and the admin requested to
3226
* start a sync by "invalidate" or "invalidate-remote" */
3227
cr |= (os.conn == C_CONNECTED &&
3228
(peer_state.conn >= C_STARTING_SYNC_S &&
3229
peer_state.conn <= C_WF_BITMAP_T));
3230
3231
if (cr)
3232
ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3233
3234
put_ldev(mdev);
3235
if (ns.conn == C_MASK) {
3236
ns.conn = C_CONNECTED;
3237
if (mdev->state.disk == D_NEGOTIATING) {
3238
drbd_force_state(mdev, NS(disk, D_FAILED));
3239
} else if (peer_state.disk == D_NEGOTIATING) {
3240
dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3241
peer_state.disk = D_DISKLESS;
3242
real_peer_disk = D_DISKLESS;
3243
} else {
3244
if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3245
return false;
3246
D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3247
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3248
return false;
3249
}
3250
}
3251
}
3252
3253
spin_lock_irq(&mdev->req_lock);
3254
if (mdev->state.i != os.i)
3255
goto retry;
3256
clear_bit(CONSIDER_RESYNC, &mdev->flags);
3257
ns.peer = peer_state.role;
3258
ns.pdsk = real_peer_disk;
3259
ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3260
if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3261
ns.disk = mdev->new_state_tmp.disk;
3262
cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3263
if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3264
test_bit(NEW_CUR_UUID, &mdev->flags)) {
3265
/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3266
for temporal network outages! */
3267
spin_unlock_irq(&mdev->req_lock);
3268
dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3269
tl_clear(mdev);
3270
drbd_uuid_new_current(mdev);
3271
clear_bit(NEW_CUR_UUID, &mdev->flags);
3272
drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3273
return false;
3274
}
3275
rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3276
ns = mdev->state;
3277
spin_unlock_irq(&mdev->req_lock);
3278
3279
if (rv < SS_SUCCESS) {
3280
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3281
return false;
3282
}
3283
3284
if (os.conn > C_WF_REPORT_PARAMS) {
3285
if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3286
peer_state.disk != D_NEGOTIATING ) {
3287
/* we want resync, peer has not yet decided to sync... */
3288
/* Nowadays only used when forcing a node into primary role and
3289
setting its disk to UpToDate with that */
3290
drbd_send_uuids(mdev);
3291
drbd_send_state(mdev);
3292
}
3293
}
3294
3295
mdev->net_conf->want_lose = 0;
3296
3297
drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3298
3299
return true;
3300
}
3301
3302
static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3303
{
3304
struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3305
3306
wait_event(mdev->misc_wait,
3307
mdev->state.conn == C_WF_SYNC_UUID ||
3308
mdev->state.conn == C_BEHIND ||
3309
mdev->state.conn < C_CONNECTED ||
3310
mdev->state.disk < D_NEGOTIATING);
3311
3312
/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3313
3314
/* Here the _drbd_uuid_ functions are right, current should
3315
_not_ be rotated into the history */
3316
if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3317
_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3318
_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3319
3320
drbd_print_uuids(mdev, "updated sync uuid");
3321
drbd_start_resync(mdev, C_SYNC_TARGET);
3322
3323
put_ldev(mdev);
3324
} else
3325
dev_err(DEV, "Ignoring SyncUUID packet!\n");
3326
3327
return true;
3328
}
3329
3330
/**
3331
* receive_bitmap_plain
3332
*
3333
* Return 0 when done, 1 when another iteration is needed, and a negative error
3334
* code upon failure.
3335
*/
3336
static int
3337
receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3338
unsigned long *buffer, struct bm_xfer_ctx *c)
3339
{
3340
unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3341
unsigned want = num_words * sizeof(long);
3342
int err;
3343
3344
if (want != data_size) {
3345
dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3346
return -EIO;
3347
}
3348
if (want == 0)
3349
return 0;
3350
err = drbd_recv(mdev, buffer, want);
3351
if (err != want) {
3352
if (err >= 0)
3353
err = -EIO;
3354
return err;
3355
}
3356
3357
drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3358
3359
c->word_offset += num_words;
3360
c->bit_offset = c->word_offset * BITS_PER_LONG;
3361
if (c->bit_offset > c->bm_bits)
3362
c->bit_offset = c->bm_bits;
3363
3364
return 1;
3365
}
3366
3367
/**
3368
* recv_bm_rle_bits
3369
*
3370
* Return 0 when done, 1 when another iteration is needed, and a negative error
3371
* code upon failure.
3372
*/
3373
static int
3374
recv_bm_rle_bits(struct drbd_conf *mdev,
3375
struct p_compressed_bm *p,
3376
struct bm_xfer_ctx *c)
3377
{
3378
struct bitstream bs;
3379
u64 look_ahead;
3380
u64 rl;
3381
u64 tmp;
3382
unsigned long s = c->bit_offset;
3383
unsigned long e;
3384
int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3385
int toggle = DCBP_get_start(p);
3386
int have;
3387
int bits;
3388
3389
bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3390
3391
bits = bitstream_get_bits(&bs, &look_ahead, 64);
3392
if (bits < 0)
3393
return -EIO;
3394
3395
for (have = bits; have > 0; s += rl, toggle = !toggle) {
3396
bits = vli_decode_bits(&rl, look_ahead);
3397
if (bits <= 0)
3398
return -EIO;
3399
3400
if (toggle) {
3401
e = s + rl -1;
3402
if (e >= c->bm_bits) {
3403
dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3404
return -EIO;
3405
}
3406
_drbd_bm_set_bits(mdev, s, e);
3407
}
3408
3409
if (have < bits) {
3410
dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3411
have, bits, look_ahead,
3412
(unsigned int)(bs.cur.b - p->code),
3413
(unsigned int)bs.buf_len);
3414
return -EIO;
3415
}
3416
look_ahead >>= bits;
3417
have -= bits;
3418
3419
bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3420
if (bits < 0)
3421
return -EIO;
3422
look_ahead |= tmp << have;
3423
have += bits;
3424
}
3425
3426
c->bit_offset = s;
3427
bm_xfer_ctx_bit_to_word_offset(c);
3428
3429
return (s != c->bm_bits);
3430
}
3431
3432
/**
3433
* decode_bitmap_c
3434
*
3435
* Return 0 when done, 1 when another iteration is needed, and a negative error
3436
* code upon failure.
3437
*/
3438
static int
3439
decode_bitmap_c(struct drbd_conf *mdev,
3440
struct p_compressed_bm *p,
3441
struct bm_xfer_ctx *c)
3442
{
3443
if (DCBP_get_code(p) == RLE_VLI_Bits)
3444
return recv_bm_rle_bits(mdev, p, c);
3445
3446
/* other variants had been implemented for evaluation,
3447
* but have been dropped as this one turned out to be "best"
3448
* during all our tests. */
3449
3450
dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3451
drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3452
return -EIO;
3453
}
3454
3455
void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3456
const char *direction, struct bm_xfer_ctx *c)
3457
{
3458
/* what would it take to transfer it "plaintext" */
3459
unsigned plain = sizeof(struct p_header80) *
3460
((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3461
+ c->bm_words * sizeof(long);
3462
unsigned total = c->bytes[0] + c->bytes[1];
3463
unsigned r;
3464
3465
/* total can not be zero. but just in case: */
3466
if (total == 0)
3467
return;
3468
3469
/* don't report if not compressed */
3470
if (total >= plain)
3471
return;
3472
3473
/* total < plain. check for overflow, still */
3474
r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3475
: (1000 * total / plain);
3476
3477
if (r > 1000)
3478
r = 1000;
3479
3480
r = 1000 - r;
3481
dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3482
"total %u; compression: %u.%u%%\n",
3483
direction,
3484
c->bytes[1], c->packets[1],
3485
c->bytes[0], c->packets[0],
3486
total, r/10, r % 10);
3487
}
3488
3489
/* Since we are processing the bitfield from lower addresses to higher,
3490
it does not matter if the process it in 32 bit chunks or 64 bit
3491
chunks as long as it is little endian. (Understand it as byte stream,
3492
beginning with the lowest byte...) If we would use big endian
3493
we would need to process it from the highest address to the lowest,
3494
in order to be agnostic to the 32 vs 64 bits issue.
3495
3496
returns 0 on failure, 1 if we successfully received it. */
3497
static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3498
{
3499
struct bm_xfer_ctx c;
3500
void *buffer;
3501
int err;
3502
int ok = false;
3503
struct p_header80 *h = &mdev->data.rbuf.header.h80;
3504
3505
drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3506
/* you are supposed to send additional out-of-sync information
3507
* if you actually set bits during this phase */
3508
3509
/* maybe we should use some per thread scratch page,
3510
* and allocate that during initial device creation? */
3511
buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3512
if (!buffer) {
3513
dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3514
goto out;
3515
}
3516
3517
c = (struct bm_xfer_ctx) {
3518
.bm_bits = drbd_bm_bits(mdev),
3519
.bm_words = drbd_bm_words(mdev),
3520
};
3521
3522
for(;;) {
3523
if (cmd == P_BITMAP) {
3524
err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3525
} else if (cmd == P_COMPRESSED_BITMAP) {
3526
/* MAYBE: sanity check that we speak proto >= 90,
3527
* and the feature is enabled! */
3528
struct p_compressed_bm *p;
3529
3530
if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3531
dev_err(DEV, "ReportCBitmap packet too large\n");
3532
goto out;
3533
}
3534
/* use the page buff */
3535
p = buffer;
3536
memcpy(p, h, sizeof(*h));
3537
if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3538
goto out;
3539
if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3540
dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3541
goto out;
3542
}
3543
err = decode_bitmap_c(mdev, p, &c);
3544
} else {
3545
dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3546
goto out;
3547
}
3548
3549
c.packets[cmd == P_BITMAP]++;
3550
c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3551
3552
if (err <= 0) {
3553
if (err < 0)
3554
goto out;
3555
break;
3556
}
3557
if (!drbd_recv_header(mdev, &cmd, &data_size))
3558
goto out;
3559
}
3560
3561
INFO_bm_xfer_stats(mdev, "receive", &c);
3562
3563
if (mdev->state.conn == C_WF_BITMAP_T) {
3564
enum drbd_state_rv rv;
3565
3566
ok = !drbd_send_bitmap(mdev);
3567
if (!ok)
3568
goto out;
3569
/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3570
rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3571
D_ASSERT(rv == SS_SUCCESS);
3572
} else if (mdev->state.conn != C_WF_BITMAP_S) {
3573
/* admin may have requested C_DISCONNECTING,
3574
* other threads may have noticed network errors */
3575
dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3576
drbd_conn_str(mdev->state.conn));
3577
}
3578
3579
ok = true;
3580
out:
3581
drbd_bm_unlock(mdev);
3582
if (ok && mdev->state.conn == C_WF_BITMAP_S)
3583
drbd_start_resync(mdev, C_SYNC_SOURCE);
3584
free_page((unsigned long) buffer);
3585
return ok;
3586
}
3587
3588
static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3589
{
3590
/* TODO zero copy sink :) */
3591
static char sink[128];
3592
int size, want, r;
3593
3594
dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3595
cmd, data_size);
3596
3597
size = data_size;
3598
while (size > 0) {
3599
want = min_t(int, size, sizeof(sink));
3600
r = drbd_recv(mdev, sink, want);
3601
ERR_IF(r <= 0) break;
3602
size -= r;
3603
}
3604
return size == 0;
3605
}
3606
3607
static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3608
{
3609
/* Make sure we've acked all the TCP data associated
3610
* with the data requests being unplugged */
3611
drbd_tcp_quickack(mdev->data.socket);
3612
3613
return true;
3614
}
3615
3616
static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3617
{
3618
struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3619
3620
switch (mdev->state.conn) {
3621
case C_WF_SYNC_UUID:
3622
case C_WF_BITMAP_T:
3623
case C_BEHIND:
3624
break;
3625
default:
3626
dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3627
drbd_conn_str(mdev->state.conn));
3628
}
3629
3630
drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3631
3632
return true;
3633
}
3634
3635
typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3636
3637
struct data_cmd {
3638
int expect_payload;
3639
size_t pkt_size;
3640
drbd_cmd_handler_f function;
3641
};
3642
3643
static struct data_cmd drbd_cmd_handler[] = {
3644
[P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3645
[P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3646
[P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3647
[P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3648
[P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3649
[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3650
[P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3651
[P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3652
[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3653
[P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam },
3654
[P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam },
3655
[P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3656
[P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3657
[P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3658
[P_STATE] = { 0, sizeof(struct p_state), receive_state },
3659
[P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3660
[P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3661
[P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3662
[P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3663
[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3664
[P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3665
[P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3666
/* anything missing from this table is in
3667
* the asender_tbl, see get_asender_cmd */
3668
[P_MAX_CMD] = { 0, 0, NULL },
3669
};
3670
3671
/* All handler functions that expect a sub-header get that sub-header in
3672
mdev->data.rbuf.header.head.payload.
3673
3674
Usually in mdev->data.rbuf.header.head the callback can find the usual
3675
p_header, but they may not rely on that. Since there is also p_header95 !
3676
*/
3677
3678
static void drbdd(struct drbd_conf *mdev)
3679
{
3680
union p_header *header = &mdev->data.rbuf.header;
3681
unsigned int packet_size;
3682
enum drbd_packets cmd;
3683
size_t shs; /* sub header size */
3684
int rv;
3685
3686
while (get_t_state(&mdev->receiver) == Running) {
3687
drbd_thread_current_set_cpu(mdev);
3688
if (!drbd_recv_header(mdev, &cmd, &packet_size))
3689
goto err_out;
3690
3691
if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3692
dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3693
goto err_out;
3694
}
3695
3696
shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3697
if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3698
dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3699
goto err_out;
3700
}
3701
3702
if (shs) {
3703
rv = drbd_recv(mdev, &header->h80.payload, shs);
3704
if (unlikely(rv != shs)) {
3705
if (!signal_pending(current))
3706
dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3707
goto err_out;
3708
}
3709
}
3710
3711
rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3712
3713
if (unlikely(!rv)) {
3714
dev_err(DEV, "error receiving %s, l: %d!\n",
3715
cmdname(cmd), packet_size);
3716
goto err_out;
3717
}
3718
}
3719
3720
if (0) {
3721
err_out:
3722
drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3723
}
3724
/* If we leave here, we probably want to update at least the
3725
* "Connected" indicator on stable storage. Do so explicitly here. */
3726
drbd_md_sync(mdev);
3727
}
3728
3729
void drbd_flush_workqueue(struct drbd_conf *mdev)
3730
{
3731
struct drbd_wq_barrier barr;
3732
3733
barr.w.cb = w_prev_work_done;
3734
init_completion(&barr.done);
3735
drbd_queue_work(&mdev->data.work, &barr.w);
3736
wait_for_completion(&barr.done);
3737
}
3738
3739
void drbd_free_tl_hash(struct drbd_conf *mdev)
3740
{
3741
struct hlist_head *h;
3742
3743
spin_lock_irq(&mdev->req_lock);
3744
3745
if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3746
spin_unlock_irq(&mdev->req_lock);
3747
return;
3748
}
3749
/* paranoia code */
3750
for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3751
if (h->first)
3752
dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3753
(int)(h - mdev->ee_hash), h->first);
3754
kfree(mdev->ee_hash);
3755
mdev->ee_hash = NULL;
3756
mdev->ee_hash_s = 0;
3757
3758
/* paranoia code */
3759
for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3760
if (h->first)
3761
dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3762
(int)(h - mdev->tl_hash), h->first);
3763
kfree(mdev->tl_hash);
3764
mdev->tl_hash = NULL;
3765
mdev->tl_hash_s = 0;
3766
spin_unlock_irq(&mdev->req_lock);
3767
}
3768
3769
static void drbd_disconnect(struct drbd_conf *mdev)
3770
{
3771
enum drbd_fencing_p fp;
3772
union drbd_state os, ns;
3773
int rv = SS_UNKNOWN_ERROR;
3774
unsigned int i;
3775
3776
if (mdev->state.conn == C_STANDALONE)
3777
return;
3778
3779
/* asender does not clean up anything. it must not interfere, either */
3780
drbd_thread_stop(&mdev->asender);
3781
drbd_free_sock(mdev);
3782
3783
/* wait for current activity to cease. */
3784
spin_lock_irq(&mdev->req_lock);
3785
_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3786
_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3787
_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3788
spin_unlock_irq(&mdev->req_lock);
3789
3790
/* We do not have data structures that would allow us to
3791
* get the rs_pending_cnt down to 0 again.
3792
* * On C_SYNC_TARGET we do not have any data structures describing
3793
* the pending RSDataRequest's we have sent.
3794
* * On C_SYNC_SOURCE there is no data structure that tracks
3795
* the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3796
* And no, it is not the sum of the reference counts in the
3797
* resync_LRU. The resync_LRU tracks the whole operation including
3798
* the disk-IO, while the rs_pending_cnt only tracks the blocks
3799
* on the fly. */
3800
drbd_rs_cancel_all(mdev);
3801
mdev->rs_total = 0;
3802
mdev->rs_failed = 0;
3803
atomic_set(&mdev->rs_pending_cnt, 0);
3804
wake_up(&mdev->misc_wait);
3805
3806
del_timer(&mdev->request_timer);
3807
3808
/* make sure syncer is stopped and w_resume_next_sg queued */
3809
del_timer_sync(&mdev->resync_timer);
3810
resync_timer_fn((unsigned long)mdev);
3811
3812
/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3813
* w_make_resync_request etc. which may still be on the worker queue
3814
* to be "canceled" */
3815
drbd_flush_workqueue(mdev);
3816
3817
/* This also does reclaim_net_ee(). If we do this too early, we might
3818
* miss some resync ee and pages.*/
3819
drbd_process_done_ee(mdev);
3820
3821
kfree(mdev->p_uuid);
3822
mdev->p_uuid = NULL;
3823
3824
if (!is_susp(mdev->state))
3825
tl_clear(mdev);
3826
3827
dev_info(DEV, "Connection closed\n");
3828
3829
drbd_md_sync(mdev);
3830
3831
fp = FP_DONT_CARE;
3832
if (get_ldev(mdev)) {
3833
fp = mdev->ldev->dc.fencing;
3834
put_ldev(mdev);
3835
}
3836
3837
if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3838
drbd_try_outdate_peer_async(mdev);
3839
3840
spin_lock_irq(&mdev->req_lock);
3841
os = mdev->state;
3842
if (os.conn >= C_UNCONNECTED) {
3843
/* Do not restart in case we are C_DISCONNECTING */
3844
ns = os;
3845
ns.conn = C_UNCONNECTED;
3846
rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3847
}
3848
spin_unlock_irq(&mdev->req_lock);
3849
3850
if (os.conn == C_DISCONNECTING) {
3851
wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3852
3853
crypto_free_hash(mdev->cram_hmac_tfm);
3854
mdev->cram_hmac_tfm = NULL;
3855
3856
kfree(mdev->net_conf);
3857
mdev->net_conf = NULL;
3858
drbd_request_state(mdev, NS(conn, C_STANDALONE));
3859
}
3860
3861
/* serialize with bitmap writeout triggered by the state change,
3862
* if any. */
3863
wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3864
3865
/* tcp_close and release of sendpage pages can be deferred. I don't
3866
* want to use SO_LINGER, because apparently it can be deferred for
3867
* more than 20 seconds (longest time I checked).
3868
*
3869
* Actually we don't care for exactly when the network stack does its
3870
* put_page(), but release our reference on these pages right here.
3871
*/
3872
i = drbd_release_ee(mdev, &mdev->net_ee);
3873
if (i)
3874
dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3875
i = atomic_read(&mdev->pp_in_use_by_net);
3876
if (i)
3877
dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3878
i = atomic_read(&mdev->pp_in_use);
3879
if (i)
3880
dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3881
3882
D_ASSERT(list_empty(&mdev->read_ee));
3883
D_ASSERT(list_empty(&mdev->active_ee));
3884
D_ASSERT(list_empty(&mdev->sync_ee));
3885
D_ASSERT(list_empty(&mdev->done_ee));
3886
3887
/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3888
atomic_set(&mdev->current_epoch->epoch_size, 0);
3889
D_ASSERT(list_empty(&mdev->current_epoch->list));
3890
}
3891
3892
/*
3893
* We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3894
* we can agree on is stored in agreed_pro_version.
3895
*
3896
* feature flags and the reserved array should be enough room for future
3897
* enhancements of the handshake protocol, and possible plugins...
3898
*
3899
* for now, they are expected to be zero, but ignored.
3900
*/
3901
static int drbd_send_handshake(struct drbd_conf *mdev)
3902
{
3903
/* ASSERT current == mdev->receiver ... */
3904
struct p_handshake *p = &mdev->data.sbuf.handshake;
3905
int ok;
3906
3907
if (mutex_lock_interruptible(&mdev->data.mutex)) {
3908
dev_err(DEV, "interrupted during initial handshake\n");
3909
return 0; /* interrupted. not ok. */
3910
}
3911
3912
if (mdev->data.socket == NULL) {
3913
mutex_unlock(&mdev->data.mutex);
3914
return 0;
3915
}
3916
3917
memset(p, 0, sizeof(*p));
3918
p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3919
p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3920
ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3921
(struct p_header80 *)p, sizeof(*p), 0 );
3922
mutex_unlock(&mdev->data.mutex);
3923
return ok;
3924
}
3925
3926
/*
3927
* return values:
3928
* 1 yes, we have a valid connection
3929
* 0 oops, did not work out, please try again
3930
* -1 peer talks different language,
3931
* no point in trying again, please go standalone.
3932
*/
3933
static int drbd_do_handshake(struct drbd_conf *mdev)
3934
{
3935
/* ASSERT current == mdev->receiver ... */
3936
struct p_handshake *p = &mdev->data.rbuf.handshake;
3937
const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3938
unsigned int length;
3939
enum drbd_packets cmd;
3940
int rv;
3941
3942
rv = drbd_send_handshake(mdev);
3943
if (!rv)
3944
return 0;
3945
3946
rv = drbd_recv_header(mdev, &cmd, &length);
3947
if (!rv)
3948
return 0;
3949
3950
if (cmd != P_HAND_SHAKE) {
3951
dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3952
cmdname(cmd), cmd);
3953
return -1;
3954
}
3955
3956
if (length != expect) {
3957
dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3958
expect, length);
3959
return -1;
3960
}
3961
3962
rv = drbd_recv(mdev, &p->head.payload, expect);
3963
3964
if (rv != expect) {
3965
if (!signal_pending(current))
3966
dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3967
return 0;
3968
}
3969
3970
p->protocol_min = be32_to_cpu(p->protocol_min);
3971
p->protocol_max = be32_to_cpu(p->protocol_max);
3972
if (p->protocol_max == 0)
3973
p->protocol_max = p->protocol_min;
3974
3975
if (PRO_VERSION_MAX < p->protocol_min ||
3976
PRO_VERSION_MIN > p->protocol_max)
3977
goto incompat;
3978
3979
mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3980
3981
dev_info(DEV, "Handshake successful: "
3982
"Agreed network protocol version %d\n", mdev->agreed_pro_version);
3983
3984
return 1;
3985
3986
incompat:
3987
dev_err(DEV, "incompatible DRBD dialects: "
3988
"I support %d-%d, peer supports %d-%d\n",
3989
PRO_VERSION_MIN, PRO_VERSION_MAX,
3990
p->protocol_min, p->protocol_max);
3991
return -1;
3992
}
3993
3994
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3995
static int drbd_do_auth(struct drbd_conf *mdev)
3996
{
3997
dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3998
dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3999
return -1;
4000
}
4001
#else
4002
#define CHALLENGE_LEN 64
4003
4004
/* Return value:
4005
1 - auth succeeded,
4006
0 - failed, try again (network error),
4007
-1 - auth failed, don't try again.
4008
*/
4009
4010
static int drbd_do_auth(struct drbd_conf *mdev)
4011
{
4012
char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4013
struct scatterlist sg;
4014
char *response = NULL;
4015
char *right_response = NULL;
4016
char *peers_ch = NULL;
4017
unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4018
unsigned int resp_size;
4019
struct hash_desc desc;
4020
enum drbd_packets cmd;
4021
unsigned int length;
4022
int rv;
4023
4024
desc.tfm = mdev->cram_hmac_tfm;
4025
desc.flags = 0;
4026
4027
rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4028
(u8 *)mdev->net_conf->shared_secret, key_len);
4029
if (rv) {
4030
dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4031
rv = -1;
4032
goto fail;
4033
}
4034
4035
get_random_bytes(my_challenge, CHALLENGE_LEN);
4036
4037
rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4038
if (!rv)
4039
goto fail;
4040
4041
rv = drbd_recv_header(mdev, &cmd, &length);
4042
if (!rv)
4043
goto fail;
4044
4045
if (cmd != P_AUTH_CHALLENGE) {
4046
dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4047
cmdname(cmd), cmd);
4048
rv = 0;
4049
goto fail;
4050
}
4051
4052
if (length > CHALLENGE_LEN * 2) {
4053
dev_err(DEV, "expected AuthChallenge payload too big.\n");
4054
rv = -1;
4055
goto fail;
4056
}
4057
4058
peers_ch = kmalloc(length, GFP_NOIO);
4059
if (peers_ch == NULL) {
4060
dev_err(DEV, "kmalloc of peers_ch failed\n");
4061
rv = -1;
4062
goto fail;
4063
}
4064
4065
rv = drbd_recv(mdev, peers_ch, length);
4066
4067
if (rv != length) {
4068
if (!signal_pending(current))
4069
dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4070
rv = 0;
4071
goto fail;
4072
}
4073
4074
resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4075
response = kmalloc(resp_size, GFP_NOIO);
4076
if (response == NULL) {
4077
dev_err(DEV, "kmalloc of response failed\n");
4078
rv = -1;
4079
goto fail;
4080
}
4081
4082
sg_init_table(&sg, 1);
4083
sg_set_buf(&sg, peers_ch, length);
4084
4085
rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4086
if (rv) {
4087
dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4088
rv = -1;
4089
goto fail;
4090
}
4091
4092
rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4093
if (!rv)
4094
goto fail;
4095
4096
rv = drbd_recv_header(mdev, &cmd, &length);
4097
if (!rv)
4098
goto fail;
4099
4100
if (cmd != P_AUTH_RESPONSE) {
4101
dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4102
cmdname(cmd), cmd);
4103
rv = 0;
4104
goto fail;
4105
}
4106
4107
if (length != resp_size) {
4108
dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4109
rv = 0;
4110
goto fail;
4111
}
4112
4113
rv = drbd_recv(mdev, response , resp_size);
4114
4115
if (rv != resp_size) {
4116
if (!signal_pending(current))
4117
dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4118
rv = 0;
4119
goto fail;
4120
}
4121
4122
right_response = kmalloc(resp_size, GFP_NOIO);
4123
if (right_response == NULL) {
4124
dev_err(DEV, "kmalloc of right_response failed\n");
4125
rv = -1;
4126
goto fail;
4127
}
4128
4129
sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4130
4131
rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4132
if (rv) {
4133
dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4134
rv = -1;
4135
goto fail;
4136
}
4137
4138
rv = !memcmp(response, right_response, resp_size);
4139
4140
if (rv)
4141
dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4142
resp_size, mdev->net_conf->cram_hmac_alg);
4143
else
4144
rv = -1;
4145
4146
fail:
4147
kfree(peers_ch);
4148
kfree(response);
4149
kfree(right_response);
4150
4151
return rv;
4152
}
4153
#endif
4154
4155
int drbdd_init(struct drbd_thread *thi)
4156
{
4157
struct drbd_conf *mdev = thi->mdev;
4158
unsigned int minor = mdev_to_minor(mdev);
4159
int h;
4160
4161
sprintf(current->comm, "drbd%d_receiver", minor);
4162
4163
dev_info(DEV, "receiver (re)started\n");
4164
4165
do {
4166
h = drbd_connect(mdev);
4167
if (h == 0) {
4168
drbd_disconnect(mdev);
4169
schedule_timeout_interruptible(HZ);
4170
}
4171
if (h == -1) {
4172
dev_warn(DEV, "Discarding network configuration.\n");
4173
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4174
}
4175
} while (h == 0);
4176
4177
if (h > 0) {
4178
if (get_net_conf(mdev)) {
4179
drbdd(mdev);
4180
put_net_conf(mdev);
4181
}
4182
}
4183
4184
drbd_disconnect(mdev);
4185
4186
dev_info(DEV, "receiver terminated\n");
4187
return 0;
4188
}
4189
4190
/* ********* acknowledge sender ******** */
4191
4192
static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4193
{
4194
struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4195
4196
int retcode = be32_to_cpu(p->retcode);
4197
4198
if (retcode >= SS_SUCCESS) {
4199
set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4200
} else {
4201
set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4202
dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4203
drbd_set_st_err_str(retcode), retcode);
4204
}
4205
wake_up(&mdev->state_wait);
4206
4207
return true;
4208
}
4209
4210
/* A ping from the peer is answered with a ping ack; the send result is returned. */
static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
{
	int sent = drbd_send_ping_ack(mdev);

	return sent;
}
4215
4216
static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4217
{
4218
/* restore idle timeout */
4219
mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4220
if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4221
wake_up(&mdev->misc_wait);
4222
4223
return true;
4224
}
4225
4226
static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4227
{
4228
struct p_block_ack *p = (struct p_block_ack *)h;
4229
sector_t sector = be64_to_cpu(p->sector);
4230
int blksize = be32_to_cpu(p->blksize);
4231
4232
D_ASSERT(mdev->agreed_pro_version >= 89);
4233
4234
update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4235
4236
if (get_ldev(mdev)) {
4237
drbd_rs_complete_io(mdev, sector);
4238
drbd_set_in_sync(mdev, sector, blksize);
4239
/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4240
mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4241
put_ldev(mdev);
4242
}
4243
dec_rs_pending(mdev);
4244
atomic_add(blksize >> 9, &mdev->rs_sect_in);
4245
4246
return true;
4247
}
4248
4249
/* when we receive the ACK for a write request,
4250
* verify that we actually know about it */
4251
static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4252
u64 id, sector_t sector)
4253
{
4254
struct hlist_head *slot = tl_hash_slot(mdev, sector);
4255
struct hlist_node *n;
4256
struct drbd_request *req;
4257
4258
hlist_for_each_entry(req, n, slot, collision) {
4259
if ((unsigned long)req == (unsigned long)id) {
4260
if (req->sector != sector) {
4261
dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4262
"wrong sector (%llus versus %llus)\n", req,
4263
(unsigned long long)req->sector,
4264
(unsigned long long)sector);
4265
break;
4266
}
4267
return req;
4268
}
4269
}
4270
return NULL;
4271
}
4272
4273
typedef struct drbd_request *(req_validator_fn)
4274
(struct drbd_conf *mdev, u64 id, sector_t sector);
4275
4276
static int validate_req_change_req_state(struct drbd_conf *mdev,
4277
u64 id, sector_t sector, req_validator_fn validator,
4278
const char *func, enum drbd_req_event what)
4279
{
4280
struct drbd_request *req;
4281
struct bio_and_error m;
4282
4283
spin_lock_irq(&mdev->req_lock);
4284
req = validator(mdev, id, sector);
4285
if (unlikely(!req)) {
4286
spin_unlock_irq(&mdev->req_lock);
4287
4288
dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4289
(void *)(unsigned long)id, (unsigned long long)sector);
4290
return false;
4291
}
4292
__req_mod(req, what, &m);
4293
spin_unlock_irq(&mdev->req_lock);
4294
4295
if (m.bio)
4296
complete_master_bio(mdev, &m);
4297
return true;
4298
}
4299
4300
static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4301
{
4302
struct p_block_ack *p = (struct p_block_ack *)h;
4303
sector_t sector = be64_to_cpu(p->sector);
4304
int blksize = be32_to_cpu(p->blksize);
4305
enum drbd_req_event what;
4306
4307
update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4308
4309
if (is_syncer_block_id(p->block_id)) {
4310
drbd_set_in_sync(mdev, sector, blksize);
4311
dec_rs_pending(mdev);
4312
return true;
4313
}
4314
switch (be16_to_cpu(h->command)) {
4315
case P_RS_WRITE_ACK:
4316
D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4317
what = write_acked_by_peer_and_sis;
4318
break;
4319
case P_WRITE_ACK:
4320
D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4321
what = write_acked_by_peer;
4322
break;
4323
case P_RECV_ACK:
4324
D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4325
what = recv_acked_by_peer;
4326
break;
4327
case P_DISCARD_ACK:
4328
D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4329
what = conflict_discarded_by_peer;
4330
break;
4331
default:
4332
D_ASSERT(0);
4333
return false;
4334
}
4335
4336
return validate_req_change_req_state(mdev, p->block_id, sector,
4337
_ack_id_to_req, __func__ , what);
4338
}
4339
4340
static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4341
{
4342
struct p_block_ack *p = (struct p_block_ack *)h;
4343
sector_t sector = be64_to_cpu(p->sector);
4344
int size = be32_to_cpu(p->blksize);
4345
struct drbd_request *req;
4346
struct bio_and_error m;
4347
4348
update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4349
4350
if (is_syncer_block_id(p->block_id)) {
4351
dec_rs_pending(mdev);
4352
drbd_rs_failed_io(mdev, sector, size);
4353
return true;
4354
}
4355
4356
spin_lock_irq(&mdev->req_lock);
4357
req = _ack_id_to_req(mdev, p->block_id, sector);
4358
if (!req) {
4359
spin_unlock_irq(&mdev->req_lock);
4360
if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4361
mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4362
/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4363
The master bio might already be completed, therefore the
4364
request is no longer in the collision hash.
4365
=> Do not try to validate block_id as request. */
4366
/* In Protocol B we might already have got a P_RECV_ACK
4367
but then get a P_NEG_ACK after wards. */
4368
drbd_set_out_of_sync(mdev, sector, size);
4369
return true;
4370
} else {
4371
dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4372
(void *)(unsigned long)p->block_id, (unsigned long long)sector);
4373
return false;
4374
}
4375
}
4376
__req_mod(req, neg_acked, &m);
4377
spin_unlock_irq(&mdev->req_lock);
4378
4379
if (m.bio)
4380
complete_master_bio(mdev, &m);
4381
return true;
4382
}
4383
4384
static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4385
{
4386
struct p_block_ack *p = (struct p_block_ack *)h;
4387
sector_t sector = be64_to_cpu(p->sector);
4388
4389
update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4390
dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4391
(unsigned long long)sector, be32_to_cpu(p->blksize));
4392
4393
return validate_req_change_req_state(mdev, p->block_id, sector,
4394
_ar_id_to_req, __func__ , neg_acked);
4395
}
4396
4397
static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4398
{
4399
sector_t sector;
4400
int size;
4401
struct p_block_ack *p = (struct p_block_ack *)h;
4402
4403
sector = be64_to_cpu(p->sector);
4404
size = be32_to_cpu(p->blksize);
4405
4406
update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4407
4408
dec_rs_pending(mdev);
4409
4410
if (get_ldev_if_state(mdev, D_FAILED)) {
4411
drbd_rs_complete_io(mdev, sector);
4412
switch (be16_to_cpu(h->command)) {
4413
case P_NEG_RS_DREPLY:
4414
drbd_rs_failed_io(mdev, sector, size);
4415
case P_RS_CANCEL:
4416
break;
4417
default:
4418
D_ASSERT(0);
4419
put_ldev(mdev);
4420
return false;
4421
}
4422
put_ldev(mdev);
4423
}
4424
4425
return true;
4426
}
4427
4428
static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4429
{
4430
struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4431
4432
tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4433
4434
if (mdev->state.conn == C_AHEAD &&
4435
atomic_read(&mdev->ap_in_flight) == 0 &&
4436
!test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4437
mdev->start_resync_timer.expires = jiffies + HZ;
4438
add_timer(&mdev->start_resync_timer);
4439
}
4440
4441
return true;
4442
}
4443
4444
static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4445
{
4446
struct p_block_ack *p = (struct p_block_ack *)h;
4447
struct drbd_work *w;
4448
sector_t sector;
4449
int size;
4450
4451
sector = be64_to_cpu(p->sector);
4452
size = be32_to_cpu(p->blksize);
4453
4454
update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4455
4456
if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4457
drbd_ov_oos_found(mdev, sector, size);
4458
else
4459
ov_oos_print(mdev);
4460
4461
if (!get_ldev(mdev))
4462
return true;
4463
4464
drbd_rs_complete_io(mdev, sector);
4465
dec_rs_pending(mdev);
4466
4467
--mdev->ov_left;
4468
4469
/* let's advance progress step marks only for every other megabyte */
4470
if ((mdev->ov_left & 0x200) == 0x200)
4471
drbd_advance_rs_marks(mdev, mdev->ov_left);
4472
4473
if (mdev->ov_left == 0) {
4474
w = kmalloc(sizeof(*w), GFP_NOIO);
4475
if (w) {
4476
w->cb = w_ov_finished;
4477
drbd_queue_work_front(&mdev->data.work, w);
4478
} else {
4479
dev_err(DEV, "kmalloc(w) failed.");
4480
ov_oos_print(mdev);
4481
drbd_resync_finished(mdev);
4482
}
4483
}
4484
put_ldev(mdev);
4485
return true;
4486
}
4487
4488
static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4489
{
4490
return true;
4491
}
4492
4493
struct asender_cmd {
4494
size_t pkt_size;
4495
int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4496
};
4497
4498
static struct asender_cmd *get_asender_cmd(int cmd)
4499
{
4500
static struct asender_cmd asender_tbl[] = {
4501
/* anything missing from this table is in
4502
* the drbd_cmd_handler (drbd_default_handler) table,
4503
* see the beginning of drbdd() */
4504
[P_PING] = { sizeof(struct p_header80), got_Ping },
4505
[P_PING_ACK] = { sizeof(struct p_header80), got_PingAck },
4506
[P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4507
[P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4508
[P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4509
[P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4510
[P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4511
[P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4512
[P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4513
[P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4514
[P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4515
[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4516
[P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4517
[P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
4518
[P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
4519
[P_MAX_CMD] = { 0, NULL },
4520
};
4521
if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4522
return NULL;
4523
return &asender_tbl[cmd];
4524
}
4525
4526
int drbd_asender(struct drbd_thread *thi)
4527
{
4528
struct drbd_conf *mdev = thi->mdev;
4529
struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4530
struct asender_cmd *cmd = NULL;
4531
4532
int rv, len;
4533
void *buf = h;
4534
int received = 0;
4535
int expect = sizeof(struct p_header80);
4536
int empty;
4537
int ping_timeout_active = 0;
4538
4539
sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4540
4541
current->policy = SCHED_RR; /* Make this a realtime task! */
4542
current->rt_priority = 2; /* more important than all other tasks */
4543
4544
while (get_t_state(thi) == Running) {
4545
drbd_thread_current_set_cpu(mdev);
4546
if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4547
ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4548
mdev->meta.socket->sk->sk_rcvtimeo =
4549
mdev->net_conf->ping_timeo*HZ/10;
4550
ping_timeout_active = 1;
4551
}
4552
4553
/* conditionally cork;
4554
* it may hurt latency if we cork without much to send */
4555
if (!mdev->net_conf->no_cork &&
4556
3 < atomic_read(&mdev->unacked_cnt))
4557
drbd_tcp_cork(mdev->meta.socket);
4558
while (1) {
4559
clear_bit(SIGNAL_ASENDER, &mdev->flags);
4560
flush_signals(current);
4561
if (!drbd_process_done_ee(mdev))
4562
goto reconnect;
4563
/* to avoid race with newly queued ACKs */
4564
set_bit(SIGNAL_ASENDER, &mdev->flags);
4565
spin_lock_irq(&mdev->req_lock);
4566
empty = list_empty(&mdev->done_ee);
4567
spin_unlock_irq(&mdev->req_lock);
4568
/* new ack may have been queued right here,
4569
* but then there is also a signal pending,
4570
* and we start over... */
4571
if (empty)
4572
break;
4573
}
4574
/* but unconditionally uncork unless disabled */
4575
if (!mdev->net_conf->no_cork)
4576
drbd_tcp_uncork(mdev->meta.socket);
4577
4578
/* short circuit, recv_msg would return EINTR anyways. */
4579
if (signal_pending(current))
4580
continue;
4581
4582
rv = drbd_recv_short(mdev, mdev->meta.socket,
4583
buf, expect-received, 0);
4584
clear_bit(SIGNAL_ASENDER, &mdev->flags);
4585
4586
flush_signals(current);
4587
4588
/* Note:
4589
* -EINTR (on meta) we got a signal
4590
* -EAGAIN (on meta) rcvtimeo expired
4591
* -ECONNRESET other side closed the connection
4592
* -ERESTARTSYS (on data) we got a signal
4593
* rv < 0 other than above: unexpected error!
4594
* rv == expected: full header or command
4595
* rv < expected: "woken" by signal during receive
4596
* rv == 0 : "connection shut down by peer"
4597
*/
4598
if (likely(rv > 0)) {
4599
received += rv;
4600
buf += rv;
4601
} else if (rv == 0) {
4602
dev_err(DEV, "meta connection shut down by peer.\n");
4603
goto reconnect;
4604
} else if (rv == -EAGAIN) {
4605
/* If the data socket received something meanwhile,
4606
* that is good enough: peer is still alive. */
4607
if (time_after(mdev->last_received,
4608
jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4609
continue;
4610
if (ping_timeout_active) {
4611
dev_err(DEV, "PingAck did not arrive in time.\n");
4612
goto reconnect;
4613
}
4614
set_bit(SEND_PING, &mdev->flags);
4615
continue;
4616
} else if (rv == -EINTR) {
4617
continue;
4618
} else {
4619
dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4620
goto reconnect;
4621
}
4622
4623
if (received == expect && cmd == NULL) {
4624
if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4625
dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4626
be32_to_cpu(h->magic),
4627
be16_to_cpu(h->command),
4628
be16_to_cpu(h->length));
4629
goto reconnect;
4630
}
4631
cmd = get_asender_cmd(be16_to_cpu(h->command));
4632
len = be16_to_cpu(h->length);
4633
if (unlikely(cmd == NULL)) {
4634
dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4635
be32_to_cpu(h->magic),
4636
be16_to_cpu(h->command),
4637
be16_to_cpu(h->length));
4638
goto disconnect;
4639
}
4640
expect = cmd->pkt_size;
4641
ERR_IF(len != expect-sizeof(struct p_header80))
4642
goto reconnect;
4643
}
4644
if (received == expect) {
4645
mdev->last_received = jiffies;
4646
D_ASSERT(cmd != NULL);
4647
if (!cmd->process(mdev, h))
4648
goto reconnect;
4649
4650
/* the idle_timeout (ping-int)
4651
* has been restored in got_PingAck() */
4652
if (cmd == get_asender_cmd(P_PING_ACK))
4653
ping_timeout_active = 0;
4654
4655
buf = h;
4656
received = 0;
4657
expect = sizeof(struct p_header80);
4658
cmd = NULL;
4659
}
4660
}
4661
4662
if (0) {
4663
reconnect:
4664
drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4665
drbd_md_sync(mdev);
4666
}
4667
if (0) {
4668
disconnect:
4669
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4670
drbd_md_sync(mdev);
4671
}
4672
clear_bit(SIGNAL_ASENDER, &mdev->flags);
4673
4674
D_ASSERT(mdev->state.conn < C_CONNECTED);
4675
dev_info(DEV, "asender terminated\n");
4676
4677
return 0;
4678
}
4679
4680