GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/member.c

// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"

int dlm_slots_version(const struct dlm_header *h)
{
	if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
		return 0;
	return 1;
}
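
/*
 * The low 16 bits of h_version carry the minor protocol version; the
 * check above accepts slot information only from nodes whose minor
 * version is at least DLM_HEADER_SLOTS, the version that introduced
 * slot support.
 */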

void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}

void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_slot *slot;
	struct rcom_slot *ro;
	int i;

	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* ls_slots array is sparse, but not rcom_slots */

	for (i = 0; i < ls->ls_slots_size; i++) {
		slot = &ls->ls_slots[i];
		if (!slot->nodeid)
			continue;
		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
		ro->ro_slot = cpu_to_le16(slot->slot);
		ro++;
	}
}
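
/*
 * Example of the packing above (hypothetical values): a sparse ls_slots
 * array of size 4 holding { slot 1: nodeid 10, slot 2: unused,
 * slot 3: nodeid 12, slot 4: unused } is copied out as two dense
 * rcom_slot entries, (10,1) followed by (12,3).
 */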

#define SLOT_DEBUG_LINE 128

static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
		      struct rcom_slot *ro0, struct dlm_slot *array,
		      int array_size)
{
	char line[SLOT_DEBUG_LINE];
	int len = SLOT_DEBUG_LINE - 1;
	int pos = 0;
	int ret, i;

	memset(line, 0, sizeof(line));

	if (array) {
		for (i = 0; i < array_size; i++) {
			if (!array[i].nodeid)
				continue;

			ret = snprintf(line + pos, len - pos, " %d:%d",
				       array[i].slot, array[i].nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	} else if (ro0) {
		for (i = 0; i < num_slots; i++) {
			ret = snprintf(line + pos, len - pos, " %d:%d",
				       ro0[i].ro_slot, ro0[i].ro_nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	}

	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}

int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	log_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
				continue;
			memb->slot = le16_to_cpu(ro->ro_slot);
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls, "dlm_slots_copy_in our slot "
					  "changed %d %d", ls->ls_slot,
					  memb->slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				  memb->nodeid);
			return -1;
		}
	}

	return 0;
}
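
/*
 * Wire layout assumed by the parsing above: rc_buf holds a struct
 * rcom_config immediately followed by rf_num_slots packed struct
 * rcom_slot entries, with all fields in little-endian byte order.
 */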

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check, once slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	array_size = max + need;
	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_slots(ls, gen, num, NULL, array, array_size);

	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}
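
/*
 * Worked example of the assignment above (hypothetical nodeids): members
 * 10, 20, 30 hold slots 1, 2, 3; node 20 leaves and node 40 joins.  Then
 * max = 3, need = 1, array_size = 4.  The surviving slots are kept
 * (10:1, 30:3), the vacated offset is reused for the new member (40:2),
 * and the generation is bumped, so log_slots() reports
 * "slots 3 1:10 2:40 3:30".
 */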

static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
	struct dlm_member *memb = NULL;
	struct list_head *tmp;
	struct list_head *newlist = &new->list;
	struct list_head *head = &ls->ls_nodes;

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}

	if (!memb)
		list_add_tail(newlist, head);
	else
		list_add_tail(newlist, tmp);
}
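
/*
 * ls_nodes is thus kept sorted by ascending nodeid: a new member is
 * inserted before the first existing member with a larger nodeid, or at
 * the tail if none is larger.
 */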

static int add_remote_member(int nodeid)
{
	int error;

	if (nodeid == dlm_our_nodeid())
		return 0;

	error = dlm_lowcomms_connect_node(nodeid);
	if (error < 0)
		return error;

	dlm_midcomms_add_member(nodeid);
	return 0;
}

static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
	struct dlm_member *memb;
	int error;

	memb = kzalloc(sizeof(*memb), GFP_NOFS);
	if (!memb)
		return -ENOMEM;

	memb->nodeid = node->nodeid;
	memb->weight = node->weight;
	memb->comm_seq = node->comm_seq;

	error = add_remote_member(node->nodeid);
	if (error < 0) {
		kfree(memb);
		return error;
	}

	add_ordered_member(ls, memb);
	ls->ls_num_nodes++;
	return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
	struct dlm_member *memb;

	list_for_each_entry(memb, head, list) {
		if (memb->nodeid == nodeid)
			return memb;
	}
	return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes, nodeid))
		return 1;
	return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
	WARN_ON_ONCE(!nodeid || nodeid == -1);

	if (find_memb(&ls->ls_nodes_gone, nodeid))
		return 1;
	return 0;
}

static void clear_memb_list(struct list_head *head,
			    void (*after_del)(int nodeid))
{
	struct dlm_member *memb;

	while (!list_empty(head)) {
		memb = list_entry(head->next, struct dlm_member, list);
		list_del(&memb->list);
		if (after_del)
			after_del(memb->nodeid);
		kfree(memb);
	}
}

static void remove_remote_member(int nodeid)
{
	if (nodeid == dlm_our_nodeid())
		return;

	dlm_midcomms_remove_member(nodeid);
}

void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes, remove_remote_member);
	ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone, NULL);
}

static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;
	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}
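
/*
 * Example of the weighting above (hypothetical values): nodes 1, 2, 3
 * with weights 2, 0, 1 give total 3 and ls_node_array = { 1, 1, 3 };
 * node 2 contributes no entries.  If every weight is 0, each node gets
 * exactly one entry instead.
 */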

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls, uint64_t seq)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			break;
		}
		error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
		if (error)
			break;
	}
	if (error)
		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
		return;
	ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb,
				  unsigned int release_recover)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq, false);

	if (!release_recover && !error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}
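
/*
 * Consequently, a member removed by dlm_release_lockspace() (comm_seq
 * unchanged and release_recover not set) does not trigger the
 * recover_slot callback; only failed nodes, or removals flagged with
 * release_recover, are reported.
 */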

void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;
	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
 out:
	kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
						int nodeid)
{
	int i;

	for (i = 0; i < rv->nodes_count; i++) {
		if (rv->nodes[i].nodeid == nodeid)
			return &rv->nodes[i];
	}
	return NULL;
}

int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	struct dlm_config_node *node;
	int i, error, neg = 0, low = -1;
	unsigned int release_recover;

	/* Previously removed members that we've not finished removing need to
	 * count as a negative change so the "neg" recovery steps will happen.
	 *
	 * This function must report all member changes to the lsops and
	 * midcomms layers, and must not return before it has done so.
	 */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_rinfo(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		node = find_config_node(rv, memb->nodeid);
		if (!node) {
			log_error(ls, "remove member %d invalid",
				  memb->nodeid);
			return -EFAULT;
		}

		if (!node->new && !node->gone)
			continue;

		release_recover = 0;

		if (node->gone) {
			release_recover = node->release_recover;
			log_rinfo(ls, "remove member %d%s", memb->nodeid,
				  release_recover ? " (release_recover)" : "");
		} else {
			/* removed and re-added */
			log_rinfo(ls, "remove member %d comm_seq %u %u",
				  memb->nodeid, memb->comm_seq, node->comm_seq);
		}

		neg++;
		list_move(&memb->list, &ls->ls_nodes_gone);
		remove_remote_member(memb->nodeid);
		ls->ls_num_nodes--;
		dlm_lsop_recover_slot(ls, memb, release_recover);
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->nodes_count; i++) {
		node = &rv->nodes[i];
		if (node->gone)
			continue;

		if (dlm_is_member(ls, node->nodeid))
			continue;
		error = dlm_add_member(ls, node);
		if (error)
			return error;

		log_rinfo(ls, "add member %d", node->nodeid);
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	*neg_out = neg;

	error = ping_members(ls, rv->seq);
	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
	return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	write_lock_bh(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVER_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock_bh(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	if (new)
		timer_delete_sync(&ls->ls_scan_timer);
	ls->ls_recover_seq++;

	/* activate requestqueue and stop processing */
	write_lock_bh(&ls->ls_requestqueue_lock);
	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
	write_unlock_bh(&ls->ls_requestqueue_lock);
	spin_unlock_bh(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again; from now on any normal messages will be
	 * saved on the requestqueue for later.
	 */

	write_unlock_bh(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new) {
		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
		wake_up_process(ls->ls_recoverd_task);
		wait_event(ls->ls_recover_lock_wait,
			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	}

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVER_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);

	spin_lock_bh(&ls->ls_recover_lock);
	kfree(ls->ls_slots);
	ls->ls_slots = NULL;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_recover_status = 0;
	spin_unlock_bh(&ls->ls_recover_lock);

	dlm_recoverd_resume(ls);

	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;

	/* Call the recover_prep op only once, not again for each additional
	 * dlm_ls_stop() that arrives while recovery is already stopped.
	 *
	 * If we were able to clear the LSFL_RUNNING bit (i.e. it was set),
	 * we know this is the first dlm_ls_stop() call.
	 */
	if (new)
		dlm_lsop_recover_prep(ls);

	return 0;
}

int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv, *rv_old;
	struct dlm_config_node *nodes = NULL;
	int error, count;

	rv = kzalloc(sizeof(*rv), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
	if (error < 0)
		goto fail_rv;

	spin_lock_bh(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock_bh(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodes = nodes;
	rv->nodes_count = count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock_bh(&ls->ls_recover_lock);

	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
		kfree(rv_old->nodes);
		kfree(rv_old);
	}

	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
	return 0;

 fail:
	kfree(nodes);
 fail_rv:
	kfree(rv);
	return error;
}