Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/recover.c
26282 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7
**
8
**
9
*******************************************************************************
10
******************************************************************************/
11
12
#include "dlm_internal.h"
13
#include "lockspace.h"
14
#include "dir.h"
15
#include "config.h"
16
#include "ast.h"
17
#include "memory.h"
18
#include "rcom.h"
19
#include "lock.h"
20
#include "lowcomms.h"
21
#include "member.h"
22
#include "recover.h"
23
24
25
/*
26
* Recovery waiting routines: these functions wait for a particular reply from
27
* a remote node, or for the remote node to report a certain status. They need
28
* to abort if the lockspace is stopped indicating a node has failed (perhaps
29
* the one being waited for).
30
*/
31
32
/*
33
* Wait until given function returns non-zero or lockspace is stopped
34
* (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another
35
* function thinks it could have completed the waited-on task, they should wake
36
* up ls_wait_general to get an immediate response rather than waiting for the
37
* timeout. This uses a timeout so it can check periodically if the wait
38
* should abort due to node failure (which doesn't cause a wake_up).
39
* This should only be called by the dlm_recoverd thread.
40
*/
41
42
int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
43
{
44
int error = 0;
45
int rv;
46
47
while (1) {
48
rv = wait_event_timeout(ls->ls_wait_general,
49
testfn(ls) || dlm_recovery_stopped(ls),
50
dlm_config.ci_recover_timer * HZ);
51
if (rv)
52
break;
53
if (test_bit(LSFL_RCOM_WAIT, &ls->ls_flags)) {
54
log_debug(ls, "dlm_wait_function timed out");
55
return -ETIMEDOUT;
56
}
57
}
58
59
if (dlm_recovery_stopped(ls)) {
60
log_debug(ls, "dlm_wait_function aborted");
61
error = -EINTR;
62
}
63
return error;
64
}
65
66
/*
67
* An efficient way for all nodes to wait for all others to have a certain
68
* status. The node with the lowest nodeid polls all the others for their
69
* status (wait_status_all) and all the others poll the node with the low id
70
* for its accumulated result (wait_status_low). When all nodes have set
71
* status flag X, then status flag X_ALL will be set on the low nodeid.
72
*/
73
74
uint32_t dlm_recover_status(struct dlm_ls *ls)
75
{
76
uint32_t status;
77
spin_lock_bh(&ls->ls_recover_lock);
78
status = ls->ls_recover_status;
79
spin_unlock_bh(&ls->ls_recover_lock);
80
return status;
81
}
82
83
static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
84
{
85
ls->ls_recover_status |= status;
86
}
87
88
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
89
{
90
spin_lock_bh(&ls->ls_recover_lock);
91
_set_recover_status(ls, status);
92
spin_unlock_bh(&ls->ls_recover_lock);
93
}
94
95
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
96
int save_slots, uint64_t seq)
97
{
98
struct dlm_rcom *rc = ls->ls_recover_buf;
99
struct dlm_member *memb;
100
int error = 0, delay;
101
102
list_for_each_entry(memb, &ls->ls_nodes, list) {
103
delay = 0;
104
for (;;) {
105
if (dlm_recovery_stopped(ls)) {
106
error = -EINTR;
107
goto out;
108
}
109
110
error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
111
if (error)
112
goto out;
113
114
if (save_slots)
115
dlm_slot_save(ls, rc, memb);
116
117
if (le32_to_cpu(rc->rc_result) & wait_status)
118
break;
119
if (delay < 1000)
120
delay += 20;
121
msleep(delay);
122
}
123
}
124
out:
125
return error;
126
}
127
128
static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
129
uint32_t status_flags, uint64_t seq)
130
{
131
struct dlm_rcom *rc = ls->ls_recover_buf;
132
int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
133
134
for (;;) {
135
if (dlm_recovery_stopped(ls)) {
136
error = -EINTR;
137
goto out;
138
}
139
140
error = dlm_rcom_status(ls, nodeid, status_flags, seq);
141
if (error)
142
break;
143
144
if (le32_to_cpu(rc->rc_result) & wait_status)
145
break;
146
if (delay < 1000)
147
delay += 20;
148
msleep(delay);
149
}
150
out:
151
return error;
152
}
153
154
static int wait_status(struct dlm_ls *ls, uint32_t status, uint64_t seq)
155
{
156
uint32_t status_all = status << 1;
157
int error;
158
159
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
160
error = wait_status_all(ls, status, 0, seq);
161
if (!error)
162
dlm_set_recover_status(ls, status_all);
163
} else
164
error = wait_status_low(ls, status_all, 0, seq);
165
166
return error;
167
}
168
169
int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
170
{
171
struct dlm_member *memb;
172
struct dlm_slot *slots;
173
int num_slots, slots_size;
174
int error, rv;
175
uint32_t gen;
176
177
list_for_each_entry(memb, &ls->ls_nodes, list) {
178
memb->slot = -1;
179
memb->generation = 0;
180
}
181
182
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
183
error = wait_status_all(ls, DLM_RS_NODES, 1, seq);
184
if (error)
185
goto out;
186
187
/* slots array is sparse, slots_size may be > num_slots */
188
189
rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
190
if (!rv) {
191
spin_lock_bh(&ls->ls_recover_lock);
192
_set_recover_status(ls, DLM_RS_NODES_ALL);
193
ls->ls_num_slots = num_slots;
194
ls->ls_slots_size = slots_size;
195
ls->ls_slots = slots;
196
ls->ls_generation = gen;
197
spin_unlock_bh(&ls->ls_recover_lock);
198
} else {
199
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
200
}
201
} else {
202
error = wait_status_low(ls, DLM_RS_NODES_ALL,
203
DLM_RSF_NEED_SLOTS, seq);
204
if (error)
205
goto out;
206
207
dlm_slots_copy_in(ls);
208
}
209
out:
210
return error;
211
}
212
213
int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq)
214
{
215
return wait_status(ls, DLM_RS_DIR, seq);
216
}
217
218
int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq)
219
{
220
return wait_status(ls, DLM_RS_LOCKS, seq);
221
}
222
223
int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq)
224
{
225
return wait_status(ls, DLM_RS_DONE, seq);
226
}
227
228
/*
229
* The recover_list contains all the rsb's for which we've requested the new
230
* master nodeid. As replies are returned from the resource directories the
231
* rsb's are removed from the list. When the list is empty we're done.
232
*
233
* The recover_list is later similarly used for all rsb's for which we've sent
234
* new lkb's and need to receive new corresponding lkid's.
235
*
236
* We use the address of the rsb struct as a simple local identifier for the
237
* rsb so we can match an rcom reply with the rsb it was sent for.
238
*/
239
240
static int recover_list_empty(struct dlm_ls *ls)
241
{
242
int empty;
243
244
spin_lock_bh(&ls->ls_recover_list_lock);
245
empty = list_empty(&ls->ls_recover_list);
246
spin_unlock_bh(&ls->ls_recover_list_lock);
247
248
return empty;
249
}
250
251
static void recover_list_add(struct dlm_rsb *r)
252
{
253
struct dlm_ls *ls = r->res_ls;
254
255
spin_lock_bh(&ls->ls_recover_list_lock);
256
if (list_empty(&r->res_recover_list)) {
257
list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
258
ls->ls_recover_list_count++;
259
dlm_hold_rsb(r);
260
}
261
spin_unlock_bh(&ls->ls_recover_list_lock);
262
}
263
264
static void recover_list_del(struct dlm_rsb *r)
265
{
266
struct dlm_ls *ls = r->res_ls;
267
268
spin_lock_bh(&ls->ls_recover_list_lock);
269
list_del_init(&r->res_recover_list);
270
ls->ls_recover_list_count--;
271
spin_unlock_bh(&ls->ls_recover_list_lock);
272
273
dlm_put_rsb(r);
274
}
275
276
static void recover_list_clear(struct dlm_ls *ls)
277
{
278
struct dlm_rsb *r, *s;
279
280
spin_lock_bh(&ls->ls_recover_list_lock);
281
list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
282
list_del_init(&r->res_recover_list);
283
r->res_recover_locks_count = 0;
284
dlm_put_rsb(r);
285
ls->ls_recover_list_count--;
286
}
287
288
if (ls->ls_recover_list_count != 0) {
289
log_error(ls, "warning: recover_list_count %d",
290
ls->ls_recover_list_count);
291
ls->ls_recover_list_count = 0;
292
}
293
spin_unlock_bh(&ls->ls_recover_list_lock);
294
}
295
296
static int recover_xa_empty(struct dlm_ls *ls)
297
{
298
int empty = 1;
299
300
spin_lock_bh(&ls->ls_recover_xa_lock);
301
if (ls->ls_recover_list_count)
302
empty = 0;
303
spin_unlock_bh(&ls->ls_recover_xa_lock);
304
305
return empty;
306
}
307
308
static int recover_xa_add(struct dlm_rsb *r)
309
{
310
struct dlm_ls *ls = r->res_ls;
311
struct xa_limit limit = {
312
.min = 1,
313
.max = UINT_MAX,
314
};
315
uint32_t id;
316
int rv;
317
318
spin_lock_bh(&ls->ls_recover_xa_lock);
319
if (r->res_id) {
320
rv = -1;
321
goto out_unlock;
322
}
323
rv = xa_alloc(&ls->ls_recover_xa, &id, r, limit, GFP_ATOMIC);
324
if (rv < 0)
325
goto out_unlock;
326
327
r->res_id = id;
328
ls->ls_recover_list_count++;
329
dlm_hold_rsb(r);
330
rv = 0;
331
out_unlock:
332
spin_unlock_bh(&ls->ls_recover_xa_lock);
333
return rv;
334
}
335
336
static void recover_xa_del(struct dlm_rsb *r)
337
{
338
struct dlm_ls *ls = r->res_ls;
339
340
spin_lock_bh(&ls->ls_recover_xa_lock);
341
xa_erase_bh(&ls->ls_recover_xa, r->res_id);
342
r->res_id = 0;
343
ls->ls_recover_list_count--;
344
spin_unlock_bh(&ls->ls_recover_xa_lock);
345
346
dlm_put_rsb(r);
347
}
348
349
static struct dlm_rsb *recover_xa_find(struct dlm_ls *ls, uint64_t id)
350
{
351
struct dlm_rsb *r;
352
353
spin_lock_bh(&ls->ls_recover_xa_lock);
354
r = xa_load(&ls->ls_recover_xa, (int)id);
355
spin_unlock_bh(&ls->ls_recover_xa_lock);
356
return r;
357
}
358
359
static void recover_xa_clear(struct dlm_ls *ls)
360
{
361
struct dlm_rsb *r;
362
unsigned long id;
363
364
spin_lock_bh(&ls->ls_recover_xa_lock);
365
366
xa_for_each(&ls->ls_recover_xa, id, r) {
367
xa_erase_bh(&ls->ls_recover_xa, id);
368
r->res_id = 0;
369
r->res_recover_locks_count = 0;
370
ls->ls_recover_list_count--;
371
372
dlm_put_rsb(r);
373
}
374
375
if (ls->ls_recover_list_count != 0) {
376
log_error(ls, "warning: recover_list_count %d",
377
ls->ls_recover_list_count);
378
ls->ls_recover_list_count = 0;
379
}
380
spin_unlock_bh(&ls->ls_recover_xa_lock);
381
}
382
383
384
/* Master recovery: find new master node for rsb's that were
385
mastered on nodes that have been removed.
386
387
dlm_recover_masters
388
recover_master
389
dlm_send_rcom_lookup -> receive_rcom_lookup
390
dlm_dir_lookup
391
receive_rcom_lookup_reply <-
392
dlm_recover_master_reply
393
set_new_master
394
set_master_lkbs
395
set_lock_master
396
*/
397
398
/*
399
* Set the lock master for all LKBs in a lock queue
400
* If we are the new master of the rsb, we may have received new
401
* MSTCPY locks from other nodes already which we need to ignore
402
* when setting the new nodeid.
403
*/
404
405
static void set_lock_master(struct list_head *queue, int nodeid)
406
{
407
struct dlm_lkb *lkb;
408
409
list_for_each_entry(lkb, queue, lkb_statequeue) {
410
if (!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
411
lkb->lkb_nodeid = nodeid;
412
lkb->lkb_remid = 0;
413
}
414
}
415
}
416
417
static void set_master_lkbs(struct dlm_rsb *r)
418
{
419
set_lock_master(&r->res_grantqueue, r->res_nodeid);
420
set_lock_master(&r->res_convertqueue, r->res_nodeid);
421
set_lock_master(&r->res_waitqueue, r->res_nodeid);
422
}
423
424
/*
425
* Propagate the new master nodeid to locks
426
* The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
427
* The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
428
* rsb's to consider.
429
*/
430
431
static void set_new_master(struct dlm_rsb *r)
432
{
433
set_master_lkbs(r);
434
rsb_set_flag(r, RSB_NEW_MASTER);
435
rsb_set_flag(r, RSB_NEW_MASTER2);
436
}
437
438
/*
439
* We do async lookups on rsb's that need new masters. The rsb's
440
* waiting for a lookup reply are kept on the recover_list.
441
*
442
* Another node recovering the master may have sent us a rcom lookup,
443
* and our dlm_master_lookup() set it as the new master, along with
444
* NEW_MASTER so that we'll recover it here (this implies dir_nodeid
445
* equals our_nodeid below).
446
*/
447
448
static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
449
{
450
struct dlm_ls *ls = r->res_ls;
451
int our_nodeid, dir_nodeid;
452
int is_removed = 0;
453
int error;
454
455
if (r->res_nodeid != -1 && is_master(r))
456
return 0;
457
458
if (r->res_nodeid != -1)
459
is_removed = dlm_is_removed(ls, r->res_nodeid);
460
461
if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
462
return 0;
463
464
our_nodeid = dlm_our_nodeid();
465
dir_nodeid = dlm_dir_nodeid(r);
466
467
if (dir_nodeid == our_nodeid) {
468
if (is_removed) {
469
r->res_master_nodeid = our_nodeid;
470
r->res_nodeid = 0;
471
}
472
473
/* set master of lkbs to ourself when is_removed, or to
474
another new master which we set along with NEW_MASTER
475
in dlm_master_lookup */
476
set_new_master(r);
477
error = 0;
478
} else {
479
recover_xa_add(r);
480
error = dlm_send_rcom_lookup(r, dir_nodeid, seq);
481
}
482
483
(*count)++;
484
return error;
485
}
486
487
/*
488
* All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
489
* This is necessary because recovery can be started, aborted and restarted,
490
* causing the master nodeid to briefly change during the aborted recovery, and
491
* change back to the original value in the second recovery. The MSTCPY locks
492
* may or may not have been purged during the aborted recovery. Another node
493
* with an outstanding request in waiters list and a request reply saved in the
494
* requestqueue, cannot know whether it should ignore the reply and resend the
495
* request, or accept the reply and complete the request. It must do the
496
* former if the remote node purged MSTCPY locks, and it must do the later if
497
* the remote node did not. This is solved by always purging MSTCPY locks, in
498
* which case, the request reply would always be ignored and the request
499
* resent.
500
*/
501
502
static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
503
{
504
int dir_nodeid = dlm_dir_nodeid(r);
505
int new_master = dir_nodeid;
506
507
if (dir_nodeid == dlm_our_nodeid())
508
new_master = 0;
509
510
dlm_purge_mstcpy_locks(r);
511
r->res_master_nodeid = dir_nodeid;
512
r->res_nodeid = new_master;
513
set_new_master(r);
514
(*count)++;
515
return 0;
516
}
517
518
/*
519
* Go through local root resources and for each rsb which has a master which
520
* has departed, get the new master nodeid from the directory. The dir will
521
* assign mastery to the first node to look up the new master. That means
522
* we'll discover in this lookup if we're the new master of any rsb's.
523
*
524
* We fire off all the dir lookup requests individually and asynchronously to
525
* the correct dir node.
526
*/
527
528
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq,
529
const struct list_head *root_list)
530
{
531
struct dlm_rsb *r;
532
unsigned int total = 0;
533
unsigned int count = 0;
534
int nodir = dlm_no_directory(ls);
535
int error;
536
537
log_rinfo(ls, "dlm_recover_masters");
538
539
list_for_each_entry(r, root_list, res_root_list) {
540
if (dlm_recovery_stopped(ls)) {
541
error = -EINTR;
542
goto out;
543
}
544
545
lock_rsb(r);
546
if (nodir)
547
error = recover_master_static(r, &count);
548
else
549
error = recover_master(r, &count, seq);
550
unlock_rsb(r);
551
cond_resched();
552
total++;
553
554
if (error)
555
goto out;
556
}
557
558
log_rinfo(ls, "dlm_recover_masters %u of %u", count, total);
559
560
error = dlm_wait_function(ls, &recover_xa_empty);
561
out:
562
if (error)
563
recover_xa_clear(ls);
564
return error;
565
}
566
567
int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc)
568
{
569
struct dlm_rsb *r;
570
int ret_nodeid, new_master;
571
572
r = recover_xa_find(ls, le64_to_cpu(rc->rc_id));
573
if (!r) {
574
log_error(ls, "dlm_recover_master_reply no id %llx",
575
(unsigned long long)le64_to_cpu(rc->rc_id));
576
goto out;
577
}
578
579
ret_nodeid = le32_to_cpu(rc->rc_result);
580
581
if (ret_nodeid == dlm_our_nodeid())
582
new_master = 0;
583
else
584
new_master = ret_nodeid;
585
586
lock_rsb(r);
587
r->res_master_nodeid = ret_nodeid;
588
r->res_nodeid = new_master;
589
set_new_master(r);
590
unlock_rsb(r);
591
recover_xa_del(r);
592
593
if (recover_xa_empty(ls))
594
wake_up(&ls->ls_wait_general);
595
out:
596
return 0;
597
}
598
599
600
/* Lock recovery: rebuild the process-copy locks we hold on a
601
remastered rsb on the new rsb master.
602
603
dlm_recover_locks
604
recover_locks
605
recover_locks_queue
606
dlm_send_rcom_lock -> receive_rcom_lock
607
dlm_recover_master_copy
608
receive_rcom_lock_reply <-
609
dlm_recover_process_copy
610
*/
611
612
613
/*
614
* keep a count of the number of lkb's we send to the new master; when we get
615
* an equal number of replies then recovery for the rsb is done
616
*/
617
618
static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head,
619
uint64_t seq)
620
{
621
struct dlm_lkb *lkb;
622
int error = 0;
623
624
list_for_each_entry(lkb, head, lkb_statequeue) {
625
error = dlm_send_rcom_lock(r, lkb, seq);
626
if (error)
627
break;
628
r->res_recover_locks_count++;
629
}
630
631
return error;
632
}
633
634
static int recover_locks(struct dlm_rsb *r, uint64_t seq)
635
{
636
int error = 0;
637
638
lock_rsb(r);
639
640
DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
641
642
error = recover_locks_queue(r, &r->res_grantqueue, seq);
643
if (error)
644
goto out;
645
error = recover_locks_queue(r, &r->res_convertqueue, seq);
646
if (error)
647
goto out;
648
error = recover_locks_queue(r, &r->res_waitqueue, seq);
649
if (error)
650
goto out;
651
652
if (r->res_recover_locks_count)
653
recover_list_add(r);
654
else
655
rsb_clear_flag(r, RSB_NEW_MASTER);
656
out:
657
unlock_rsb(r);
658
return error;
659
}
660
661
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq,
662
const struct list_head *root_list)
663
{
664
struct dlm_rsb *r;
665
int error, count = 0;
666
667
list_for_each_entry(r, root_list, res_root_list) {
668
if (r->res_nodeid != -1 && is_master(r)) {
669
rsb_clear_flag(r, RSB_NEW_MASTER);
670
continue;
671
}
672
673
if (!rsb_flag(r, RSB_NEW_MASTER))
674
continue;
675
676
if (dlm_recovery_stopped(ls)) {
677
error = -EINTR;
678
goto out;
679
}
680
681
error = recover_locks(r, seq);
682
if (error)
683
goto out;
684
685
count += r->res_recover_locks_count;
686
}
687
688
log_rinfo(ls, "dlm_recover_locks %d out", count);
689
690
error = dlm_wait_function(ls, &recover_list_empty);
691
out:
692
if (error)
693
recover_list_clear(ls);
694
return error;
695
}
696
697
void dlm_recovered_lock(struct dlm_rsb *r)
698
{
699
DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););
700
701
r->res_recover_locks_count--;
702
if (!r->res_recover_locks_count) {
703
rsb_clear_flag(r, RSB_NEW_MASTER);
704
recover_list_del(r);
705
}
706
707
if (recover_list_empty(r->res_ls))
708
wake_up(&r->res_ls->ls_wait_general);
709
}
710
711
/*
712
* The lvb needs to be recovered on all master rsb's. This includes setting
713
* the VALNOTVALID flag if necessary, and determining the correct lvb contents
714
* based on the lvb's of the locks held on the rsb.
715
*
716
* RSB_VALNOTVALID is set in two cases:
717
*
718
* 1. we are master, but not new, and we purged an EX/PW lock held by a
719
* failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
720
*
721
* 2. we are a new master, and there are only NL/CR locks left.
722
* (We could probably improve this by only invaliding in this way when
723
* the previous master left uncleanly. VMS docs mention that.)
724
*
725
* The LVB contents are only considered for changing when this is a new master
726
* of the rsb (NEW_MASTER2). Then, the rsb's lvb is taken from any lkb with
727
* mode > CR. If no lkb's exist with mode above CR, the lvb contents are taken
728
* from the lkb with the largest lvb sequence number.
729
*/
730
731
static void recover_lvb(struct dlm_rsb *r)
732
{
733
struct dlm_lkb *big_lkb = NULL, *iter, *high_lkb = NULL;
734
uint32_t high_seq = 0;
735
int lock_lvb_exists = 0;
736
int lvblen = r->res_ls->ls_lvblen;
737
738
if (!rsb_flag(r, RSB_NEW_MASTER2) &&
739
rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
740
/* case 1 above */
741
rsb_set_flag(r, RSB_VALNOTVALID);
742
return;
743
}
744
745
if (!rsb_flag(r, RSB_NEW_MASTER2))
746
return;
747
748
/* we are the new master, so figure out if VALNOTVALID should
749
be set, and set the rsb lvb from the best lkb available. */
750
751
list_for_each_entry(iter, &r->res_grantqueue, lkb_statequeue) {
752
if (!(iter->lkb_exflags & DLM_LKF_VALBLK))
753
continue;
754
755
lock_lvb_exists = 1;
756
757
if (iter->lkb_grmode > DLM_LOCK_CR) {
758
big_lkb = iter;
759
goto setflag;
760
}
761
762
if (((int)iter->lkb_lvbseq - (int)high_seq) >= 0) {
763
high_lkb = iter;
764
high_seq = iter->lkb_lvbseq;
765
}
766
}
767
768
list_for_each_entry(iter, &r->res_convertqueue, lkb_statequeue) {
769
if (!(iter->lkb_exflags & DLM_LKF_VALBLK))
770
continue;
771
772
lock_lvb_exists = 1;
773
774
if (iter->lkb_grmode > DLM_LOCK_CR) {
775
big_lkb = iter;
776
goto setflag;
777
}
778
779
if (((int)iter->lkb_lvbseq - (int)high_seq) >= 0) {
780
high_lkb = iter;
781
high_seq = iter->lkb_lvbseq;
782
}
783
}
784
785
setflag:
786
if (!lock_lvb_exists)
787
goto out;
788
789
/* lvb is invalidated if only NL/CR locks remain */
790
if (!big_lkb)
791
rsb_set_flag(r, RSB_VALNOTVALID);
792
793
if (!r->res_lvbptr) {
794
r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
795
if (!r->res_lvbptr)
796
goto out;
797
}
798
799
if (big_lkb) {
800
r->res_lvbseq = big_lkb->lkb_lvbseq;
801
memcpy(r->res_lvbptr, big_lkb->lkb_lvbptr, lvblen);
802
} else if (high_lkb) {
803
r->res_lvbseq = high_lkb->lkb_lvbseq;
804
memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
805
} else {
806
r->res_lvbseq = 0;
807
memset(r->res_lvbptr, 0, lvblen);
808
}
809
out:
810
return;
811
}
812
813
/* All master rsb's flagged RECOVER_CONVERT need to be looked at. The locks
814
* converting PR->CW or CW->PR may need to have their lkb_grmode changed.
815
*/
816
817
static void recover_conversion(struct dlm_rsb *r)
818
{
819
struct dlm_ls *ls = r->res_ls;
820
uint32_t other_lkid = 0;
821
int other_grmode = -1;
822
struct dlm_lkb *lkb;
823
824
list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
825
if (lkb->lkb_grmode == DLM_LOCK_PR ||
826
lkb->lkb_grmode == DLM_LOCK_CW) {
827
other_grmode = lkb->lkb_grmode;
828
other_lkid = lkb->lkb_id;
829
break;
830
}
831
}
832
833
if (other_grmode == -1)
834
return;
835
836
list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
837
/* Lock recovery created incompatible granted modes, so
838
* change the granted mode of the converting lock to
839
* NL. The rqmode of the converting lock should be CW,
840
* which means the converting lock should be granted at
841
* the end of recovery.
842
*/
843
if (((lkb->lkb_grmode == DLM_LOCK_PR) && (other_grmode == DLM_LOCK_CW)) ||
844
((lkb->lkb_grmode == DLM_LOCK_CW) && (other_grmode == DLM_LOCK_PR))) {
845
log_limit(ls, "%s %x gr %d rq %d, remote %d %x, other_lkid %u, other gr %d, set gr=NL",
846
__func__, lkb->lkb_id, lkb->lkb_grmode,
847
lkb->lkb_rqmode, lkb->lkb_nodeid,
848
lkb->lkb_remid, other_lkid, other_grmode);
849
lkb->lkb_grmode = DLM_LOCK_NL;
850
}
851
}
852
}
853
854
/* We've become the new master for this rsb and waiting/converting locks may
855
need to be granted in dlm_recover_grant() due to locks that may have
856
existed from a removed node. */
857
858
static void recover_grant(struct dlm_rsb *r)
859
{
860
if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
861
rsb_set_flag(r, RSB_RECOVER_GRANT);
862
}
863
864
void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list)
865
{
866
struct dlm_rsb *r;
867
unsigned int count = 0;
868
869
list_for_each_entry(r, root_list, res_root_list) {
870
lock_rsb(r);
871
if (r->res_nodeid != -1 && is_master(r)) {
872
if (rsb_flag(r, RSB_RECOVER_CONVERT))
873
recover_conversion(r);
874
875
/* recover lvb before granting locks so the updated
876
lvb/VALNOTVALID is presented in the completion */
877
recover_lvb(r);
878
879
if (rsb_flag(r, RSB_NEW_MASTER2))
880
recover_grant(r);
881
count++;
882
} else {
883
rsb_clear_flag(r, RSB_VALNOTVALID);
884
}
885
rsb_clear_flag(r, RSB_RECOVER_CONVERT);
886
rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
887
rsb_clear_flag(r, RSB_NEW_MASTER2);
888
unlock_rsb(r);
889
}
890
891
if (count)
892
log_rinfo(ls, "dlm_recover_rsbs %d done", count);
893
}
894
895
void dlm_clear_inactive(struct dlm_ls *ls)
896
{
897
struct dlm_rsb *r, *safe;
898
unsigned int count = 0;
899
900
write_lock_bh(&ls->ls_rsbtbl_lock);
901
list_for_each_entry_safe(r, safe, &ls->ls_slow_inactive, res_slow_list) {
902
list_del(&r->res_slow_list);
903
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
904
dlm_rhash_rsb_params);
905
906
if (!list_empty(&r->res_scan_list))
907
list_del_init(&r->res_scan_list);
908
909
free_inactive_rsb(r);
910
count++;
911
}
912
write_unlock_bh(&ls->ls_rsbtbl_lock);
913
914
if (count)
915
log_rinfo(ls, "dlm_clear_inactive %u done", count);
916
}
917
918
919