GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/member.c
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"
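
/*
 * Recovery assigns each lockspace member a small "slot" number that stays
 * stable while the node remains a member, and bumps a "generation" counter
 * each time the membership changes.  The pair is handed to the lockspace
 * user through the recover_done callback; gfs2, for example, derives
 * per-node journal ids from slot numbers.
 */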

int dlm_slots_version(const struct dlm_header *h)
{
	if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
		return 0;
	return 1;
}
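
/* Save the slot and generation another node reported in its rcom_config;
   nodes without slots support are left untouched. */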
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}
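
/* Copy our slot table into an outgoing rcom buffer, immediately after
   rcom_config.  ls_slots is sparse (indexed by slot number); the on-wire
   rcom_slot array packs only the used entries. */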
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_slot *slot;
	struct rcom_slot *ro;
	int i;

	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* ls_slots array is sparse, but not rcom_slots */

	for (i = 0; i < ls->ls_slots_size; i++) {
		slot = &ls->ls_slots[i];
		if (!slot->nodeid)
			continue;
		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
		ro->ro_slot = cpu_to_le16(slot->slot);
		ro++;
	}
}
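
/* Format the slot table as a single "slot:nodeid ..." line for the
   recovery log, truncating once SLOT_DEBUG_LINE is exceeded. */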
#define SLOT_DEBUG_LINE 128

static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
		      struct rcom_slot *ro0, struct dlm_slot *array,
		      int array_size)
{
	char line[SLOT_DEBUG_LINE];
	int len = SLOT_DEBUG_LINE - 1;
	int pos = 0;
	int ret, i;

	memset(line, 0, sizeof(line));

	if (array) {
		for (i = 0; i < array_size; i++) {
			if (!array[i].nodeid)
				continue;

			ret = snprintf(line + pos, len - pos, " %d:%d",
				       array[i].slot, array[i].nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	} else if (ro0) {
		for (i = 0; i < num_slots; i++) {
			ret = snprintf(line + pos, len - pos, " %d:%d",
				       ro0[i].ro_slot, ro0[i].ro_nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	}

	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}
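
/* Unpack the slot table received during recovery and apply it to our
   member list; returns -1 if the sender does not support slots or the
   table is inconsistent with what we already know. */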
int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	log_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
				continue;
			memb->slot = le16_to_cpu(ro->ro_slot);
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls, "dlm_slots_copy_in our slot "
					  "changed %d %d", ls->ls_slot,
					  memb->slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				  memb->nodeid);
			return -1;
		}
	}

	return 0;
}
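
/* Example: nodes 1,2,3 hold slots 1,2,3; node 2 leaves and node 4 joins.
   Nodes 1 and 3 keep slots 1 and 3, node 4 is assigned the freed slot 2,
   and the generation is incremented. */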

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check, once slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	array_size = max + need;
	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_slots(ls, gen, num, NULL, array, array_size);

	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}
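
/* Insert the new member so that ls_nodes stays sorted by nodeid, giving
   every node an identically ordered member list. */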
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
	struct dlm_member *memb;
	struct list_head *tmp;
	struct list_head *newlist = &new->list;
	struct list_head *head = &ls->ls_nodes;

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}

	/* insert before tmp; if no break occurred, tmp is back at head
	   and this appends to the tail */
	list_add_tail(newlist, tmp);
}

static int add_remote_member(int nodeid)
{
	int error;

	if (nodeid == dlm_our_nodeid())
		return 0;

	error = dlm_lowcomms_connect_node(nodeid);
	if (error < 0)
		return error;

	dlm_midcomms_add_member(nodeid);
	return 0;
}

static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
	struct dlm_member *memb;
	int error;

	memb = kzalloc(sizeof(*memb), GFP_NOFS);
	if (!memb)
		return -ENOMEM;

	memb->nodeid = node->nodeid;
	memb->weight = node->weight;
	memb->comm_seq = node->comm_seq;

	error = add_remote_member(node->nodeid);
	if (error < 0) {
		kfree(memb);
		return error;
	}

	add_ordered_member(ls, memb);
	ls->ls_num_nodes++;
	return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
	struct dlm_member *memb;

	list_for_each_entry(memb, head, list) {
		if (memb->nodeid == nodeid)
			return memb;
	}
	return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes, nodeid))
		return 1;
	return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
	WARN_ON_ONCE(!nodeid || nodeid == -1);

	if (find_memb(&ls->ls_nodes_gone, nodeid))
		return 1;
	return 0;
}

static void clear_memb_list(struct list_head *head,
			    void (*after_del)(int nodeid))
{
	struct dlm_member *memb;

	while (!list_empty(head)) {
		memb = list_entry(head->next, struct dlm_member, list);
		list_del(&memb->list);
		if (after_del)
			after_del(memb->nodeid);
		kfree(memb);
	}
}

static void remove_remote_member(int nodeid)
{
	if (nodeid == dlm_our_nodeid())
		return;

	dlm_midcomms_remove_member(nodeid);
}

void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes, remove_remote_member);
	ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone, NULL);
}
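
/* Rebuild ls_node_array, in which each nodeid appears once per unit of
   weight (or once each if all weights are zero); hashing into this array
   spreads resource directory lookups across members in proportion to
   their weight. */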
static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;
	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls, uint64_t seq)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			break;
		}
		error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
		if (error)
			break;
	}
	if (error)
		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
		return;
	ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq, false);

	if (!error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}

void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;
	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
out:
	kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
						int nodeid)
{
	int i;

	for (i = 0; i < rv->nodes_count; i++) {
		if (rv->nodes[i].nodeid == nodeid)
			return &rv->nodes[i];
	}
	return NULL;
}
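
/* Reconcile the member list with the new node configuration in rv: count
   previously departed nodes still awaiting cleanup, move removed (or
   removed-and-readded) nodes to ls_nodes_gone, add new nodes, recompute
   the low nodeid and the weighted node array, then ping every member to
   establish connections.  *neg_out returns the number of departed members
   so the caller knows the "neg" recovery steps are needed. */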
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	struct dlm_config_node *node;
	int i, error, neg = 0, low = -1;

	/* previously removed members that we've not finished removing need to
	 * count as a negative change so the "neg" recovery steps will happen
	 *
	 * This function must report all member changes to the lsops or
	 * midcomms layer and must not return before doing so.
	 */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_rinfo(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		node = find_config_node(rv, memb->nodeid);
		if (node && !node->new)
			continue;

		if (!node) {
			log_rinfo(ls, "remove member %d", memb->nodeid);
		} else {
			/* removed and re-added */
			log_rinfo(ls, "remove member %d comm_seq %u %u",
				  memb->nodeid, memb->comm_seq, node->comm_seq);
		}

		neg++;
		list_move(&memb->list, &ls->ls_nodes_gone);
		remove_remote_member(memb->nodeid);
		ls->ls_num_nodes--;
		dlm_lsop_recover_slot(ls, memb);
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->nodes_count; i++) {
		node = &rv->nodes[i];
		if (dlm_is_member(ls, node->nodeid))
			continue;
		error = dlm_add_member(ls, node);
		if (error)
			return error;

		log_rinfo(ls, "add member %d", node->nodeid);
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	*neg_out = neg;

	error = ping_members(ls, rv->seq);
	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
	return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	write_lock_bh(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVER_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock_bh(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	if (new)
		timer_delete_sync(&ls->ls_scan_timer);
	ls->ls_recover_seq++;

	/* activate requestqueue and stop processing */
	write_lock_bh(&ls->ls_requestqueue_lock);
	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
	write_unlock_bh(&ls->ls_requestqueue_lock);
	spin_unlock_bh(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again; from now on any normal messages will be
	 * saved on the requestqueue for later.
	 */

	write_unlock_bh(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new) {
		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
		wake_up_process(ls->ls_recoverd_task);
		wait_event(ls->ls_recover_lock_wait,
			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	}

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVER_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);

	spin_lock_bh(&ls->ls_recover_lock);
	kfree(ls->ls_slots);
	ls->ls_slots = NULL;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_recover_status = 0;
	spin_unlock_bh(&ls->ls_recover_lock);

	dlm_recoverd_resume(ls);

	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;

	/* Call the recover_prep op only once, not again for each additional
	 * dlm_ls_stop() while recovery is already stopped.
	 *
	 * If we were able to clear LSFL_RUNNING (i.e. it was set), we know
	 * this is the first dlm_ls_stop() call.
	 */
	if (new)
		dlm_lsop_recover_prep(ls);

	return 0;
}
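
/* Capture the new node configuration from userspace in a dlm_recover and
   queue it for dlm_recoverd.  A leftover, unused previous configuration
   means two starts arrived before recoverd ran; only the newest is kept. */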
int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv, *rv_old;
	struct dlm_config_node *nodes = NULL;
	int error, count;

	rv = kzalloc(sizeof(*rv), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
	if (error < 0)
		goto fail_rv;

	spin_lock_bh(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock_bh(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodes = nodes;
	rv->nodes_count = count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock_bh(&ls->ls_recover_lock);

	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
		kfree(rv_old->nodes);
		kfree(rv_old);
	}

	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
	return 0;

fail:
	kfree(nodes);
fail_rv:
	kfree(rv);
	return error;
}