Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/lockspace.c
26278 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7
**
8
**
9
*******************************************************************************
10
******************************************************************************/
11
12
#include <linux/module.h>
13
14
#include "dlm_internal.h"
15
#include "lockspace.h"
16
#include "member.h"
17
#include "recoverd.h"
18
#include "dir.h"
19
#include "midcomms.h"
20
#include "config.h"
21
#include "memory.h"
22
#include "lock.h"
23
#include "recover.h"
24
#include "requestqueue.h"
25
#include "user.h"
26
#include "ast.h"
27
28
static int ls_count;
29
static struct mutex ls_lock;
30
static struct list_head lslist;
31
static spinlock_t lslist_lock;
32
33
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34
{
35
ssize_t ret = len;
36
int n;
37
int rc = kstrtoint(buf, 0, &n);
38
39
if (rc)
40
return rc;
41
ls = dlm_find_lockspace_local(ls);
42
if (!ls)
43
return -EINVAL;
44
45
switch (n) {
46
case 0:
47
dlm_ls_stop(ls);
48
break;
49
case 1:
50
dlm_ls_start(ls);
51
break;
52
default:
53
ret = -EINVAL;
54
}
55
dlm_put_lockspace(ls);
56
return ret;
57
}
58
59
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60
{
61
int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62
63
if (rc)
64
return rc;
65
set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66
wake_up(&ls->ls_uevent_wait);
67
return len;
68
}
69
70
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71
{
72
return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73
}
74
75
static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76
{
77
int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78
79
if (rc)
80
return rc;
81
return len;
82
}
83
84
static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85
{
86
return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87
}
88
89
static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90
{
91
int val;
92
int rc = kstrtoint(buf, 0, &val);
93
94
if (rc)
95
return rc;
96
if (val == 1)
97
set_bit(LSFL_NODIR, &ls->ls_flags);
98
return len;
99
}
100
101
static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102
{
103
uint32_t status = dlm_recover_status(ls);
104
return snprintf(buf, PAGE_SIZE, "%x\n", status);
105
}
106
107
static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108
{
109
return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110
}
111
112
struct dlm_attr {
113
struct attribute attr;
114
ssize_t (*show)(struct dlm_ls *, char *);
115
ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116
};
117
118
static struct dlm_attr dlm_attr_control = {
119
.attr = {.name = "control", .mode = S_IWUSR},
120
.store = dlm_control_store
121
};
122
123
static struct dlm_attr dlm_attr_event = {
124
.attr = {.name = "event_done", .mode = S_IWUSR},
125
.store = dlm_event_store
126
};
127
128
static struct dlm_attr dlm_attr_id = {
129
.attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130
.show = dlm_id_show,
131
.store = dlm_id_store
132
};
133
134
static struct dlm_attr dlm_attr_nodir = {
135
.attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136
.show = dlm_nodir_show,
137
.store = dlm_nodir_store
138
};
139
140
static struct dlm_attr dlm_attr_recover_status = {
141
.attr = {.name = "recover_status", .mode = S_IRUGO},
142
.show = dlm_recover_status_show
143
};
144
145
static struct dlm_attr dlm_attr_recover_nodeid = {
146
.attr = {.name = "recover_nodeid", .mode = S_IRUGO},
147
.show = dlm_recover_nodeid_show
148
};
149
150
static struct attribute *dlm_attrs[] = {
151
&dlm_attr_control.attr,
152
&dlm_attr_event.attr,
153
&dlm_attr_id.attr,
154
&dlm_attr_nodir.attr,
155
&dlm_attr_recover_status.attr,
156
&dlm_attr_recover_nodeid.attr,
157
NULL,
158
};
159
ATTRIBUTE_GROUPS(dlm);
160
161
static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162
char *buf)
163
{
164
struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
165
struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166
return a->show ? a->show(ls, buf) : 0;
167
}
168
169
static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170
const char *buf, size_t len)
171
{
172
struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
173
struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174
return a->store ? a->store(ls, buf, len) : len;
175
}
176
177
static const struct sysfs_ops dlm_attr_ops = {
178
.show = dlm_attr_show,
179
.store = dlm_attr_store,
180
};
181
182
static struct kobj_type dlm_ktype = {
183
.default_groups = dlm_groups,
184
.sysfs_ops = &dlm_attr_ops,
185
};
186
187
static struct kset *dlm_kset;
188
189
static int do_uevent(struct dlm_ls *ls, int in)
190
{
191
if (in)
192
kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
193
else
194
kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
195
196
log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
197
198
/* dlm_controld will see the uevent, do the necessary group management
199
and then write to sysfs to wake us */
200
201
wait_event(ls->ls_uevent_wait,
202
test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
203
204
log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
205
206
return ls->ls_uevent_result;
207
}
208
209
static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
210
{
211
const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
212
213
add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
214
return 0;
215
}
216
217
static const struct kset_uevent_ops dlm_uevent_ops = {
218
.uevent = dlm_uevent,
219
};
220
221
int __init dlm_lockspace_init(void)
222
{
223
ls_count = 0;
224
mutex_init(&ls_lock);
225
INIT_LIST_HEAD(&lslist);
226
spin_lock_init(&lslist_lock);
227
228
dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
229
if (!dlm_kset) {
230
printk(KERN_WARNING "%s: can not create kset\n", __func__);
231
return -ENOMEM;
232
}
233
return 0;
234
}
235
236
void dlm_lockspace_exit(void)
237
{
238
kset_unregister(dlm_kset);
239
}
240
241
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
242
{
243
struct dlm_ls *ls;
244
245
spin_lock_bh(&lslist_lock);
246
247
list_for_each_entry(ls, &lslist, ls_list) {
248
if (ls->ls_global_id == id) {
249
atomic_inc(&ls->ls_count);
250
goto out;
251
}
252
}
253
ls = NULL;
254
out:
255
spin_unlock_bh(&lslist_lock);
256
return ls;
257
}
258
259
struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
260
{
261
struct dlm_ls *ls = lockspace;
262
263
atomic_inc(&ls->ls_count);
264
return ls;
265
}
266
267
struct dlm_ls *dlm_find_lockspace_device(int minor)
268
{
269
struct dlm_ls *ls;
270
271
spin_lock_bh(&lslist_lock);
272
list_for_each_entry(ls, &lslist, ls_list) {
273
if (ls->ls_device.minor == minor) {
274
atomic_inc(&ls->ls_count);
275
goto out;
276
}
277
}
278
ls = NULL;
279
out:
280
spin_unlock_bh(&lslist_lock);
281
return ls;
282
}
283
284
void dlm_put_lockspace(struct dlm_ls *ls)
285
{
286
if (atomic_dec_and_test(&ls->ls_count))
287
wake_up(&ls->ls_count_wait);
288
}
289
290
static void remove_lockspace(struct dlm_ls *ls)
291
{
292
retry:
293
wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
294
295
spin_lock_bh(&lslist_lock);
296
if (atomic_read(&ls->ls_count) != 0) {
297
spin_unlock_bh(&lslist_lock);
298
goto retry;
299
}
300
301
WARN_ON(ls->ls_create_count != 0);
302
list_del(&ls->ls_list);
303
spin_unlock_bh(&lslist_lock);
304
}
305
306
static int threads_start(void)
307
{
308
int error;
309
310
/* Thread for sending/receiving messages for all lockspace's */
311
error = dlm_midcomms_start();
312
if (error)
313
log_print("cannot start dlm midcomms %d", error);
314
315
return error;
316
}
317
318
static int lkb_idr_free(struct dlm_lkb *lkb)
319
{
320
if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
321
dlm_free_lvb(lkb->lkb_lvbptr);
322
323
dlm_free_lkb(lkb);
324
return 0;
325
}
326
327
static void rhash_free_rsb(void *ptr, void *arg)
328
{
329
struct dlm_rsb *rsb = ptr;
330
331
dlm_free_rsb(rsb);
332
}
333
334
static void free_lockspace(struct work_struct *work)
335
{
336
struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work);
337
struct dlm_lkb *lkb;
338
unsigned long id;
339
340
/*
341
* Free all lkb's in xa
342
*/
343
xa_for_each(&ls->ls_lkbxa, id, lkb) {
344
lkb_idr_free(lkb);
345
}
346
xa_destroy(&ls->ls_lkbxa);
347
348
/*
349
* Free all rsb's on rsbtbl
350
*/
351
rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
352
353
kfree(ls);
354
}
355
356
static int new_lockspace(const char *name, const char *cluster,
357
uint32_t flags, int lvblen,
358
const struct dlm_lockspace_ops *ops, void *ops_arg,
359
int *ops_result, dlm_lockspace_t **lockspace)
360
{
361
struct dlm_ls *ls;
362
int namelen = strlen(name);
363
int error;
364
365
if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
366
return -EINVAL;
367
368
if (lvblen % 8)
369
return -EINVAL;
370
371
if (!try_module_get(THIS_MODULE))
372
return -EINVAL;
373
374
if (!dlm_user_daemon_available()) {
375
log_print("dlm user daemon not available");
376
error = -EUNATCH;
377
goto out;
378
}
379
380
if (ops && ops_result) {
381
if (!dlm_config.ci_recover_callbacks)
382
*ops_result = -EOPNOTSUPP;
383
else
384
*ops_result = 0;
385
}
386
387
if (!cluster)
388
log_print("dlm cluster name '%s' is being used without an application provided cluster name",
389
dlm_config.ci_cluster_name);
390
391
if (dlm_config.ci_recover_callbacks && cluster &&
392
strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
393
log_print("dlm cluster name '%s' does not match "
394
"the application cluster name '%s'",
395
dlm_config.ci_cluster_name, cluster);
396
error = -EBADR;
397
goto out;
398
}
399
400
error = 0;
401
402
spin_lock_bh(&lslist_lock);
403
list_for_each_entry(ls, &lslist, ls_list) {
404
WARN_ON(ls->ls_create_count <= 0);
405
if (ls->ls_namelen != namelen)
406
continue;
407
if (memcmp(ls->ls_name, name, namelen))
408
continue;
409
if (flags & DLM_LSFL_NEWEXCL) {
410
error = -EEXIST;
411
break;
412
}
413
ls->ls_create_count++;
414
*lockspace = ls;
415
error = 1;
416
break;
417
}
418
spin_unlock_bh(&lslist_lock);
419
420
if (error)
421
goto out;
422
423
error = -ENOMEM;
424
425
ls = kzalloc(sizeof(*ls), GFP_NOFS);
426
if (!ls)
427
goto out;
428
memcpy(ls->ls_name, name, namelen);
429
ls->ls_namelen = namelen;
430
ls->ls_lvblen = lvblen;
431
atomic_set(&ls->ls_count, 0);
432
init_waitqueue_head(&ls->ls_count_wait);
433
ls->ls_flags = 0;
434
435
if (ops && dlm_config.ci_recover_callbacks) {
436
ls->ls_ops = ops;
437
ls->ls_ops_arg = ops_arg;
438
}
439
440
if (flags & DLM_LSFL_SOFTIRQ)
441
set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
442
443
/* ls_exflags are forced to match among nodes, and we don't
444
* need to require all nodes to have some flags set
445
*/
446
ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
447
DLM_LSFL_SOFTIRQ));
448
449
INIT_LIST_HEAD(&ls->ls_slow_inactive);
450
INIT_LIST_HEAD(&ls->ls_slow_active);
451
rwlock_init(&ls->ls_rsbtbl_lock);
452
453
error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
454
if (error)
455
goto out_lsfree;
456
457
xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
458
rwlock_init(&ls->ls_lkbxa_lock);
459
460
INIT_LIST_HEAD(&ls->ls_waiters);
461
spin_lock_init(&ls->ls_waiters_lock);
462
INIT_LIST_HEAD(&ls->ls_orphans);
463
spin_lock_init(&ls->ls_orphans_lock);
464
465
INIT_LIST_HEAD(&ls->ls_nodes);
466
INIT_LIST_HEAD(&ls->ls_nodes_gone);
467
ls->ls_num_nodes = 0;
468
ls->ls_low_nodeid = 0;
469
ls->ls_total_weight = 0;
470
ls->ls_node_array = NULL;
471
472
memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
473
ls->ls_local_rsb.res_ls = ls;
474
475
ls->ls_debug_rsb_dentry = NULL;
476
ls->ls_debug_waiters_dentry = NULL;
477
478
init_waitqueue_head(&ls->ls_uevent_wait);
479
ls->ls_uevent_result = 0;
480
init_completion(&ls->ls_recovery_done);
481
ls->ls_recovery_result = -1;
482
483
spin_lock_init(&ls->ls_cb_lock);
484
INIT_LIST_HEAD(&ls->ls_cb_delay);
485
486
INIT_WORK(&ls->ls_free_work, free_lockspace);
487
488
ls->ls_recoverd_task = NULL;
489
mutex_init(&ls->ls_recoverd_active);
490
spin_lock_init(&ls->ls_recover_lock);
491
spin_lock_init(&ls->ls_rcom_spin);
492
get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
493
ls->ls_recover_status = 0;
494
ls->ls_recover_seq = get_random_u64();
495
ls->ls_recover_args = NULL;
496
init_rwsem(&ls->ls_in_recovery);
497
rwlock_init(&ls->ls_recv_active);
498
INIT_LIST_HEAD(&ls->ls_requestqueue);
499
rwlock_init(&ls->ls_requestqueue_lock);
500
spin_lock_init(&ls->ls_clear_proc_locks);
501
502
/* Due backwards compatibility with 3.1 we need to use maximum
503
* possible dlm message size to be sure the message will fit and
504
* not having out of bounds issues. However on sending side 3.2
505
* might send less.
506
*/
507
ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
508
if (!ls->ls_recover_buf) {
509
error = -ENOMEM;
510
goto out_lkbxa;
511
}
512
513
ls->ls_slot = 0;
514
ls->ls_num_slots = 0;
515
ls->ls_slots_size = 0;
516
ls->ls_slots = NULL;
517
518
INIT_LIST_HEAD(&ls->ls_recover_list);
519
spin_lock_init(&ls->ls_recover_list_lock);
520
xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
521
spin_lock_init(&ls->ls_recover_xa_lock);
522
ls->ls_recover_list_count = 0;
523
init_waitqueue_head(&ls->ls_wait_general);
524
INIT_LIST_HEAD(&ls->ls_masters_list);
525
rwlock_init(&ls->ls_masters_lock);
526
INIT_LIST_HEAD(&ls->ls_dir_dump_list);
527
rwlock_init(&ls->ls_dir_dump_lock);
528
529
INIT_LIST_HEAD(&ls->ls_scan_list);
530
spin_lock_init(&ls->ls_scan_lock);
531
timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
532
533
spin_lock_bh(&lslist_lock);
534
ls->ls_create_count = 1;
535
list_add(&ls->ls_list, &lslist);
536
spin_unlock_bh(&lslist_lock);
537
538
if (flags & DLM_LSFL_FS)
539
set_bit(LSFL_FS, &ls->ls_flags);
540
541
error = dlm_callback_start(ls);
542
if (error) {
543
log_error(ls, "can't start dlm_callback %d", error);
544
goto out_delist;
545
}
546
547
init_waitqueue_head(&ls->ls_recover_lock_wait);
548
549
/*
550
* Once started, dlm_recoverd first looks for ls in lslist, then
551
* initializes ls_in_recovery as locked in "down" mode. We need
552
* to wait for the wakeup from dlm_recoverd because in_recovery
553
* has to start out in down mode.
554
*/
555
556
error = dlm_recoverd_start(ls);
557
if (error) {
558
log_error(ls, "can't start dlm_recoverd %d", error);
559
goto out_callback;
560
}
561
562
wait_event(ls->ls_recover_lock_wait,
563
test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
564
565
ls->ls_kobj.kset = dlm_kset;
566
error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
567
"%s", ls->ls_name);
568
if (error)
569
goto out_recoverd;
570
kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
571
572
/* This uevent triggers dlm_controld in userspace to add us to the
573
group of nodes that are members of this lockspace (managed by the
574
cluster infrastructure.) Once it's done that, it tells us who the
575
current lockspace members are (via configfs) and then tells the
576
lockspace to start running (via sysfs) in dlm_ls_start(). */
577
578
error = do_uevent(ls, 1);
579
if (error < 0)
580
goto out_recoverd;
581
582
/* wait until recovery is successful or failed */
583
wait_for_completion(&ls->ls_recovery_done);
584
error = ls->ls_recovery_result;
585
if (error)
586
goto out_members;
587
588
dlm_create_debug_file(ls);
589
590
log_rinfo(ls, "join complete");
591
*lockspace = ls;
592
return 0;
593
594
out_members:
595
do_uevent(ls, 0);
596
dlm_clear_members(ls);
597
kfree(ls->ls_node_array);
598
out_recoverd:
599
dlm_recoverd_stop(ls);
600
out_callback:
601
dlm_callback_stop(ls);
602
out_delist:
603
spin_lock_bh(&lslist_lock);
604
list_del(&ls->ls_list);
605
spin_unlock_bh(&lslist_lock);
606
xa_destroy(&ls->ls_recover_xa);
607
kfree(ls->ls_recover_buf);
608
out_lkbxa:
609
xa_destroy(&ls->ls_lkbxa);
610
rhashtable_destroy(&ls->ls_rsbtbl);
611
out_lsfree:
612
kobject_put(&ls->ls_kobj);
613
kfree(ls);
614
out:
615
module_put(THIS_MODULE);
616
return error;
617
}
618
619
static int __dlm_new_lockspace(const char *name, const char *cluster,
620
uint32_t flags, int lvblen,
621
const struct dlm_lockspace_ops *ops,
622
void *ops_arg, int *ops_result,
623
dlm_lockspace_t **lockspace)
624
{
625
int error = 0;
626
627
mutex_lock(&ls_lock);
628
if (!ls_count)
629
error = threads_start();
630
if (error)
631
goto out;
632
633
error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
634
ops_result, lockspace);
635
if (!error)
636
ls_count++;
637
if (error > 0)
638
error = 0;
639
if (!ls_count) {
640
dlm_midcomms_shutdown();
641
dlm_midcomms_stop();
642
}
643
out:
644
mutex_unlock(&ls_lock);
645
return error;
646
}
647
648
int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
649
int lvblen, const struct dlm_lockspace_ops *ops,
650
void *ops_arg, int *ops_result,
651
dlm_lockspace_t **lockspace)
652
{
653
return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
654
ops, ops_arg, ops_result, lockspace);
655
}
656
657
int dlm_new_user_lockspace(const char *name, const char *cluster,
658
uint32_t flags, int lvblen,
659
const struct dlm_lockspace_ops *ops,
660
void *ops_arg, int *ops_result,
661
dlm_lockspace_t **lockspace)
662
{
663
if (flags & DLM_LSFL_SOFTIRQ)
664
return -EINVAL;
665
666
return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
667
ops_arg, ops_result, lockspace);
668
}
669
670
/* NOTE: We check the lkbxa here rather than the resource table.
671
This is because there may be LKBs queued as ASTs that have been unlinked
672
from their RSBs and are pending deletion once the AST has been delivered */
673
674
static int lockspace_busy(struct dlm_ls *ls, int force)
675
{
676
struct dlm_lkb *lkb;
677
unsigned long id;
678
int rv = 0;
679
680
read_lock_bh(&ls->ls_lkbxa_lock);
681
if (force == 0) {
682
xa_for_each(&ls->ls_lkbxa, id, lkb) {
683
rv = 1;
684
break;
685
}
686
} else if (force == 1) {
687
xa_for_each(&ls->ls_lkbxa, id, lkb) {
688
if (lkb->lkb_nodeid == 0 &&
689
lkb->lkb_grmode != DLM_LOCK_IV) {
690
rv = 1;
691
break;
692
}
693
}
694
} else {
695
rv = 0;
696
}
697
read_unlock_bh(&ls->ls_lkbxa_lock);
698
return rv;
699
}
700
701
static int release_lockspace(struct dlm_ls *ls, int force)
702
{
703
int busy, rv;
704
705
busy = lockspace_busy(ls, force);
706
707
spin_lock_bh(&lslist_lock);
708
if (ls->ls_create_count == 1) {
709
if (busy) {
710
rv = -EBUSY;
711
} else {
712
/* remove_lockspace takes ls off lslist */
713
ls->ls_create_count = 0;
714
rv = 0;
715
}
716
} else if (ls->ls_create_count > 1) {
717
rv = --ls->ls_create_count;
718
} else {
719
rv = -EINVAL;
720
}
721
spin_unlock_bh(&lslist_lock);
722
723
if (rv) {
724
log_debug(ls, "release_lockspace no remove %d", rv);
725
return rv;
726
}
727
728
if (ls_count == 1)
729
dlm_midcomms_version_wait();
730
731
dlm_device_deregister(ls);
732
733
if (force < 3 && dlm_user_daemon_available())
734
do_uevent(ls, 0);
735
736
dlm_recoverd_stop(ls);
737
738
/* clear the LSFL_RUNNING flag to fast up
739
* time_shutdown_sync(), we don't care anymore
740
*/
741
clear_bit(LSFL_RUNNING, &ls->ls_flags);
742
timer_shutdown_sync(&ls->ls_scan_timer);
743
744
if (ls_count == 1) {
745
dlm_clear_members(ls);
746
dlm_midcomms_shutdown();
747
}
748
749
dlm_callback_stop(ls);
750
751
remove_lockspace(ls);
752
753
dlm_delete_debug_file(ls);
754
755
kobject_put(&ls->ls_kobj);
756
757
xa_destroy(&ls->ls_recover_xa);
758
kfree(ls->ls_recover_buf);
759
760
/*
761
* Free structures on any other lists
762
*/
763
764
dlm_purge_requestqueue(ls);
765
kfree(ls->ls_recover_args);
766
dlm_clear_members(ls);
767
dlm_clear_members_gone(ls);
768
kfree(ls->ls_node_array);
769
770
log_rinfo(ls, "%s final free", __func__);
771
772
/* delayed free of data structures see free_lockspace() */
773
queue_work(dlm_wq, &ls->ls_free_work);
774
module_put(THIS_MODULE);
775
return 0;
776
}
777
778
/*
779
* Called when a system has released all its locks and is not going to use the
780
* lockspace any longer. We free everything we're managing for this lockspace.
781
* Remaining nodes will go through the recovery process as if we'd died. The
782
* lockspace must continue to function as usual, participating in recoveries,
783
* until this returns.
784
*
785
* Force has 4 possible values:
786
* 0 - don't destroy lockspace if it has any LKBs
787
* 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
788
* 2 - destroy lockspace regardless of LKBs
789
* 3 - destroy lockspace as part of a forced shutdown
790
*/
791
792
int dlm_release_lockspace(void *lockspace, int force)
793
{
794
struct dlm_ls *ls;
795
int error;
796
797
ls = dlm_find_lockspace_local(lockspace);
798
if (!ls)
799
return -EINVAL;
800
dlm_put_lockspace(ls);
801
802
mutex_lock(&ls_lock);
803
error = release_lockspace(ls, force);
804
if (!error)
805
ls_count--;
806
if (!ls_count)
807
dlm_midcomms_stop();
808
mutex_unlock(&ls_lock);
809
810
return error;
811
}
812
813
void dlm_stop_lockspaces(void)
814
{
815
struct dlm_ls *ls;
816
int count;
817
818
restart:
819
count = 0;
820
spin_lock_bh(&lslist_lock);
821
list_for_each_entry(ls, &lslist, ls_list) {
822
if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
823
count++;
824
continue;
825
}
826
spin_unlock_bh(&lslist_lock);
827
log_error(ls, "no userland control daemon, stopping lockspace");
828
dlm_ls_stop(ls);
829
goto restart;
830
}
831
spin_unlock_bh(&lslist_lock);
832
833
if (count)
834
log_print("dlm user daemon left %d lockspaces", count);
835
}
836
837