// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6
**
7
**
8
*******************************************************************************
9
******************************************************************************/
10
11
/* Central locking logic has four stages:
12
13
dlm_lock()
14
dlm_unlock()
15
16
request_lock(ls, lkb)
17
convert_lock(ls, lkb)
18
unlock_lock(ls, lkb)
19
cancel_lock(ls, lkb)
20
21
_request_lock(r, lkb)
22
_convert_lock(r, lkb)
23
_unlock_lock(r, lkb)
24
_cancel_lock(r, lkb)
25
26
do_request(r, lkb)
27
do_convert(r, lkb)
28
do_unlock(r, lkb)
29
do_cancel(r, lkb)
30
31
Stage 1 (lock, unlock) is mainly about checking input args and
32
splitting into one of the four main operations:
33
34
dlm_lock = request_lock
35
dlm_lock+CONVERT = convert_lock
36
dlm_unlock = unlock_lock
37
dlm_unlock+CANCEL = cancel_lock
38
39
Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40
provided to the next stage.
41
42
Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43
When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45
Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
46
given rsb and lkb and queues callbacks.
47
48
For remote operations, send_xxxx() results in the corresponding do_xxxx()
49
function being executed on the remote node. The connecting send/receive
50
calls on local (L) and remote (R) nodes:
51
52
L: send_xxxx() -> R: receive_xxxx()
53
R: do_xxxx()
54
L: receive_xxxx_reply() <- R: send_xxxx_reply()
55
*/
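/*
 * Illustrative sketch (editorial addition, not part of the original
 * file): how a kernel caller would drive the four stages above through
 * the exported API, assuming the declarations in include/linux/dlm.h.
 * The names lockspace, my_lksb, my_ast and my_bast are hypothetical.
 *
 *	static struct dlm_lksb my_lksb;
 *
 *	static void my_ast(void *astarg)		// completion: my_lksb.sb_status holds the result
 *	static void my_bast(void *astarg, int mode)	// another node wants 'mode'
 *
 *	// dlm_lock -> request_lock -> _request_lock -> do_request/send_request
 *	error = dlm_lock(lockspace, DLM_LOCK_EX, &my_lksb, DLM_LKF_NOQUEUE,
 *			 "my-resource", strlen("my-resource"), 0,
 *			 my_ast, NULL, my_bast);
 *
 *	// dlm_lock + DLM_LKF_CONVERT -> convert_lock (same lksb, sb_lkid set)
 *	error = dlm_lock(lockspace, DLM_LOCK_PR, &my_lksb, DLM_LKF_CONVERT,
 *			 NULL, 0, 0, my_ast, NULL, my_bast);
 *
 *	// dlm_unlock -> unlock_lock; with DLM_LKF_CANCEL -> cancel_lock
 *	error = dlm_unlock(lockspace, my_lksb.sb_lkid, 0, &my_lksb, NULL);
 */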
56
#include <trace/events/dlm.h>
57
58
#include <linux/types.h>
59
#include <linux/rbtree.h>
60
#include <linux/slab.h>
61
#include "dlm_internal.h"
62
#include <linux/dlm_device.h>
63
#include "memory.h"
64
#include "midcomms.h"
65
#include "requestqueue.h"
66
#include "util.h"
67
#include "dir.h"
68
#include "member.h"
69
#include "lockspace.h"
70
#include "ast.h"
71
#include "lock.h"
72
#include "rcom.h"
73
#include "recover.h"
74
#include "lvb_table.h"
75
#include "user.h"
76
#include "config.h"
77
78
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85
static int send_remove(struct dlm_rsb *r);
86
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89
const struct dlm_message *ms, bool local);
90
static int receive_extralen(const struct dlm_message *ms);
91
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92
static void deactivate_rsb(struct kref *kref);
93
94
/*
95
* Lock compatibility matrix - thanks Steve
96
* UN = Unlocked state. Not really a state, used as a flag
97
* PD = Padding. Used to make the matrix a nice power of two in size
98
* Other states are the same as the VMS DLM.
99
* Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
100
*/
101
102
static const int __dlm_compat_matrix[8][8] = {
103
/* UN NL CR CW PR PW EX PD */
104
{1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
105
{1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
106
{1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
107
{1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
108
{1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
109
{1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
110
{1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
111
{0, 0, 0, 0, 0, 0, 0, 0} /* PD */
112
};
113
114
/*
115
* This defines the direction of transfer of LVB data.
116
* Granted mode is the row; requested mode is the column.
117
* Usage: matrix[grmode+1][rqmode+1]
118
* 1 = LVB is returned to the caller
119
* 0 = LVB is written to the resource
120
* -1 = nothing happens to the LVB
121
*/
122
123
const int dlm_lvb_operations[8][8] = {
124
/* UN NL CR CW PR PW EX PD*/
125
{ -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
126
{ -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
127
{ -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
128
{ -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
129
{ -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
130
{ -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
131
{ -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
132
{ -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
133
};
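/*
 * Illustrative reading of the table above (editorial addition): a lock
 * converting NL -> EX hits dlm_lvb_operations[NL+1][EX+1] == 1, so the
 * current LVB is copied back to the caller's buffer; a down-conversion
 * EX -> NL hits dlm_lvb_operations[EX+1][NL+1] == 0, so the caller's
 * LVB is written into the resource; a CR -> NL down-conversion hits -1
 * and the LVB is left untouched.  See set_lvb_lock() below for the
 * code that applies these values.
 */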
134
135
#define modes_compat(gr, rq) \
136
__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138
int dlm_modes_compat(int mode1, int mode2)
139
{
140
return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141
}
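/*
 * Illustrative examples (editorial addition), read straight from
 * __dlm_compat_matrix above:
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);  // 1: shared readers coexist
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);  // 0: EX excludes readers
 *	dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX);  // 1: NL is compatible with anything
 */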
142
143
/*
144
* Compatibility matrix for conversions with QUECVT set.
145
* Granted mode is the row; requested mode is the column.
146
* Usage: matrix[grmode+1][rqmode+1]
147
*/
148
149
static const int __quecvt_compat_matrix[8][8] = {
150
/* UN NL CR CW PR PW EX PD */
151
{0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
152
{0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
153
{0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
154
{0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
155
{0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
156
{0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
157
{0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
158
{0, 0, 0, 0, 0, 0, 0, 0} /* PD */
159
};
160
161
void dlm_print_lkb(struct dlm_lkb *lkb)
162
{
163
printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164
"sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165
lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166
dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167
lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168
(unsigned long long)lkb->lkb_recover_seq);
169
}
170
171
static void dlm_print_rsb(struct dlm_rsb *r)
172
{
173
printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174
"rlc %d name %s\n",
175
r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176
r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177
r->res_name);
178
}
179
180
void dlm_dump_rsb(struct dlm_rsb *r)
181
{
182
struct dlm_lkb *lkb;
183
184
dlm_print_rsb(r);
185
186
printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187
list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188
printk(KERN_ERR "rsb lookup list\n");
189
list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190
dlm_print_lkb(lkb);
191
printk(KERN_ERR "rsb grant queue:\n");
192
list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193
dlm_print_lkb(lkb);
194
printk(KERN_ERR "rsb convert queue:\n");
195
list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196
dlm_print_lkb(lkb);
197
printk(KERN_ERR "rsb wait queue:\n");
198
list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199
dlm_print_lkb(lkb);
200
}
201
202
/* Threads cannot use the lockspace while it's being recovered */
203
204
void dlm_lock_recovery(struct dlm_ls *ls)
205
{
206
down_read(&ls->ls_in_recovery);
207
}
208
209
void dlm_unlock_recovery(struct dlm_ls *ls)
210
{
211
up_read(&ls->ls_in_recovery);
212
}
213
214
int dlm_lock_recovery_try(struct dlm_ls *ls)
215
{
216
return down_read_trylock(&ls->ls_in_recovery);
217
}
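/*
 * Typical caller pattern (editorial sketch, error code hypothetical):
 * request paths take the recovery read-side before touching rsbs/lkbs,
 * e.g.
 *
 *	if (!dlm_lock_recovery_try(ls))
 *		return -EAGAIN;	// recovery in progress, retry later
 *	... perform the locking operation ...
 *	dlm_unlock_recovery(ls);
 *
 * dlm_lock_recovery() is the blocking variant of the same pattern.
 */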
218
219
static inline int can_be_queued(struct dlm_lkb *lkb)
220
{
221
return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222
}
223
224
static inline int force_blocking_asts(struct dlm_lkb *lkb)
225
{
226
return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227
}
228
229
static inline int is_demoted(struct dlm_lkb *lkb)
230
{
231
return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232
}
233
234
static inline int is_altmode(struct dlm_lkb *lkb)
235
{
236
return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237
}
238
239
static inline int is_granted(struct dlm_lkb *lkb)
240
{
241
return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242
}
243
244
static inline int is_remote(struct dlm_rsb *r)
245
{
246
DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247
return !!r->res_nodeid;
248
}
249
250
static inline int is_process_copy(struct dlm_lkb *lkb)
251
{
252
return lkb->lkb_nodeid &&
253
!test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254
}
255
256
static inline int is_master_copy(struct dlm_lkb *lkb)
257
{
258
return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259
}
260
261
static inline int middle_conversion(struct dlm_lkb *lkb)
262
{
263
if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264
(lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265
return 1;
266
return 0;
267
}
268
269
static inline int down_conversion(struct dlm_lkb *lkb)
270
{
271
return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272
}
273
274
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275
{
276
return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277
}
278
279
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280
{
281
return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282
}
283
284
static inline int is_overlap(struct dlm_lkb *lkb)
285
{
286
return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287
test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288
}
289
290
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291
{
292
if (is_master_copy(lkb))
293
return;
294
295
DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297
if (rv == -DLM_ECANCEL &&
298
test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299
rv = -EDEADLK;
300
301
dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302
}
303
304
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305
{
306
queue_cast(r, lkb,
307
is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308
}
309
310
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311
{
312
if (is_master_copy(lkb)) {
313
send_bast(r, lkb, rqmode);
314
} else {
315
dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316
}
317
}
318
319
/*
320
* Basic operations on rsb's and lkb's
321
*/
322
323
static inline unsigned long rsb_toss_jiffies(void)
324
{
325
return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326
}
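/*
 * Editorial note: with a (hypothetical) configuration of
 * dlm_config.ci_toss_secs = 10, an rsb becomes eligible for removal by
 * dlm_rsb_scan() roughly 10 seconds (10 * HZ jiffies) after it was
 * last added to the scan list.
 */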
327
328
/* This is only called to add a reference when the code already holds
329
a valid reference to the rsb, so there's no need for locking. */
330
331
static inline void hold_rsb(struct dlm_rsb *r)
332
{
333
/* inactive rsbs are not ref counted */
334
WARN_ON(rsb_flag(r, RSB_INACTIVE));
335
kref_get(&r->res_ref);
336
}
337
338
void dlm_hold_rsb(struct dlm_rsb *r)
339
{
340
hold_rsb(r);
341
}
342
343
/* TODO move this to lib/refcount.c */
344
static __must_check bool
345
dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346
__cond_acquires(lock)
347
{
348
if (refcount_dec_not_one(r))
349
return false;
350
351
write_lock_bh(lock);
352
if (!refcount_dec_and_test(r)) {
353
write_unlock_bh(lock);
354
return false;
355
}
356
357
return true;
358
}
359
360
/* TODO move this to include/linux/kref.h */
361
static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362
void (*release)(struct kref *kref),
363
rwlock_t *lock)
364
{
365
if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366
release(kref);
367
return 1;
368
}
369
370
return 0;
371
}
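/*
 * Editorial note on the contract: when this returns 1, release() has
 * run and the caller still holds 'lock' and must unlock it, as
 * put_rsb() below does:
 *
 *	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
 *					&ls->ls_rsbtbl_lock);
 *	if (rv)
 *		write_unlock_bh(&ls->ls_rsbtbl_lock);
 *
 * When it returns 0, the reference was merely decremented and no lock
 * is held on return.
 */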
372
373
static void put_rsb(struct dlm_rsb *r)
374
{
375
struct dlm_ls *ls = r->res_ls;
376
int rv;
377
378
rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379
&ls->ls_rsbtbl_lock);
380
if (rv)
381
write_unlock_bh(&ls->ls_rsbtbl_lock);
382
}
383
384
void dlm_put_rsb(struct dlm_rsb *r)
385
{
386
put_rsb(r);
387
}
388
389
/* Paired with timer_delete_sync() in dlm_ls_stop(): don't arm new
 * timers when recovery is triggered, and don't run them again until
 * resume_scan_timer() re-arms the timer.
 */
393
static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394
{
395
if (!dlm_locking_stopped(ls))
396
mod_timer(&ls->ls_scan_timer, jiffies);
397
}
398
399
/* This function tries to resume the timer callback if an rsb
 * is on the scan list and no timer is pending. The first entry
 * might currently be executing as the timer callback, but we
 * don't care if a timer is queued up again and does nothing.
 * Should be a rare case.
 */
405
void resume_scan_timer(struct dlm_ls *ls)
406
{
407
struct dlm_rsb *r;
408
409
spin_lock_bh(&ls->ls_scan_lock);
410
r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411
res_scan_list);
412
if (r && !timer_pending(&ls->ls_scan_timer))
413
enable_scan_timer(ls, r->res_toss_time);
414
spin_unlock_bh(&ls->ls_scan_lock);
415
}
416
417
/* ls_rsbtbl_lock must be held */
418
419
static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420
{
421
struct dlm_rsb *first;
422
423
/* active rsbs should never be on the scan list */
424
WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425
426
spin_lock_bh(&ls->ls_scan_lock);
427
r->res_toss_time = 0;
428
429
/* if the rsb is not queued do nothing */
430
if (list_empty(&r->res_scan_list))
431
goto out;
432
433
/* get the first element before delete */
434
first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435
res_scan_list);
436
list_del_init(&r->res_scan_list);
437
/* check if the first element was the rsb we deleted */
438
if (first == r) {
439
/* Try to get the new first element. If the list is empty now,
 * try to delete the timer; if we are too late we don't care.
 *
 * If the list isn't empty and a new first element is in place,
 * set the new timer expire time.
 */
446
first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447
res_scan_list);
448
if (!first)
449
timer_delete(&ls->ls_scan_timer);
450
else
451
enable_scan_timer(ls, first->res_toss_time);
452
}
453
454
out:
455
spin_unlock_bh(&ls->ls_scan_lock);
456
}
457
458
static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459
{
460
int our_nodeid = dlm_our_nodeid();
461
struct dlm_rsb *first;
462
463
/* A dir record for a remote master rsb should never be on the scan list. */
464
WARN_ON(!dlm_no_directory(ls) &&
465
(r->res_master_nodeid != our_nodeid) &&
466
(dlm_dir_nodeid(r) == our_nodeid));
467
468
/* An active rsb should never be on the scan list. */
469
WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470
471
/* An rsb should not already be on the scan list. */
472
WARN_ON(!list_empty(&r->res_scan_list));
473
474
spin_lock_bh(&ls->ls_scan_lock);
475
/* set the new rsb absolute expire time in the rsb */
476
r->res_toss_time = rsb_toss_jiffies();
477
if (list_empty(&ls->ls_scan_list)) {
478
/* if the queue is empty, add the element and its
 * expire time becomes our next expiration
 */
481
list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482
enable_scan_timer(ls, r->res_toss_time);
483
} else {
484
/* Get the (possibly new) first element, then add this rsb,
 * which has the newest expire time, to the end of the queue.
 * If the list was empty before, this rsb's expire time is our
 * next expiration; otherwise the first element's expire time
 * remains our next expiration.
 */
490
first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491
res_scan_list);
492
list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493
if (!first)
494
enable_scan_timer(ls, r->res_toss_time);
495
else
496
enable_scan_timer(ls, first->res_toss_time);
497
}
498
spin_unlock_bh(&ls->ls_scan_lock);
499
}
500
501
/* If we hit contention, retry the trylock in 250 ms. If any other
 * mod_timer happens in between, we don't care that it expires
 * earlier; this is only for the unlikely case that nothing happened
 * in this time.
 */
506
#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
507
508
/* Called by lockspace scan_timer to free unused rsb's. */
509
510
void dlm_rsb_scan(struct timer_list *timer)
511
{
512
struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer);
513
int our_nodeid = dlm_our_nodeid();
514
struct dlm_rsb *r;
515
int rv;
516
517
while (1) {
518
/* Interruption point to leave the iteration when recovery
 * waits for timer_delete_sync(); recovery will take care of
 * deleting everything on the scan list.
 */
522
if (dlm_locking_stopped(ls))
523
break;
524
525
rv = spin_trylock(&ls->ls_scan_lock);
526
if (!rv) {
527
/* rearm again try timer */
528
enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529
break;
530
}
531
532
r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533
res_scan_list);
534
if (!r) {
535
/* the next add_scan will enable the timer again */
536
spin_unlock(&ls->ls_scan_lock);
537
break;
538
}
539
540
/*
541
* If the first rsb is not yet expired, then stop because the
542
* list is sorted with nearest expiration first.
543
*/
544
if (time_before(jiffies, r->res_toss_time)) {
545
/* rearm with the next rsb to expire in the future */
546
enable_scan_timer(ls, r->res_toss_time);
547
spin_unlock(&ls->ls_scan_lock);
548
break;
549
}
550
551
/* In find_rsb_dir/nodir this lock is taken in the reverse order;
 * however, this is only a trylock, so if we hit possible
 * contention we just try again.
 */
555
rv = write_trylock(&ls->ls_rsbtbl_lock);
556
if (!rv) {
557
spin_unlock(&ls->ls_scan_lock);
558
/* rearm again try timer */
559
enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560
break;
561
}
562
563
list_del(&r->res_slow_list);
564
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565
dlm_rhash_rsb_params);
566
rsb_clear_flag(r, RSB_HASHED);
567
568
/* ls_rsbtbl_lock is not needed when calling send_remove() */
569
write_unlock(&ls->ls_rsbtbl_lock);
570
571
list_del_init(&r->res_scan_list);
572
spin_unlock(&ls->ls_scan_lock);
573
574
/* An rsb that is a dir record for a remote master rsb
575
* cannot be removed, and should not have a timer enabled.
576
*/
577
WARN_ON(!dlm_no_directory(ls) &&
578
(r->res_master_nodeid != our_nodeid) &&
579
(dlm_dir_nodeid(r) == our_nodeid));
580
581
/* We're the master of this rsb but we're not
582
* the directory record, so we need to tell the
583
* dir node to remove the dir record
584
*/
585
if (!dlm_no_directory(ls) &&
586
(r->res_master_nodeid == our_nodeid) &&
587
(dlm_dir_nodeid(r) != our_nodeid))
588
send_remove(r);
589
590
free_inactive_rsb(r);
591
}
592
}
593
594
/* Allocate and initialize a new rsb for the given name; return
   -ENOMEM if allocation fails. */
597
598
static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599
struct dlm_rsb **r_ret)
600
{
601
struct dlm_rsb *r;
602
603
r = dlm_allocate_rsb();
604
if (!r)
605
return -ENOMEM;
606
607
r->res_ls = ls;
608
r->res_length = len;
609
memcpy(r->res_name, name, len);
610
spin_lock_init(&r->res_lock);
611
612
INIT_LIST_HEAD(&r->res_lookup);
613
INIT_LIST_HEAD(&r->res_grantqueue);
614
INIT_LIST_HEAD(&r->res_convertqueue);
615
INIT_LIST_HEAD(&r->res_waitqueue);
616
INIT_LIST_HEAD(&r->res_root_list);
617
INIT_LIST_HEAD(&r->res_scan_list);
618
INIT_LIST_HEAD(&r->res_recover_list);
619
INIT_LIST_HEAD(&r->res_masters_list);
620
621
*r_ret = r;
622
return 0;
623
}
624
625
int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626
struct dlm_rsb **r_ret)
627
{
628
char key[DLM_RESNAME_MAXLEN] = {};
629
630
memcpy(key, name, len);
631
*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632
if (*r_ret)
633
return 0;
634
635
return -EBADR;
636
}
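/*
 * Editorial note: the lookup key is the resource name zero-padded to
 * DLM_RESNAME_MAXLEN so it compares against the fixed-size res_name
 * used as the hash key.  A caller sketch (under rcu_read_lock, as in
 * find_rsb below):
 *
 *	struct dlm_rsb *r;
 *	int error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 *	if (error == -EBADR)
 *		// no rsb with this name is hashed
 */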
637
638
static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639
{
640
int rv;
641
642
rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643
dlm_rhash_rsb_params);
644
if (!rv)
645
rsb_set_flag(rsb, RSB_HASHED);
646
647
return rv;
648
}
649
650
/*
651
* Find rsb in rsbtbl and potentially create/add one
652
*
653
* Delaying the release of rsb's has a similar benefit to applications keeping
654
* NL locks on an rsb, but without the guarantee that the cached master value
655
* will still be valid when the rsb is reused. Apps aren't always smart enough
656
* to keep NL locks on an rsb that they may lock again shortly; this can lead
657
* to excessive master lookups and removals if we don't delay the release.
658
*
659
* Searching for an rsb means looking through both the normal list and toss
660
* list. When found on the toss list the rsb is moved to the normal list with
661
* ref count of 1; when found on normal list the ref count is incremented.
662
*
663
* rsb's on the keep list are being used locally and refcounted.
664
* rsb's on the toss list are not being used locally, and are not refcounted.
665
*
666
* The toss list rsb's were either
667
* - previously used locally but not any more (were on keep list, then
668
* moved to toss list when last refcount dropped)
669
* - created and put on toss list as a directory record for a lookup
670
* (we are the dir node for the res, but are not using the res right now,
671
* but some other node is)
672
*
673
* The purpose of find_rsb() is to return a refcounted rsb for local use.
674
* So, if the given rsb is on the toss list, it is moved to the keep list
675
* before being returned.
676
*
677
* deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678
* more refcounts exist, so the rsb is moved from the keep list to the
679
* toss list.
680
*
681
* rsb's on both keep and toss lists are used for doing a name to master
682
* lookups. rsb's that are in use locally (and being refcounted) are on
683
* the keep list, rsb's that are not in use locally (not refcounted) and
684
* only exist for name/master lookups are on the toss list.
685
*
686
* rsb's on the toss list whose dir_nodeid is not local can have stale
687
* name/master mappings. So, remote requests on such rsb's can potentially
688
* return with an error, which means the mapping is stale and needs to
689
* be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
690
* first_lkid is to keep only a single outstanding request on an rsb
691
* while that rsb has a potentially stale master.)
692
*/
693
694
static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695
uint32_t hash, int dir_nodeid, int from_nodeid,
696
unsigned int flags, struct dlm_rsb **r_ret)
697
{
698
struct dlm_rsb *r = NULL;
699
int our_nodeid = dlm_our_nodeid();
700
int from_local = 0;
701
int from_other = 0;
702
int from_dir = 0;
703
int create = 0;
704
int error;
705
706
if (flags & R_RECEIVE_REQUEST) {
707
if (from_nodeid == dir_nodeid)
708
from_dir = 1;
709
else
710
from_other = 1;
711
} else if (flags & R_REQUEST) {
712
from_local = 1;
713
}
714
715
/*
716
* flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717
* from_nodeid has sent us a lock in dlm_recover_locks, believing
718
* we're the new master. Our local recovery may not have set
719
* res_master_nodeid to our_nodeid yet, so allow either. Don't
720
* create the rsb; dlm_recover_process_copy() will handle EBADR
721
* by resending.
722
*
723
* If someone sends us a request, we are the dir node, and we do
724
* not find the rsb anywhere, then recreate it. This happens if
725
* someone sends us a request after we have removed/freed an rsb.
726
* (They sent a request instead of lookup because they are using
727
* an rsb taken from their scan list.)
728
*/
729
730
if (from_local || from_dir ||
731
(from_other && (dir_nodeid == our_nodeid))) {
732
create = 1;
733
}
734
735
retry:
736
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737
if (error)
738
goto do_new;
739
740
/* check if the rsb is active under read lock - likely path */
741
read_lock_bh(&ls->ls_rsbtbl_lock);
742
if (!rsb_flag(r, RSB_HASHED)) {
743
read_unlock_bh(&ls->ls_rsbtbl_lock);
744
error = -EBADR;
745
goto do_new;
746
}
747
748
/*
749
* rsb is active, so we can't check master_nodeid without lock_rsb.
750
*/
751
752
if (rsb_flag(r, RSB_INACTIVE)) {
753
read_unlock_bh(&ls->ls_rsbtbl_lock);
754
goto do_inactive;
755
}
756
757
kref_get(&r->res_ref);
758
read_unlock_bh(&ls->ls_rsbtbl_lock);
759
goto out;
760
761
762
do_inactive:
763
write_lock_bh(&ls->ls_rsbtbl_lock);
764
765
/*
766
* The expectation here is that the rsb will have HASHED and
767
* INACTIVE flags set, and that the rsb can be moved from
768
* inactive back to active again. However, between releasing
769
* the read lock and acquiring the write lock, this rsb could
770
* have been removed from rsbtbl, and had HASHED cleared, to
771
* be freed. To deal with this case, we would normally need
772
* to repeat dlm_search_rsb_tree while holding the write lock,
773
* but rcu allows us to simply check the HASHED flag, because
774
* the rcu read lock means the rsb will not be freed yet.
775
* If the HASHED flag is not set, then the rsb is being freed,
776
* so we add a new rsb struct. If the HASHED flag is set,
777
* and INACTIVE is not set, it means another thread has
778
* made the rsb active, as we're expecting to do here, and
779
* we just repeat the lookup (this will be very unlikely.)
780
*/
781
if (rsb_flag(r, RSB_HASHED)) {
782
if (!rsb_flag(r, RSB_INACTIVE)) {
783
write_unlock_bh(&ls->ls_rsbtbl_lock);
784
goto retry;
785
}
786
} else {
787
write_unlock_bh(&ls->ls_rsbtbl_lock);
788
error = -EBADR;
789
goto do_new;
790
}
791
792
/*
793
* rsb found inactive (master_nodeid may be out of date unless
794
* we are the dir_nodeid or were the master). No other thread
795
* is using this rsb because it's inactive, so we can
796
* look at or update res_master_nodeid without lock_rsb.
797
*/
798
799
if ((r->res_master_nodeid != our_nodeid) && from_other) {
800
/* our rsb was not master, and another node (not the dir node)
801
has sent us a request */
802
log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
803
from_nodeid, r->res_master_nodeid, dir_nodeid,
804
r->res_name);
805
write_unlock_bh(&ls->ls_rsbtbl_lock);
806
error = -ENOTBLK;
807
goto out;
808
}
809
810
if ((r->res_master_nodeid != our_nodeid) && from_dir) {
811
/* don't think this should ever happen */
812
log_error(ls, "find_rsb inactive from_dir %d master %d",
813
from_nodeid, r->res_master_nodeid);
814
dlm_print_rsb(r);
815
/* fix it and go on */
816
r->res_master_nodeid = our_nodeid;
817
r->res_nodeid = 0;
818
rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
819
r->res_first_lkid = 0;
820
}
821
822
if (from_local && (r->res_master_nodeid != our_nodeid)) {
823
/* Because we have held no locks on this rsb,
824
res_master_nodeid could have become stale. */
825
rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
826
r->res_first_lkid = 0;
827
}
828
829
/* We always deactivate the scan timer for the rsb when we move
 * it out of the inactive state, as the rsb state can change and
 * scan timers are only for inactive rsbs.
 */
834
del_scan(ls, r);
835
list_move(&r->res_slow_list, &ls->ls_slow_active);
836
rsb_clear_flag(r, RSB_INACTIVE);
837
kref_init(&r->res_ref); /* ref is now used in active state */
838
write_unlock_bh(&ls->ls_rsbtbl_lock);
839
840
goto out;
841
842
843
do_new:
844
/*
845
* rsb not found
846
*/
847
848
if (error == -EBADR && !create)
849
goto out;
850
851
error = get_rsb_struct(ls, name, len, &r);
852
if (WARN_ON_ONCE(error))
853
goto out;
854
855
r->res_hash = hash;
856
r->res_dir_nodeid = dir_nodeid;
857
kref_init(&r->res_ref);
858
859
if (from_dir) {
860
/* want to see how often this happens */
861
log_debug(ls, "find_rsb new from_dir %d recreate %s",
862
from_nodeid, r->res_name);
863
r->res_master_nodeid = our_nodeid;
864
r->res_nodeid = 0;
865
goto out_add;
866
}
867
868
if (from_other && (dir_nodeid != our_nodeid)) {
869
/* should never happen */
870
log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
871
from_nodeid, dir_nodeid, our_nodeid, r->res_name);
872
dlm_free_rsb(r);
873
r = NULL;
874
error = -ENOTBLK;
875
goto out;
876
}
877
878
if (from_other) {
879
log_debug(ls, "find_rsb new from_other %d dir %d %s",
880
from_nodeid, dir_nodeid, r->res_name);
881
}
882
883
if (dir_nodeid == our_nodeid) {
884
/* When we are the dir nodeid, we can set the master
885
node immediately */
886
r->res_master_nodeid = our_nodeid;
887
r->res_nodeid = 0;
888
} else {
889
/* set_master will send_lookup to dir_nodeid */
890
r->res_master_nodeid = 0;
891
r->res_nodeid = -1;
892
}
893
894
out_add:
895
896
write_lock_bh(&ls->ls_rsbtbl_lock);
897
error = rsb_insert(r, &ls->ls_rsbtbl);
898
if (error == -EEXIST) {
899
/* somebody else was faster and it seems the
900
* rsb exists now, we do a whole relookup
901
*/
902
write_unlock_bh(&ls->ls_rsbtbl_lock);
903
dlm_free_rsb(r);
904
goto retry;
905
} else if (!error) {
906
list_add(&r->res_slow_list, &ls->ls_slow_active);
907
}
908
write_unlock_bh(&ls->ls_rsbtbl_lock);
909
out:
910
*r_ret = r;
911
return error;
912
}
913
914
/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourselves master (in
   dlm_recover_masters). */
917
918
static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
919
uint32_t hash, int dir_nodeid, int from_nodeid,
920
unsigned int flags, struct dlm_rsb **r_ret)
921
{
922
struct dlm_rsb *r = NULL;
923
int our_nodeid = dlm_our_nodeid();
924
int recover = (flags & R_RECEIVE_RECOVER);
925
int error;
926
927
retry:
928
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
929
if (error)
930
goto do_new;
931
932
/* check if the rsb is in active state under read lock - likely path */
933
read_lock_bh(&ls->ls_rsbtbl_lock);
934
if (!rsb_flag(r, RSB_HASHED)) {
935
read_unlock_bh(&ls->ls_rsbtbl_lock);
936
goto do_new;
937
}
938
939
if (rsb_flag(r, RSB_INACTIVE)) {
940
read_unlock_bh(&ls->ls_rsbtbl_lock);
941
goto do_inactive;
942
}
943
944
/*
945
* rsb is active, so we can't check master_nodeid without lock_rsb.
946
*/
947
948
kref_get(&r->res_ref);
949
read_unlock_bh(&ls->ls_rsbtbl_lock);
950
951
goto out;
952
953
954
do_inactive:
955
write_lock_bh(&ls->ls_rsbtbl_lock);
956
957
/* See comment in find_rsb_dir. */
958
if (rsb_flag(r, RSB_HASHED)) {
959
if (!rsb_flag(r, RSB_INACTIVE)) {
960
write_unlock_bh(&ls->ls_rsbtbl_lock);
961
goto retry;
962
}
963
} else {
964
write_unlock_bh(&ls->ls_rsbtbl_lock);
965
goto do_new;
966
}
967
968
969
/*
970
* rsb found inactive. No other thread is using this rsb because
971
* it's inactive, so we can look at or update res_master_nodeid
972
* without lock_rsb.
973
*/
974
975
if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
976
/* our rsb is not master, and another node has sent us a
977
request; this should never happen */
978
log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
979
from_nodeid, r->res_master_nodeid, dir_nodeid);
980
dlm_print_rsb(r);
981
write_unlock_bh(&ls->ls_rsbtbl_lock);
982
error = -ENOTBLK;
983
goto out;
984
}
985
986
if (!recover && (r->res_master_nodeid != our_nodeid) &&
987
(dir_nodeid == our_nodeid)) {
988
/* our rsb is not master, and we are dir; may as well fix it;
989
this should never happen */
990
log_error(ls, "find_rsb inactive our %d master %d dir %d",
991
our_nodeid, r->res_master_nodeid, dir_nodeid);
992
dlm_print_rsb(r);
993
r->res_master_nodeid = our_nodeid;
994
r->res_nodeid = 0;
995
}
996
997
del_scan(ls, r);
998
list_move(&r->res_slow_list, &ls->ls_slow_active);
999
rsb_clear_flag(r, RSB_INACTIVE);
1000
kref_init(&r->res_ref);
1001
write_unlock_bh(&ls->ls_rsbtbl_lock);
1002
1003
goto out;
1004
1005
1006
do_new:
1007
/*
1008
* rsb not found
1009
*/
1010
1011
error = get_rsb_struct(ls, name, len, &r);
1012
if (WARN_ON_ONCE(error))
1013
goto out;
1014
1015
r->res_hash = hash;
1016
r->res_dir_nodeid = dir_nodeid;
1017
r->res_master_nodeid = dir_nodeid;
1018
r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1019
kref_init(&r->res_ref);
1020
1021
write_lock_bh(&ls->ls_rsbtbl_lock);
1022
error = rsb_insert(r, &ls->ls_rsbtbl);
1023
if (error == -EEXIST) {
1024
/* somebody else was faster and it seems the
1025
* rsb exists now, we do a whole relookup
1026
*/
1027
write_unlock_bh(&ls->ls_rsbtbl_lock);
1028
dlm_free_rsb(r);
1029
goto retry;
1030
} else if (!error) {
1031
list_add(&r->res_slow_list, &ls->ls_slow_active);
1032
}
1033
write_unlock_bh(&ls->ls_rsbtbl_lock);
1034
1035
out:
1036
*r_ret = r;
1037
return error;
1038
}
1039
1040
/*
1041
* rsb rcu usage
1042
*
1043
* While rcu read lock is held, the rsb cannot be freed,
1044
* which allows a lookup optimization.
1045
*
1046
* Two threads are accessing the same rsb concurrently,
1047
* the first (A) is trying to use the rsb, the second (B)
1048
* is trying to free the rsb.
1049
*
1050
* thread A thread B
1051
* (trying to use rsb) (trying to free rsb)
1052
*
1053
* A1. rcu read lock
1054
* A2. rsbtbl read lock
1055
* A3. look up rsb in rsbtbl
1056
* A4. rsbtbl read unlock
1057
* B1. rsbtbl write lock
1058
* B2. look up rsb in rsbtbl
1059
* B3. remove rsb from rsbtbl
1060
* B4. clear rsb HASHED flag
1061
* B5. rsbtbl write unlock
1062
* B6. begin freeing rsb using rcu...
1063
*
1064
* (rsb is inactive, so try to make it active again)
1065
* A5. read rsb HASHED flag (safe because rsb is not freed yet)
1066
* A6. the rsb HASHED flag is not set, which means the rsb
1067
* is being removed from rsbtbl and freed, so don't use it.
1068
* A7. rcu read unlock
1069
*
1070
* B7. ...finish freeing rsb using rcu
1071
* A8. create a new rsb
1072
*
1073
* Without the rcu optimization, steps A5-8 would need to do
1074
* an extra rsbtbl lookup:
1075
* A5. rsbtbl write lock
1076
* A6. look up rsb in rsbtbl, not found
1077
* A7. rsbtbl write unlock
1078
* A8. create a new rsb
1079
*/
1080
1081
static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1082
int from_nodeid, unsigned int flags,
1083
struct dlm_rsb **r_ret)
1084
{
1085
int dir_nodeid;
1086
uint32_t hash;
1087
int rv;
1088
1089
if (len > DLM_RESNAME_MAXLEN)
1090
return -EINVAL;
1091
1092
hash = jhash(name, len, 0);
1093
dir_nodeid = dlm_hash2nodeid(ls, hash);
1094
1095
rcu_read_lock();
1096
if (dlm_no_directory(ls))
1097
rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1098
from_nodeid, flags, r_ret);
1099
else
1100
rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1101
from_nodeid, flags, r_ret);
1102
rcu_read_unlock();
1103
return rv;
1104
}
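/*
 * Editorial sketch of how the stages above use find_rsb(): the local
 * request path (roughly what request_lock() does) and the remote
 * receive path differ mainly in the flags and from_nodeid they pass.
 * Assumes the lock_rsb()/unlock_rsb() helpers used elsewhere in this
 * file.
 *
 *	// local dlm_lock() request on this node
 *	error = find_rsb(ls, name, namelen, 0, R_REQUEST, &r);
 *	if (!error) {
 *		lock_rsb(r);
 *		// attach the lkb and run _request_lock(r, lkb)
 *		unlock_rsb(r);
 *		put_rsb(r);
 *	}
 *
 *	// request received from another node
 *	error = find_rsb(ls, name, namelen, from_nodeid,
 *			 R_RECEIVE_REQUEST, &r);
 */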
1105
1106
/* we have received a request and found that res_master_nodeid != our_nodeid,
   so we need to return an error or make ourselves the master */
1108
1109
static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1110
int from_nodeid)
1111
{
1112
if (dlm_no_directory(ls)) {
1113
log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1114
from_nodeid, r->res_master_nodeid,
1115
r->res_dir_nodeid);
1116
dlm_print_rsb(r);
1117
return -ENOTBLK;
1118
}
1119
1120
if (from_nodeid != r->res_dir_nodeid) {
1121
/* our rsb is not master, and another node (not the dir node)
1122
has sent us a request. this is much more common when our
1123
master_nodeid is zero, so limit debug to non-zero. */
1124
1125
if (r->res_master_nodeid) {
1126
log_debug(ls, "validate master from_other %d master %d "
1127
"dir %d first %x %s", from_nodeid,
1128
r->res_master_nodeid, r->res_dir_nodeid,
1129
r->res_first_lkid, r->res_name);
1130
}
1131
return -ENOTBLK;
1132
} else {
1133
/* our rsb is not master, but the dir nodeid has sent us a
1134
request; this could happen with master 0 / res_nodeid -1 */
1135
1136
if (r->res_master_nodeid) {
1137
log_error(ls, "validate master from_dir %d master %d "
1138
"first %x %s",
1139
from_nodeid, r->res_master_nodeid,
1140
r->res_first_lkid, r->res_name);
1141
}
1142
1143
r->res_master_nodeid = dlm_our_nodeid();
1144
r->res_nodeid = 0;
1145
return 0;
1146
}
1147
}
1148
1149
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1150
int from_nodeid, bool is_inactive, unsigned int flags,
1151
int *r_nodeid, int *result)
1152
{
1153
int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1154
int from_master = (flags & DLM_LU_RECOVER_DIR);
1155
1156
if (r->res_dir_nodeid != our_nodeid) {
1157
/* should not happen, but may as well fix it and carry on */
1158
log_error(ls, "%s res_dir %d our %d %s", __func__,
1159
r->res_dir_nodeid, our_nodeid, r->res_name);
1160
r->res_dir_nodeid = our_nodeid;
1161
}
1162
1163
if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1164
/* Recovery uses this function to set a new master when
1165
* the previous master failed. Setting NEW_MASTER will
1166
* force dlm_recover_masters to call recover_master on this
1167
* rsb even though the res_nodeid is no longer removed.
1168
*/
1169
1170
r->res_master_nodeid = from_nodeid;
1171
r->res_nodeid = from_nodeid;
1172
rsb_set_flag(r, RSB_NEW_MASTER);
1173
1174
if (is_inactive) {
1175
/* I don't think we should ever find it inactive. */
1176
log_error(ls, "%s fix_master inactive", __func__);
1177
dlm_dump_rsb(r);
1178
}
1179
}
1180
1181
if (from_master && (r->res_master_nodeid != from_nodeid)) {
1182
/* this will happen if from_nodeid became master during
1183
* a previous recovery cycle, and we aborted the previous
1184
* cycle before recovering this master value
1185
*/
1186
1187
log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1188
__func__, from_nodeid, r->res_master_nodeid,
1189
r->res_nodeid, r->res_first_lkid, r->res_name);
1190
1191
if (r->res_master_nodeid == our_nodeid) {
1192
log_error(ls, "from_master %d our_master", from_nodeid);
1193
dlm_dump_rsb(r);
1194
goto ret_assign;
1195
}
1196
1197
r->res_master_nodeid = from_nodeid;
1198
r->res_nodeid = from_nodeid;
1199
rsb_set_flag(r, RSB_NEW_MASTER);
1200
}
1201
1202
if (!r->res_master_nodeid) {
1203
/* this will happen if recovery happens while we're looking
1204
* up the master for this rsb
1205
*/
1206
1207
log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1208
from_nodeid, r->res_first_lkid, r->res_name);
1209
r->res_master_nodeid = from_nodeid;
1210
r->res_nodeid = from_nodeid;
1211
}
1212
1213
if (!from_master && !fix_master &&
1214
(r->res_master_nodeid == from_nodeid)) {
1215
/* this can happen when the master sends remove, the dir node
1216
* finds the rsb on the active list and ignores the remove,
1217
* and the former master sends a lookup
1218
*/
1219
1220
log_limit(ls, "%s from master %d flags %x first %x %s",
1221
__func__, from_nodeid, flags, r->res_first_lkid,
1222
r->res_name);
1223
}
1224
1225
ret_assign:
1226
*r_nodeid = r->res_master_nodeid;
1227
if (result)
1228
*result = DLM_LU_MATCH;
1229
}
1230
1231
/*
1232
* We're the dir node for this res and another node wants to know the
1233
* master nodeid. During normal operation (non recovery) this is only
1234
* called from receive_lookup(); master lookups when the local node is
1235
* the dir node are done by find_rsb().
1236
*
1237
* normal operation, we are the dir node for a resource
1238
* . _request_lock
1239
* . set_master
1240
* . send_lookup
1241
* . receive_lookup
1242
* . dlm_master_lookup flags 0
1243
*
1244
* recover directory, we are rebuilding dir for all resources
1245
* . dlm_recover_directory
1246
* . dlm_rcom_names
1247
* remote node sends back the rsb names it is master of and we are dir of
1248
* . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1249
* we either create new rsb setting remote node as master, or find existing
1250
* rsb and set master to be the remote node.
1251
*
1252
* recover masters, we are finding the new master for resources
1253
* . dlm_recover_masters
1254
* . recover_master
1255
* . dlm_send_rcom_lookup
1256
* . receive_rcom_lookup
1257
* . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1258
*/
1259
1260
static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1261
int len, unsigned int flags, int *r_nodeid, int *result)
1262
{
1263
struct dlm_rsb *r = NULL;
1264
uint32_t hash;
1265
int our_nodeid = dlm_our_nodeid();
1266
int dir_nodeid, error;
1267
1268
if (len > DLM_RESNAME_MAXLEN)
1269
return -EINVAL;
1270
1271
if (from_nodeid == our_nodeid) {
1272
log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1273
our_nodeid, flags);
1274
return -EINVAL;
1275
}
1276
1277
hash = jhash(name, len, 0);
1278
dir_nodeid = dlm_hash2nodeid(ls, hash);
1279
if (dir_nodeid != our_nodeid) {
1280
log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1281
from_nodeid, dir_nodeid, our_nodeid, hash,
1282
ls->ls_num_nodes);
1283
*r_nodeid = -1;
1284
return -EINVAL;
1285
}
1286
1287
retry:
1288
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1289
if (error)
1290
goto not_found;
1291
1292
/* check if the rsb is active under read lock - likely path */
1293
read_lock_bh(&ls->ls_rsbtbl_lock);
1294
if (!rsb_flag(r, RSB_HASHED)) {
1295
read_unlock_bh(&ls->ls_rsbtbl_lock);
1296
goto not_found;
1297
}
1298
1299
if (rsb_flag(r, RSB_INACTIVE)) {
1300
read_unlock_bh(&ls->ls_rsbtbl_lock);
1301
goto do_inactive;
1302
}
1303
1304
/* because the rsb is active, we need to lock_rsb before
 * checking/changing res_master_nodeid
 */
1307
1308
hold_rsb(r);
1309
read_unlock_bh(&ls->ls_rsbtbl_lock);
1310
lock_rsb(r);
1311
1312
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1313
flags, r_nodeid, result);
1314
1315
/* the rsb was active */
1316
unlock_rsb(r);
1317
put_rsb(r);
1318
1319
return 0;
1320
1321
do_inactive:
1322
/* unlikely path - check if still part of ls_rsbtbl */
1323
write_lock_bh(&ls->ls_rsbtbl_lock);
1324
1325
/* see comment in find_rsb_dir */
1326
if (rsb_flag(r, RSB_HASHED)) {
1327
if (!rsb_flag(r, RSB_INACTIVE)) {
1328
write_unlock_bh(&ls->ls_rsbtbl_lock);
1329
/* something has changed, very unlikely but
 * try again
 */
1332
goto retry;
1333
}
1334
} else {
1335
write_unlock_bh(&ls->ls_rsbtbl_lock);
1336
goto not_found;
1337
}
1338
1339
/* because the rsb is inactive, it's not refcounted and lock_rsb
1340
is not used, but is protected by the rsbtbl lock */
1341
1342
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1343
r_nodeid, result);
1344
1345
/* A dir record rsb should never be on the scan list,
 * except when we are both the dir node and the master node.
 * This function should only be called by the dir node.
 */
1350
WARN_ON(!list_empty(&r->res_scan_list) &&
1351
r->res_master_nodeid != our_nodeid);
1352
1353
write_unlock_bh(&ls->ls_rsbtbl_lock);
1354
1355
return 0;
1356
1357
not_found:
1358
error = get_rsb_struct(ls, name, len, &r);
1359
if (WARN_ON_ONCE(error))
1360
goto out;
1361
1362
r->res_hash = hash;
1363
r->res_dir_nodeid = our_nodeid;
1364
r->res_master_nodeid = from_nodeid;
1365
r->res_nodeid = from_nodeid;
1366
rsb_set_flag(r, RSB_INACTIVE);
1367
1368
write_lock_bh(&ls->ls_rsbtbl_lock);
1369
error = rsb_insert(r, &ls->ls_rsbtbl);
1370
if (error == -EEXIST) {
1371
/* somebody else was faster and it seems the
1372
* rsb exists now, we do a whole relookup
1373
*/
1374
write_unlock_bh(&ls->ls_rsbtbl_lock);
1375
dlm_free_rsb(r);
1376
goto retry;
1377
} else if (error) {
1378
write_unlock_bh(&ls->ls_rsbtbl_lock);
1379
/* should never happen */
1380
dlm_free_rsb(r);
1381
goto retry;
1382
}
1383
1384
list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1385
write_unlock_bh(&ls->ls_rsbtbl_lock);
1386
1387
if (result)
1388
*result = DLM_LU_ADD;
1389
*r_nodeid = from_nodeid;
1390
out:
1391
return error;
1392
}
1393
1394
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1395
int len, unsigned int flags, int *r_nodeid, int *result)
1396
{
1397
int rv;
1398
rcu_read_lock();
1399
rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1400
rcu_read_unlock();
1401
return rv;
1402
}
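/*
 * Editorial sketch: the normal-operation path described above
 * (receive_lookup on the dir node) reduces to roughly:
 *
 *	int r_nodeid, result;
 *	error = dlm_master_lookup(ls, from_nodeid, name, namelen, 0,
 *				  &r_nodeid, &result);
 *	// on success, r_nodeid is returned in the lookup reply;
 *	// result is DLM_LU_MATCH for an existing rsb, or DLM_LU_ADD
 *	// when a new inactive dir-record rsb was created
 */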
1403
1404
static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1405
{
1406
struct dlm_rsb *r;
1407
1408
read_lock_bh(&ls->ls_rsbtbl_lock);
1409
list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1410
if (r->res_hash == hash)
1411
dlm_dump_rsb(r);
1412
}
1413
read_unlock_bh(&ls->ls_rsbtbl_lock);
1414
}
1415
1416
void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1417
{
1418
struct dlm_rsb *r = NULL;
1419
int error;
1420
1421
rcu_read_lock();
1422
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1423
if (!error)
1424
goto out;
1425
1426
dlm_dump_rsb(r);
1427
out:
1428
rcu_read_unlock();
1429
}
1430
1431
static void deactivate_rsb(struct kref *kref)
1432
{
1433
struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1434
struct dlm_ls *ls = r->res_ls;
1435
int our_nodeid = dlm_our_nodeid();
1436
1437
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1438
rsb_set_flag(r, RSB_INACTIVE);
1439
list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1440
1441
/*
1442
* When the rsb becomes unused, there are two possibilities:
1443
* 1. Leave the inactive rsb in place (don't remove it).
1444
* 2. Add it to the scan list to be removed.
1445
*
1446
* 1 is done when the rsb is acting as the dir record
1447
* for a remotely mastered rsb. The rsb must be left
1448
* in place as an inactive rsb to act as the dir record.
1449
*
1450
* 2 is done when a) the rsb is not the master and not the
1451
* dir record, b) when the rsb is both the master and the
1452
* dir record, c) when the rsb is master but not dir record.
1453
*
1454
* (If no directory is used, the rsb can always be removed.)
1455
*/
1456
if (dlm_no_directory(ls) ||
1457
(r->res_master_nodeid == our_nodeid ||
1458
dlm_dir_nodeid(r) != our_nodeid))
1459
add_scan(ls, r);
1460
1461
if (r->res_lvbptr) {
1462
dlm_free_lvb(r->res_lvbptr);
1463
r->res_lvbptr = NULL;
1464
}
1465
}
1466
1467
void free_inactive_rsb(struct dlm_rsb *r)
1468
{
1469
WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1470
1471
DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1472
DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1473
DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1474
DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1475
DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1476
DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1477
DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1478
DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1479
1480
dlm_free_rsb(r);
1481
}
1482
1483
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1484
The rsb must exist as long as any lkb's for it do. */
1485
1486
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1487
{
1488
hold_rsb(r);
1489
lkb->lkb_resource = r;
1490
}
1491
1492
static void detach_lkb(struct dlm_lkb *lkb)
1493
{
1494
if (lkb->lkb_resource) {
1495
put_rsb(lkb->lkb_resource);
1496
lkb->lkb_resource = NULL;
1497
}
1498
}
1499
1500
static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1501
unsigned long start, unsigned long end)
1502
{
1503
struct xa_limit limit;
1504
struct dlm_lkb *lkb;
1505
int rv;
1506
1507
limit.max = end;
1508
limit.min = start;
1509
1510
lkb = dlm_allocate_lkb();
1511
if (!lkb)
1512
return -ENOMEM;
1513
1514
lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1515
lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1516
lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1517
lkb->lkb_nodeid = -1;
1518
lkb->lkb_grmode = DLM_LOCK_IV;
1519
kref_init(&lkb->lkb_ref);
1520
INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1521
INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1522
1523
write_lock_bh(&ls->ls_lkbxa_lock);
1524
rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1525
write_unlock_bh(&ls->ls_lkbxa_lock);
1526
1527
if (rv < 0) {
1528
log_error(ls, "create_lkb xa error %d", rv);
1529
dlm_free_lkb(lkb);
1530
return rv;
1531
}
1532
1533
*lkb_ret = lkb;
1534
return 0;
1535
}
1536
1537
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1538
{
1539
return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1540
}
1541
1542
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1543
{
1544
struct dlm_lkb *lkb;
1545
1546
rcu_read_lock();
1547
lkb = xa_load(&ls->ls_lkbxa, lkid);
1548
if (lkb) {
1549
/* check if lkb is still part of lkbxa under lkbxa_lock, as
 * the lkb_ref is tied to the lkbxa data structure, see
 * __put_lkb().
 */
1553
read_lock_bh(&ls->ls_lkbxa_lock);
1554
if (kref_read(&lkb->lkb_ref))
1555
kref_get(&lkb->lkb_ref);
1556
else
1557
lkb = NULL;
1558
read_unlock_bh(&ls->ls_lkbxa_lock);
1559
}
1560
rcu_read_unlock();
1561
1562
*lkb_ret = lkb;
1563
return lkb ? 0 : -ENOENT;
1564
}
1565
1566
static void kill_lkb(struct kref *kref)
1567
{
1568
struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1569
1570
/* All work is done after the return from kref_put() so we
1571
can release the write_lock before the detach_lkb */
1572
1573
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1574
}
1575
1576
/* __put_lkb() is used when an lkb may not have an rsb attached to
1577
it so we need to provide the lockspace explicitly */
1578
1579
static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1580
{
1581
uint32_t lkid = lkb->lkb_id;
1582
int rv;
1583
1584
rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1585
&ls->ls_lkbxa_lock);
1586
if (rv) {
1587
xa_erase(&ls->ls_lkbxa, lkid);
1588
write_unlock_bh(&ls->ls_lkbxa_lock);
1589
1590
detach_lkb(lkb);
1591
1592
/* for local/process lkbs, lvbptr points to caller's lksb */
1593
if (lkb->lkb_lvbptr && is_master_copy(lkb))
1594
dlm_free_lvb(lkb->lkb_lvbptr);
1595
dlm_free_lkb(lkb);
1596
}
1597
1598
return rv;
1599
}
1600
1601
int dlm_put_lkb(struct dlm_lkb *lkb)
1602
{
1603
struct dlm_ls *ls;
1604
1605
DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1606
DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1607
1608
ls = lkb->lkb_resource->res_ls;
1609
return __put_lkb(ls, lkb);
1610
}
1611
1612
/* This is only called to add a reference when the code already holds
1613
a valid reference to the lkb, so there's no need for locking. */
1614
1615
static inline void hold_lkb(struct dlm_lkb *lkb)
1616
{
1617
kref_get(&lkb->lkb_ref);
1618
}
1619
1620
static void unhold_lkb_assert(struct kref *kref)
1621
{
1622
struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1623
1624
DLM_ASSERT(false, dlm_print_lkb(lkb););
1625
}
1626
1627
/* This is called when we need to remove a reference and are certain
1628
it's not the last ref. e.g. del_lkb is always called between a
1629
find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1630
put_lkb would work fine, but would involve unnecessary locking */
1631
1632
static inline void unhold_lkb(struct dlm_lkb *lkb)
1633
{
1634
kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1635
}
1636
1637
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1638
int mode)
1639
{
1640
struct dlm_lkb *lkb = NULL, *iter;
1641
1642
list_for_each_entry(iter, head, lkb_statequeue)
1643
if (iter->lkb_rqmode < mode) {
1644
lkb = iter;
1645
list_add_tail(new, &iter->lkb_statequeue);
1646
break;
1647
}
1648
1649
if (!lkb)
1650
list_add_tail(new, head);
1651
}
1652
1653
/* add/remove lkb to rsb's grant/convert/wait queue */
1654
1655
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1656
{
1657
kref_get(&lkb->lkb_ref);
1658
1659
DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1660
1661
lkb->lkb_timestamp = ktime_get();
1662
1663
lkb->lkb_status = status;
1664
1665
switch (status) {
1666
case DLM_LKSTS_WAITING:
1667
if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1668
list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1669
else
1670
list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1671
break;
1672
case DLM_LKSTS_GRANTED:
1673
/* convention says granted locks kept in order of grmode */
1674
lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1675
lkb->lkb_grmode);
1676
break;
1677
case DLM_LKSTS_CONVERT:
1678
if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1679
list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1680
else
1681
list_add_tail(&lkb->lkb_statequeue,
1682
&r->res_convertqueue);
1683
break;
1684
default:
1685
DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1686
}
1687
}
1688
1689
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1690
{
1691
lkb->lkb_status = 0;
1692
list_del(&lkb->lkb_statequeue);
1693
unhold_lkb(lkb);
1694
}
1695
1696
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1697
{
1698
del_lkb(r, lkb);
1699
add_lkb(r, lkb, sts);
1700
}
1701
1702
static int msg_reply_type(int mstype)
1703
{
1704
switch (mstype) {
1705
case DLM_MSG_REQUEST:
1706
return DLM_MSG_REQUEST_REPLY;
1707
case DLM_MSG_CONVERT:
1708
return DLM_MSG_CONVERT_REPLY;
1709
case DLM_MSG_UNLOCK:
1710
return DLM_MSG_UNLOCK_REPLY;
1711
case DLM_MSG_CANCEL:
1712
return DLM_MSG_CANCEL_REPLY;
1713
case DLM_MSG_LOOKUP:
1714
return DLM_MSG_LOOKUP_REPLY;
1715
}
1716
return -1;
1717
}
1718
1719
/* add/remove lkb from global waiters list of lkb's waiting for
1720
a reply from a remote node */
1721
1722
static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1723
{
1724
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1725
1726
spin_lock_bh(&ls->ls_waiters_lock);
1727
if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1728
switch (mstype) {
1729
case DLM_MSG_UNLOCK:
1730
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1731
break;
1732
case DLM_MSG_CANCEL:
1733
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1734
break;
1735
default:
1736
/* should never happen as validate_lock_args() checks
1737
* on lkb_wait_type and validate_unlock_args() only
1738
* creates UNLOCK or CANCEL messages.
1739
*/
1740
WARN_ON_ONCE(1);
1741
goto out;
1742
}
1743
lkb->lkb_wait_count++;
1744
hold_lkb(lkb);
1745
1746
log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1747
lkb->lkb_id, lkb->lkb_wait_type, mstype,
1748
lkb->lkb_wait_count, dlm_iflags_val(lkb));
1749
goto out;
1750
}
1751
1752
DLM_ASSERT(!lkb->lkb_wait_count,
1753
dlm_print_lkb(lkb);
1754
printk("wait_count %d\n", lkb->lkb_wait_count););
1755
1756
lkb->lkb_wait_count++;
1757
lkb->lkb_wait_type = mstype;
1758
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1759
hold_lkb(lkb);
1760
list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1761
out:
1762
spin_unlock_bh(&ls->ls_waiters_lock);
1763
}
1764
1765
/* We clear the RESEND flag because we might be taking an lkb off the waiters
1766
list as part of process_requestqueue (e.g. a lookup that has an optimized
1767
request reply on the requestqueue) between dlm_recover_waiters_pre() which
1768
set RESEND and dlm_recover_waiters_post() */
1769
1770
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1771
const struct dlm_message *ms)
1772
{
1773
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1774
int overlap_done = 0;
1775
1776
if (mstype == DLM_MSG_UNLOCK_REPLY &&
1777
test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1778
log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1779
overlap_done = 1;
1780
goto out_del;
1781
}
1782
1783
if (mstype == DLM_MSG_CANCEL_REPLY &&
1784
test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1785
log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1786
overlap_done = 1;
1787
goto out_del;
1788
}
1789
1790
/* Cancel state was preemptively cleared by a successful convert,
1791
see next comment, nothing to do. */
1792
1793
if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1794
(lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1795
log_debug(ls, "remwait %x cancel_reply wait_type %d",
1796
lkb->lkb_id, lkb->lkb_wait_type);
1797
return -1;
1798
}
1799
1800
/* Remove for the convert reply, and preemptively remove for the
   cancel reply. A convert has been granted while there's still
   an outstanding cancel on it (the cancel is moot and the result
   in the cancel reply should be 0). We preempt the cancel reply
   because the app gets the convert result and then can follow up
   with another op, like convert. This subsequent op would see the
   lingering state of the cancel and fail with -EBUSY. */
1807
1808
if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1809
(lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1810
test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1811
log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1812
lkb->lkb_id);
1813
lkb->lkb_wait_type = 0;
1814
lkb->lkb_wait_count--;
1815
unhold_lkb(lkb);
1816
goto out_del;
1817
}
1818
1819
/* N.B. type of reply may not always correspond to type of original
1820
msg due to lookup->request optimization, verify others? */
1821
1822
if (lkb->lkb_wait_type) {
1823
lkb->lkb_wait_type = 0;
1824
goto out_del;
1825
}
1826
1827
log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1828
lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1829
lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1830
return -1;
1831
1832
out_del:
1833
/* the force-unlock/cancel has completed and we haven't received a reply
1834
to the op that was in progress prior to the unlock/cancel; we
1835
give up on any reply to the earlier op. FIXME: not sure when/how
1836
this would happen */
1837
1838
if (overlap_done && lkb->lkb_wait_type) {
1839
log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1840
lkb->lkb_id, mstype, lkb->lkb_wait_type);
1841
lkb->lkb_wait_count--;
1842
unhold_lkb(lkb);
1843
lkb->lkb_wait_type = 0;
1844
}
1845
1846
DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1847
1848
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1849
lkb->lkb_wait_count--;
1850
if (!lkb->lkb_wait_count)
1851
list_del_init(&lkb->lkb_wait_reply);
1852
unhold_lkb(lkb);
1853
return 0;
1854
}
1855
1856
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1857
{
1858
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1859
int error;
1860
1861
spin_lock_bh(&ls->ls_waiters_lock);
1862
error = _remove_from_waiters(lkb, mstype, NULL);
1863
spin_unlock_bh(&ls->ls_waiters_lock);
1864
return error;
1865
}
1866
1867
/* Handles situations where we might be processing a "fake" or "local" reply in
1868
* the recovery context which stops any locking activity. Only debugfs might
1869
* change the lockspace waiters but they will hold the recovery lock to ensure
1870
* remove_from_waiters_ms() in the local case will be the only user manipulating the
1871
* lockspace waiters in recovery context.
1872
*/
1873
1874
static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1875
const struct dlm_message *ms, bool local)
1876
{
1877
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1878
int error;
1879
1880
if (!local)
1881
spin_lock_bh(&ls->ls_waiters_lock);
1882
else
1883
WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1884
!dlm_locking_stopped(ls));
1885
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1886
if (!local)
1887
spin_unlock_bh(&ls->ls_waiters_lock);
1888
return error;
1889
}
1890
1891
/* lkb is master or local copy */
1892
1893
static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1894
{
1895
int b, len = r->res_ls->ls_lvblen;
1896
1897
/* b=1 lvb returned to caller
1898
b=0 lvb written to rsb or invalidated
1899
b=-1 do nothing */
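/* The table lookup below uses mode + 1 so that DLM_LOCK_IV (-1), the
   "unlocked" pseudo-mode, maps to row/column 0 of dlm_lvb_operations. */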
1900
1901
b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1902
1903
if (b == 1) {
1904
if (!lkb->lkb_lvbptr)
1905
return;
1906
1907
if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1908
return;
1909
1910
if (!r->res_lvbptr)
1911
return;
1912
1913
memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1914
lkb->lkb_lvbseq = r->res_lvbseq;
1915
1916
} else if (b == 0) {
1917
if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1918
rsb_set_flag(r, RSB_VALNOTVALID);
1919
return;
1920
}
1921
1922
if (!lkb->lkb_lvbptr)
1923
return;
1924
1925
if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1926
return;
1927
1928
if (!r->res_lvbptr)
1929
r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1930
1931
if (!r->res_lvbptr)
1932
return;
1933
1934
memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1935
r->res_lvbseq++;
1936
lkb->lkb_lvbseq = r->res_lvbseq;
1937
rsb_clear_flag(r, RSB_VALNOTVALID);
1938
}
1939
1940
if (rsb_flag(r, RSB_VALNOTVALID))
1941
set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1942
}
1943
1944
static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1945
{
1946
if (lkb->lkb_grmode < DLM_LOCK_PW)
1947
return;
1948
1949
if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1950
rsb_set_flag(r, RSB_VALNOTVALID);
1951
return;
1952
}
1953
1954
if (!lkb->lkb_lvbptr)
1955
return;
1956
1957
if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1958
return;
1959
1960
if (!r->res_lvbptr)
1961
r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1962
1963
if (!r->res_lvbptr)
1964
return;
1965
1966
memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1967
r->res_lvbseq++;
1968
rsb_clear_flag(r, RSB_VALNOTVALID);
1969
}
1970
1971
/* lkb is process copy (pc) */
1972
1973
static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1974
const struct dlm_message *ms)
1975
{
1976
int b;
1977
1978
if (!lkb->lkb_lvbptr)
1979
return;
1980
1981
if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1982
return;
1983
1984
b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1985
if (b == 1) {
1986
int len = receive_extralen(ms);
1987
if (len > r->res_ls->ls_lvblen)
1988
len = r->res_ls->ls_lvblen;
1989
memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1990
lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1991
}
1992
}
1993
1994
/* Manipulate lkb's on rsb's convert/granted/waiting queues
1995
remove_lock -- used for unlock, removes lkb from granted
1996
revert_lock -- used for cancel, moves lkb from convert to granted
1997
grant_lock -- used for request and convert, adds lkb to granted or
1998
moves lkb from convert or waiting to granted
1999
2000
Each of these is used for master or local copy lkb's. There is
2001
also a _pc() variation used to make the corresponding change on
2002
a process copy (pc) lkb. */
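/* In the _pc() variants the authoritative LVB stays with the master's rsb:
   remove_lock_pc() skips set_lvb_unlock(), and grant_lock_pc() copies the
   LVB out of the master's reply message via set_lvb_lock_pc(). */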
2003
2004
static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2005
{
2006
del_lkb(r, lkb);
2007
lkb->lkb_grmode = DLM_LOCK_IV;
2008
/* this unhold undoes the original ref from create_lkb()
2009
so this leads to the lkb being freed */
2010
unhold_lkb(lkb);
2011
}
2012
2013
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2014
{
2015
set_lvb_unlock(r, lkb);
2016
_remove_lock(r, lkb);
2017
}
2018
2019
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020
{
2021
_remove_lock(r, lkb);
2022
}
2023
2024
/* returns: 0 did nothing
2025
1 moved lock to granted
2026
-1 removed lock */
2027
2028
static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2029
{
2030
int rv = 0;
2031
2032
lkb->lkb_rqmode = DLM_LOCK_IV;
2033
2034
switch (lkb->lkb_status) {
2035
case DLM_LKSTS_GRANTED:
2036
break;
2037
case DLM_LKSTS_CONVERT:
2038
move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2039
rv = 1;
2040
break;
2041
case DLM_LKSTS_WAITING:
2042
del_lkb(r, lkb);
2043
lkb->lkb_grmode = DLM_LOCK_IV;
2044
/* this unhold undoes the original ref from create_lkb()
2045
so this leads to the lkb being freed */
2046
unhold_lkb(lkb);
2047
rv = -1;
2048
break;
2049
default:
2050
log_print("invalid status for revert %d", lkb->lkb_status);
2051
}
2052
return rv;
2053
}
2054
2055
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2056
{
2057
return revert_lock(r, lkb);
2058
}
2059
2060
static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2061
{
2062
if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2063
lkb->lkb_grmode = lkb->lkb_rqmode;
2064
if (lkb->lkb_status)
2065
move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2066
else
2067
add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2068
}
2069
2070
lkb->lkb_rqmode = DLM_LOCK_IV;
2071
lkb->lkb_highbast = 0;
2072
}
2073
2074
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2075
{
2076
set_lvb_lock(r, lkb);
2077
_grant_lock(r, lkb);
2078
}
2079
2080
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2081
const struct dlm_message *ms)
2082
{
2083
set_lvb_lock_pc(r, lkb, ms);
2084
_grant_lock(r, lkb);
2085
}
2086
2087
/* called by grant_pending_locks() which means an async grant message must
2088
be sent to the requesting node in addition to granting the lock if the
2089
lkb belongs to a remote node. */
2090
2091
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2092
{
2093
grant_lock(r, lkb);
2094
if (is_master_copy(lkb))
2095
send_grant(r, lkb);
2096
else
2097
queue_cast(r, lkb, 0);
2098
}
2099
2100
/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2101
change the granted/requested modes. We're munging things accordingly in
2102
the process copy.
2103
CONVDEADLK: our grmode may have been forced down to NL to resolve a
2104
conversion deadlock
2105
ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2106
compatible with other granted locks */
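/* For example (sketching the usual sequence): a PR->EX convert resolved as
   a conversion deadlock on the master with DLM_LKF_CONVDEADLK comes back
   with DLM_SBF_DEMOTED set in the reply, and munge_demoted() drops the
   process copy's grmode to NL to mirror what the master already did. */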
2107
2108
static void munge_demoted(struct dlm_lkb *lkb)
2109
{
2110
if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2111
log_print("munge_demoted %x invalid modes gr %d rq %d",
2112
lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2113
return;
2114
}
2115
2116
lkb->lkb_grmode = DLM_LOCK_NL;
2117
}
2118
2119
static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2120
{
2121
if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2122
ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2123
log_print("munge_altmode %x invalid reply type %d",
2124
lkb->lkb_id, le32_to_cpu(ms->m_type));
2125
return;
2126
}
2127
2128
if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2129
lkb->lkb_rqmode = DLM_LOCK_PR;
2130
else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2131
lkb->lkb_rqmode = DLM_LOCK_CW;
2132
else {
2133
log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2134
dlm_print_lkb(lkb);
2135
}
2136
}
2137
2138
static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2139
{
2140
struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2141
lkb_statequeue);
2142
if (lkb->lkb_id == first->lkb_id)
2143
return 1;
2144
2145
return 0;
2146
}
2147
2148
/* Check if the given lkb conflicts with another lkb on the queue. */
2149
2150
static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2151
{
2152
struct dlm_lkb *this;
2153
2154
list_for_each_entry(this, head, lkb_statequeue) {
2155
if (this == lkb)
2156
continue;
2157
if (!modes_compat(this, lkb))
2158
return 1;
2159
}
2160
return 0;
2161
}
2162
2163
/*
2164
* "A conversion deadlock arises with a pair of lock requests in the converting
2165
* queue for one resource. The granted mode of each lock blocks the requested
2166
* mode of the other lock."
2167
*
2168
* Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2169
* convert queue from being granted, then deadlk/demote lkb.
2170
*
2171
* Example:
2172
* Granted Queue: empty
2173
* Convert Queue: NL->EX (first lock)
2174
* PR->EX (second lock)
2175
*
2176
* The first lock can't be granted because of the granted mode of the second
2177
* lock and the second lock can't be granted because it's not first in the
2178
* list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2179
* demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2180
* flag set and return DEMOTED in the lksb flags.
2181
*
2182
* Originally, this function detected conv-deadlk in a more limited scope:
2183
* - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2184
* - if lkb1 was the first entry in the queue (not just earlier), and was
2185
* blocked by the granted mode of lkb2, and there was nothing on the
2186
* granted queue preventing lkb1 from being granted immediately, i.e.
2187
* lkb2 was the only thing preventing lkb1 from being granted.
2188
*
2189
* That second condition meant we'd only say there was conv-deadlk if
2190
* resolving it (by demotion) would lead to the first lock on the convert
2191
* queue being granted right away. It allowed conversion deadlocks to exist
2192
* between locks on the convert queue while they couldn't be granted anyway.
2193
*
2194
* Now, we detect and take action on conversion deadlocks immediately when
2195
* they're created, even if they may not be immediately consequential. If
2196
* lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2197
* mode that would prevent lkb1's conversion from being granted, we do a
2198
* deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2199
* I think this means that the lkb_is_ahead condition below should always
2200
* be zero, i.e. there will never be conv-deadlk between two locks that are
2201
* both already on the convert queue.
2202
*/
2203
2204
static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2205
{
2206
struct dlm_lkb *lkb1;
2207
int lkb_is_ahead = 0;
2208
2209
list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2210
if (lkb1 == lkb2) {
2211
lkb_is_ahead = 1;
2212
continue;
2213
}
2214
2215
if (!lkb_is_ahead) {
2216
if (!modes_compat(lkb2, lkb1))
2217
return 1;
2218
} else {
2219
if (!modes_compat(lkb2, lkb1) &&
2220
!modes_compat(lkb1, lkb2))
2221
return 1;
2222
}
2223
}
2224
return 0;
2225
}
2226
2227
/*
2228
* Return 1 if the lock can be granted, 0 otherwise.
2229
* Also detect and resolve conversion deadlocks.
2230
*
2231
* lkb is the lock to be granted
2232
*
2233
* now is 1 if the function is being called in the context of the
2234
* immediate request, it is 0 if called later, after the lock has been
2235
* queued.
2236
*
2237
* recover is 1 if dlm_recover_grant() is trying to grant conversions
2238
* after recovery.
2239
*
2240
* References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2241
*/
2242
2243
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2244
int recover)
2245
{
2246
int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2247
2248
/*
2249
* 6-10: Version 5.4 introduced an option to address the phenomenon of
2250
* a new request for a NL mode lock being blocked.
2251
*
2252
* 6-11: If the optional EXPEDITE flag is used with the new NL mode
2253
* request, then it would be granted. In essence, the use of this flag
2254
* tells the Lock Manager to expedite this request by not considering
2255
* what may be in the CONVERTING or WAITING queues... As of this
2256
* writing, the EXPEDITE flag can be used only with new requests for NL
2257
* mode locks. This flag is not valid for conversion requests.
2258
*
2259
* A shortcut. Earlier checks return an error if EXPEDITE is used in a
2260
* conversion or used with a non-NL requested mode. We also know an
2261
* EXPEDITE request is always granted immediately, so now must always
2262
* be 1. The full condition to grant an expedite request: (now &&
2263
* !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2264
* therefore be shortened to just checking the flag.
2265
*/
2266
2267
if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2268
return 1;
2269
2270
/*
2271
* A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2272
* added to the remaining conditions.
2273
*/
2274
2275
if (queue_conflict(&r->res_grantqueue, lkb))
2276
return 0;
2277
2278
/*
2279
* 6-3: By default, a conversion request is immediately granted if the
2280
* requested mode is compatible with the modes of all other granted
2281
* locks
2282
*/
2283
2284
if (queue_conflict(&r->res_convertqueue, lkb))
2285
return 0;
2286
2287
/*
2288
* The RECOVER_GRANT flag means dlm_recover_grant() is granting
2289
* locks for a recovered rsb, on which lkb's have been rebuilt.
2290
* The lkb's may have been rebuilt on the queues in a different
2291
* order than they were in on the previous master. So, granting
2292
* queued conversions in order after recovery doesn't make sense
2293
* since the order hasn't been preserved anyway. The new order
2294
* could also have created a new "in place" conversion deadlock.
2295
* (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2296
* After recovery, there would be no granted locks, and possibly
2297
* NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2298
* recovery, grant conversions without considering order.
2299
*/
2300
2301
if (conv && recover)
2302
return 1;
2303
2304
/*
2305
* 6-5: But the default algorithm for deciding whether to grant or
2306
* queue conversion requests does not by itself guarantee that such
2307
* requests are serviced on a "first come first serve" basis. This, in
2308
* turn, can lead to a phenomenon known as "indefinate postponement".
2309
*
2310
* 6-7: This issue is dealt with by using the optional QUECVT flag with
2311
* the system service employed to request a lock conversion. This flag
2312
* forces certain conversion requests to be queued, even if they are
2313
* compatible with the granted modes of other locks on the same
2314
* resource. Thus, the use of this flag results in conversion requests
2315
* being ordered on a "first come first servce" basis.
2316
*
2317
* DCT: This condition is all about new conversions being able to occur
2318
* "in place" while the lock remains on the granted queue (assuming
2319
* nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2320
* doesn't _have_ to go onto the convert queue where it's processed in
2321
* order. The "now" variable is necessary to distinguish converts
2322
* being received and processed for the first time now, because once a
2323
* convert is moved to the conversion queue the condition below applies
2324
* requiring fifo granting.
2325
*/
2326
2327
if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2328
return 1;
2329
2330
/*
2331
* Even if the convert is compat with all granted locks,
2332
* QUECVT forces it behind other locks on the convert queue.
2333
*/
2334
2335
if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2336
if (list_empty(&r->res_convertqueue))
2337
return 1;
2338
else
2339
return 0;
2340
}
2341
2342
/*
2343
* The NOORDER flag is set to avoid the standard vms rules on grant
2344
* order.
2345
*/
2346
2347
if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2348
return 1;
2349
2350
/*
2351
* 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2352
* granted until all other conversion requests ahead of it are granted
2353
* and/or canceled.
2354
*/
2355
2356
if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2357
return 1;
2358
2359
/*
2360
* 6-4: By default, a new request is immediately granted only if all
2361
* three of the following conditions are satisfied when the request is
2362
* issued:
2363
* - The queue of ungranted conversion requests for the resource is
2364
* empty.
2365
* - The queue of ungranted new requests for the resource is empty.
2366
* - The mode of the new request is compatible with the most
2367
* restrictive mode of all granted locks on the resource.
2368
*/
2369
2370
if (now && !conv && list_empty(&r->res_convertqueue) &&
2371
list_empty(&r->res_waitqueue))
2372
return 1;
2373
2374
/*
2375
* 6-4: Once a lock request is in the queue of ungranted new requests,
2376
* it cannot be granted until the queue of ungranted conversion
2377
* requests is empty, all ungranted new requests ahead of it are
2378
* granted and/or canceled, and it is compatible with the granted mode
2379
* of the most restrictive lock granted on the resource.
2380
*/
2381
2382
if (!now && !conv && list_empty(&r->res_convertqueue) &&
2383
first_in_list(lkb, &r->res_waitqueue))
2384
return 1;
2385
2386
return 0;
2387
}
2388
2389
static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2390
int recover, int *err)
2391
{
2392
int rv;
2393
int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2394
int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2395
2396
if (err)
2397
*err = 0;
2398
2399
rv = _can_be_granted(r, lkb, now, recover);
2400
if (rv)
2401
goto out;
2402
2403
/*
2404
* The CONVDEADLK flag is non-standard and tells the dlm to resolve
2405
* conversion deadlocks by demoting grmode to NL, otherwise the dlm
2406
* cancels one of the locks.
2407
*/
2408
2409
if (is_convert && can_be_queued(lkb) &&
2410
conversion_deadlock_detect(r, lkb)) {
2411
if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2412
lkb->lkb_grmode = DLM_LOCK_NL;
2413
set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2414
} else if (err) {
2415
*err = -EDEADLK;
2416
} else {
2417
log_print("can_be_granted deadlock %x now %d",
2418
lkb->lkb_id, now);
2419
dlm_dump_rsb(r);
2420
}
2421
goto out;
2422
}
2423
2424
/*
2425
* The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2426
* to grant a request in a mode other than the normal rqmode. It's a
2427
* simple way to provide a big optimization to applications that can
2428
* use them.
2429
*/
2430
2431
if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2432
alt = DLM_LOCK_PR;
2433
else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2434
alt = DLM_LOCK_CW;
2435
2436
if (alt) {
2437
lkb->lkb_rqmode = alt;
2438
rv = _can_be_granted(r, lkb, now, 0);
2439
if (rv)
2440
set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2441
else
2442
lkb->lkb_rqmode = rqmode;
2443
}
2444
out:
2445
return rv;
2446
}
2447
2448
/* Returns the highest requested mode of all blocked conversions; sets
2449
cw if there's a blocked conversion to DLM_LOCK_CW. */
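/* The scan below restarts because granting one conversion (or demoting one
   via CONVDEADLK) can make conversions that were already passed over
   grantable; grant_restart and demote_restart drive that loop. */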
2450
2451
static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2452
unsigned int *count)
2453
{
2454
struct dlm_lkb *lkb, *s;
2455
int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2456
int hi, demoted, quit, grant_restart, demote_restart;
2457
int deadlk;
2458
2459
quit = 0;
2460
restart:
2461
grant_restart = 0;
2462
demote_restart = 0;
2463
hi = DLM_LOCK_IV;
2464
2465
list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2466
demoted = is_demoted(lkb);
2467
deadlk = 0;
2468
2469
if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2470
grant_lock_pending(r, lkb);
2471
grant_restart = 1;
2472
if (count)
2473
(*count)++;
2474
continue;
2475
}
2476
2477
if (!demoted && is_demoted(lkb)) {
2478
log_print("WARN: pending demoted %x node %d %s",
2479
lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2480
demote_restart = 1;
2481
continue;
2482
}
2483
2484
if (deadlk) {
2485
/*
2486
* If the DLM_LKF_NODLCKWT flag is set and conversion
2487
* deadlock is detected, we queue a blocking AST so the owner can
2488
* down-convert (or cancel) the conversion.
2489
*/
2490
if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2491
if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2492
queue_bast(r, lkb, lkb->lkb_rqmode);
2493
lkb->lkb_highbast = lkb->lkb_rqmode;
2494
}
2495
} else {
2496
log_print("WARN: pending deadlock %x node %d %s",
2497
lkb->lkb_id, lkb->lkb_nodeid,
2498
r->res_name);
2499
dlm_dump_rsb(r);
2500
}
2501
continue;
2502
}
2503
2504
hi = max_t(int, lkb->lkb_rqmode, hi);
2505
2506
if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2507
*cw = 1;
2508
}
2509
2510
if (grant_restart)
2511
goto restart;
2512
if (demote_restart && !quit) {
2513
quit = 1;
2514
goto restart;
2515
}
2516
2517
return max_t(int, high, hi);
2518
}
2519
2520
static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2521
unsigned int *count)
2522
{
2523
struct dlm_lkb *lkb, *s;
2524
2525
list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2526
if (can_be_granted(r, lkb, 0, 0, NULL)) {
2527
grant_lock_pending(r, lkb);
2528
if (count)
2529
(*count)++;
2530
} else {
2531
high = max_t(int, lkb->lkb_rqmode, high);
2532
if (lkb->lkb_rqmode == DLM_LOCK_CW)
2533
*cw = 1;
2534
}
2535
}
2536
2537
return high;
2538
}
2539
2540
/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2541
on either the convert or waiting queue.
2542
high is the largest rqmode of all locks blocked on the convert or
2543
waiting queue. */
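/* The CW check below is needed because a granted PR lock is compatible with
   a blocked PR request (so "high" alone would not force a bast), but it is
   not compatible with a blocked CW request, so the PR holder still needs a
   blocking AST in that case. */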
2544
2545
static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2546
{
2547
if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2548
if (gr->lkb_highbast < DLM_LOCK_EX)
2549
return 1;
2550
return 0;
2551
}
2552
2553
if (gr->lkb_highbast < high &&
2554
!__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2555
return 1;
2556
return 0;
2557
}
2558
2559
static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2560
{
2561
struct dlm_lkb *lkb, *s;
2562
int high = DLM_LOCK_IV;
2563
int cw = 0;
2564
2565
if (!is_master(r)) {
2566
log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2567
dlm_dump_rsb(r);
2568
return;
2569
}
2570
2571
high = grant_pending_convert(r, high, &cw, count);
2572
high = grant_pending_wait(r, high, &cw, count);
2573
2574
if (high == DLM_LOCK_IV)
2575
return;
2576
2577
/*
2578
* If there are locks left on the wait/convert queue then send blocking
2579
* ASTs to granted locks based on the largest requested mode (high)
2580
* found above.
2581
*/
2582
2583
list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2584
if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2585
if (cw && high == DLM_LOCK_PR &&
2586
lkb->lkb_grmode == DLM_LOCK_PR)
2587
queue_bast(r, lkb, DLM_LOCK_CW);
2588
else
2589
queue_bast(r, lkb, high);
2590
lkb->lkb_highbast = high;
2591
}
2592
}
2593
}
2594
2595
static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2596
{
2597
if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2598
(gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2599
if (gr->lkb_highbast < DLM_LOCK_EX)
2600
return 1;
2601
return 0;
2602
}
2603
2604
if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2605
return 1;
2606
return 0;
2607
}
2608
2609
static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2610
struct dlm_lkb *lkb)
2611
{
2612
struct dlm_lkb *gr;
2613
2614
list_for_each_entry(gr, head, lkb_statequeue) {
2615
/* skip self when sending basts to convertqueue */
2616
if (gr == lkb)
2617
continue;
2618
if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2619
queue_bast(r, gr, lkb->lkb_rqmode);
2620
gr->lkb_highbast = lkb->lkb_rqmode;
2621
}
2622
}
2623
}
2624
2625
static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2626
{
2627
send_bast_queue(r, &r->res_grantqueue, lkb);
2628
}
2629
2630
static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2631
{
2632
send_bast_queue(r, &r->res_grantqueue, lkb);
2633
send_bast_queue(r, &r->res_convertqueue, lkb);
2634
}
2635
2636
/* set_master(r, lkb) -- set the master nodeid of a resource
2637
2638
The purpose of this function is to set the nodeid field in the given
2639
lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2640
known, it can just be copied to the lkb and the function will return
2641
0. If the rsb's nodeid is _not_ known, it needs to be looked up
2642
before it can be copied to the lkb.
2643
2644
When the rsb nodeid is being looked up remotely, the initial lkb
2645
causing the lookup is kept on the ls_waiters list waiting for the
2646
lookup reply. Other lkb's waiting for the same rsb lookup are kept
2647
on the rsb's res_lookup list until the master is verified.
2648
2649
Return values:
2650
0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2651
1: the rsb master is not available and the lkb has been placed on
2652
a wait queue
2653
*/
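/* For example, _request_lock() below turns the "1" case into a 0 return and
   simply waits; the lkb is resubmitted when the lookup reply arrives, or via
   process_lookup_list() once the master has been confirmed. */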
2654
2655
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2656
{
2657
int our_nodeid = dlm_our_nodeid();
2658
2659
if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2660
rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2661
r->res_first_lkid = lkb->lkb_id;
2662
lkb->lkb_nodeid = r->res_nodeid;
2663
return 0;
2664
}
2665
2666
if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2667
list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2668
return 1;
2669
}
2670
2671
if (r->res_master_nodeid == our_nodeid) {
2672
lkb->lkb_nodeid = 0;
2673
return 0;
2674
}
2675
2676
if (r->res_master_nodeid) {
2677
lkb->lkb_nodeid = r->res_master_nodeid;
2678
return 0;
2679
}
2680
2681
if (dlm_dir_nodeid(r) == our_nodeid) {
2682
/* This is a somewhat unusual case; find_rsb will usually
2683
have set res_master_nodeid when dir nodeid is local, but
2684
there are cases where we become the dir node after we've
2685
passed find_rsb and go through _request_lock again.
2686
confirm_master() or process_lookup_list() needs to be
2687
called after this. */
2688
log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2689
lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2690
r->res_name);
2691
r->res_master_nodeid = our_nodeid;
2692
r->res_nodeid = 0;
2693
lkb->lkb_nodeid = 0;
2694
return 0;
2695
}
2696
2697
r->res_first_lkid = lkb->lkb_id;
2698
send_lookup(r, lkb);
2699
return 1;
2700
}
2701
2702
static void process_lookup_list(struct dlm_rsb *r)
2703
{
2704
struct dlm_lkb *lkb, *safe;
2705
2706
list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2707
list_del_init(&lkb->lkb_rsb_lookup);
2708
_request_lock(r, lkb);
2709
}
2710
}
2711
2712
/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2713
2714
static void confirm_master(struct dlm_rsb *r, int error)
2715
{
2716
struct dlm_lkb *lkb;
2717
2718
if (!r->res_first_lkid)
2719
return;
2720
2721
switch (error) {
2722
case 0:
2723
case -EINPROGRESS:
2724
r->res_first_lkid = 0;
2725
process_lookup_list(r);
2726
break;
2727
2728
case -EAGAIN:
2729
case -EBADR:
2730
case -ENOTBLK:
2731
/* the remote request failed and won't be retried (it was
2732
a NOQUEUE, or has been canceled/unlocked); make a waiting
2733
lkb the first_lkid */
2734
2735
r->res_first_lkid = 0;
2736
2737
if (!list_empty(&r->res_lookup)) {
2738
lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2739
lkb_rsb_lookup);
2740
list_del_init(&lkb->lkb_rsb_lookup);
2741
r->res_first_lkid = lkb->lkb_id;
2742
_request_lock(r, lkb);
2743
}
2744
break;
2745
2746
default:
2747
log_error(r->res_ls, "confirm_master unknown error %d", error);
2748
}
2749
}
2750
2751
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2752
int namelen, void (*ast)(void *astparam),
2753
void *astparam,
2754
void (*bast)(void *astparam, int mode),
2755
struct dlm_args *args)
2756
{
2757
int rv = -EINVAL;
2758
2759
/* check for invalid arg usage */
2760
2761
if (mode < 0 || mode > DLM_LOCK_EX)
2762
goto out;
2763
2764
if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2765
goto out;
2766
2767
if (flags & DLM_LKF_CANCEL)
2768
goto out;
2769
2770
if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2771
goto out;
2772
2773
if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2774
goto out;
2775
2776
if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2777
goto out;
2778
2779
if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2780
goto out;
2781
2782
if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2783
goto out;
2784
2785
if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2786
goto out;
2787
2788
if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2789
goto out;
2790
2791
if (!ast || !lksb)
2792
goto out;
2793
2794
if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2795
goto out;
2796
2797
if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2798
goto out;
2799
2800
/* these args will be copied to the lkb in validate_lock_args;
2801
it cannot be done now because when converting locks, fields in
2802
an active lkb cannot be modified before locking the rsb */
2803
2804
args->flags = flags;
2805
args->astfn = ast;
2806
args->astparam = astparam;
2807
args->bastfn = bast;
2808
args->mode = mode;
2809
args->lksb = lksb;
2810
rv = 0;
2811
out:
2812
return rv;
2813
}
2814
2815
static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2816
{
2817
if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2818
DLM_LKF_FORCEUNLOCK))
2819
return -EINVAL;
2820
2821
if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2822
return -EINVAL;
2823
2824
args->flags = flags;
2825
args->astparam = astarg;
2826
return 0;
2827
}
2828
2829
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2830
struct dlm_args *args)
2831
{
2832
int rv = -EBUSY;
2833
2834
if (args->flags & DLM_LKF_CONVERT) {
2835
if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2836
goto out;
2837
2838
/* lock not allowed if there's any op in progress */
2839
if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2840
goto out;
2841
2842
if (is_overlap(lkb))
2843
goto out;
2844
2845
rv = -EINVAL;
2846
if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2847
goto out;
2848
2849
if (args->flags & DLM_LKF_QUECVT &&
2850
!__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2851
goto out;
2852
}
2853
2854
lkb->lkb_exflags = args->flags;
2855
dlm_set_sbflags_val(lkb, 0);
2856
lkb->lkb_astfn = args->astfn;
2857
lkb->lkb_astparam = args->astparam;
2858
lkb->lkb_bastfn = args->bastfn;
2859
lkb->lkb_rqmode = args->mode;
2860
lkb->lkb_lksb = args->lksb;
2861
lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2862
lkb->lkb_ownpid = (int) current->pid;
2863
rv = 0;
2864
out:
2865
switch (rv) {
2866
case 0:
2867
break;
2868
case -EINVAL:
2869
/* annoy the user because dlm usage is wrong */
2870
WARN_ON(1);
2871
log_error(ls, "%s %d %x %x %x %d %d", __func__,
2872
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2873
lkb->lkb_status, lkb->lkb_wait_type);
2874
break;
2875
default:
2876
log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2877
rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2878
lkb->lkb_status, lkb->lkb_wait_type);
2879
break;
2880
}
2881
2882
return rv;
2883
}
2884
2885
/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2886
for success */
2887
2888
/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2889
because there may be a lookup in progress and it's valid to do
2890
cancel/unlockf on it */
2891
2892
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2893
{
2894
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2895
int rv = -EBUSY;
2896
2897
/* normal unlock not allowed if there's any op in progress */
2898
if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2899
(lkb->lkb_wait_type || lkb->lkb_wait_count))
2900
goto out;
2901
2902
/* an lkb may be waiting for an rsb lookup to complete where the
2903
lookup was initiated by another lock */
2904
2905
if (!list_empty(&lkb->lkb_rsb_lookup)) {
2906
if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2907
log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2908
list_del_init(&lkb->lkb_rsb_lookup);
2909
queue_cast(lkb->lkb_resource, lkb,
2910
args->flags & DLM_LKF_CANCEL ?
2911
-DLM_ECANCEL : -DLM_EUNLOCK);
2912
unhold_lkb(lkb); /* undoes create_lkb() */
2913
}
2914
/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2915
goto out;
2916
}
2917
2918
rv = -EINVAL;
2919
if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2920
log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2921
dlm_print_lkb(lkb);
2922
goto out;
2923
}
2924
2925
/* an lkb may still exist even though the lock is EOL'ed due to a
2926
* cancel, unlock or failed noqueue request; an app can't use these
2927
* locks; return same error as if the lkid had not been found at all
2928
*/
2929
2930
if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2931
log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2932
rv = -ENOENT;
2933
goto out;
2934
}
2935
2936
if (is_overlap_unlock(lkb))
2937
goto out;
2938
2939
/* cancel not allowed with another cancel/unlock in progress */
2940
2941
if (args->flags & DLM_LKF_CANCEL) {
2942
if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2943
goto out;
2944
2945
if (is_overlap_cancel(lkb))
2946
goto out;
2947
2948
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2949
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2950
rv = -EBUSY;
2951
goto out;
2952
}
2953
2954
/* there's nothing to cancel */
2955
if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2956
!lkb->lkb_wait_type) {
2957
rv = -EBUSY;
2958
goto out;
2959
}
2960
2961
switch (lkb->lkb_wait_type) {
2962
case DLM_MSG_LOOKUP:
2963
case DLM_MSG_REQUEST:
2964
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2965
rv = -EBUSY;
2966
goto out;
2967
case DLM_MSG_UNLOCK:
2968
case DLM_MSG_CANCEL:
2969
goto out;
2970
}
2971
/* add_to_waiters() will set OVERLAP_CANCEL */
2972
goto out_ok;
2973
}
2974
2975
/* do we need to allow a force-unlock if there's a normal unlock
2976
already in progress? in what conditions could the normal unlock
2977
fail such that we'd want to send a force-unlock to be sure? */
2978
2979
if (args->flags & DLM_LKF_FORCEUNLOCK) {
2980
if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2981
goto out;
2982
2983
if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2984
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2985
rv = -EBUSY;
2986
goto out;
2987
}
2988
2989
switch (lkb->lkb_wait_type) {
2990
case DLM_MSG_LOOKUP:
2991
case DLM_MSG_REQUEST:
2992
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2993
rv = -EBUSY;
2994
goto out;
2995
case DLM_MSG_UNLOCK:
2996
goto out;
2997
}
2998
/* add_to_waiters() will set OVERLAP_UNLOCK */
2999
}
3000
3001
out_ok:
3002
/* an overlapping op shouldn't blow away exflags from other op */
3003
lkb->lkb_exflags |= args->flags;
3004
dlm_set_sbflags_val(lkb, 0);
3005
lkb->lkb_astparam = args->astparam;
3006
rv = 0;
3007
out:
3008
switch (rv) {
3009
case 0:
3010
break;
3011
case -EINVAL:
3012
/* annoy the user because dlm usage is wrong */
3013
WARN_ON(1);
3014
log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3015
lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3016
args->flags, lkb->lkb_wait_type,
3017
lkb->lkb_resource->res_name);
3018
break;
3019
default:
3020
log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3021
lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3022
args->flags, lkb->lkb_wait_type,
3023
lkb->lkb_resource->res_name);
3024
break;
3025
}
3026
3027
return rv;
3028
}
3029
3030
/*
3031
* Four stage 4 varieties:
3032
* do_request(), do_convert(), do_unlock(), do_cancel()
3033
* These are called on the master node for the given lock and
3034
* from the central locking logic.
3035
*/
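/* Return values from the do_xxxx() functions, as consumed by the reply
   senders and the do_xxxx_effects() helpers:
   0             granted immediately (request/convert) or nothing to cancel
   -EINPROGRESS  queued on the waiting or convert queue
   -EAGAIN       not grantable and not queueable (NOQUEUE)
   -EDEADLK      conversion deadlock reported to the caller
   -DLM_EUNLOCK / -DLM_ECANCEL  unlock or cancel completed */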
3036
3037
static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3038
{
3039
int error = 0;
3040
3041
if (can_be_granted(r, lkb, 1, 0, NULL)) {
3042
grant_lock(r, lkb);
3043
queue_cast(r, lkb, 0);
3044
goto out;
3045
}
3046
3047
if (can_be_queued(lkb)) {
3048
error = -EINPROGRESS;
3049
add_lkb(r, lkb, DLM_LKSTS_WAITING);
3050
goto out;
3051
}
3052
3053
error = -EAGAIN;
3054
queue_cast(r, lkb, -EAGAIN);
3055
out:
3056
return error;
3057
}
3058
3059
static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3060
int error)
3061
{
3062
switch (error) {
3063
case -EAGAIN:
3064
if (force_blocking_asts(lkb))
3065
send_blocking_asts_all(r, lkb);
3066
break;
3067
case -EINPROGRESS:
3068
send_blocking_asts(r, lkb);
3069
break;
3070
}
3071
}
3072
3073
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3074
{
3075
int error = 0;
3076
int deadlk = 0;
3077
3078
/* changing an existing lock may allow others to be granted */
3079
3080
if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3081
grant_lock(r, lkb);
3082
queue_cast(r, lkb, 0);
3083
goto out;
3084
}
3085
3086
/* can_be_granted() detected that this lock would block in a conversion
3087
deadlock, so we leave it on the granted queue and return EDEADLK in
3088
the ast for the convert. */
3089
3090
if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3091
/* it's left on the granted queue */
3092
revert_lock(r, lkb);
3093
queue_cast(r, lkb, -EDEADLK);
3094
error = -EDEADLK;
3095
goto out;
3096
}
3097
3098
/* is_demoted() means the can_be_granted() above set the grmode
3099
to NL, and left us on the granted queue. This auto-demotion
3100
(due to CONVDEADLK) might mean other locks, and/or this lock, are
3101
now grantable. We have to try to grant other converting locks
3102
before we try again to grant this one. */
3103
3104
if (is_demoted(lkb)) {
3105
grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3106
if (_can_be_granted(r, lkb, 1, 0)) {
3107
grant_lock(r, lkb);
3108
queue_cast(r, lkb, 0);
3109
goto out;
3110
}
3111
/* else fall through and move to convert queue */
3112
}
3113
3114
if (can_be_queued(lkb)) {
3115
error = -EINPROGRESS;
3116
del_lkb(r, lkb);
3117
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3118
goto out;
3119
}
3120
3121
error = -EAGAIN;
3122
queue_cast(r, lkb, -EAGAIN);
3123
out:
3124
return error;
3125
}
3126
3127
static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3128
int error)
3129
{
3130
switch (error) {
3131
case 0:
3132
grant_pending_locks(r, NULL);
3133
/* grant_pending_locks also sends basts */
3134
break;
3135
case -EAGAIN:
3136
if (force_blocking_asts(lkb))
3137
send_blocking_asts_all(r, lkb);
3138
break;
3139
case -EINPROGRESS:
3140
send_blocking_asts(r, lkb);
3141
break;
3142
}
3143
}
3144
3145
static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3146
{
3147
remove_lock(r, lkb);
3148
queue_cast(r, lkb, -DLM_EUNLOCK);
3149
return -DLM_EUNLOCK;
3150
}
3151
3152
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3153
int error)
3154
{
3155
grant_pending_locks(r, NULL);
3156
}
3157
3158
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3159
3160
static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3161
{
3162
int error;
3163
3164
error = revert_lock(r, lkb);
3165
if (error) {
3166
queue_cast(r, lkb, -DLM_ECANCEL);
3167
return -DLM_ECANCEL;
3168
}
3169
return 0;
3170
}
3171
3172
static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3173
int error)
3174
{
3175
if (error)
3176
grant_pending_locks(r, NULL);
3177
}
3178
3179
/*
3180
* Four stage 3 varieties:
3181
* _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3182
*/
3183
3184
/* add a new lkb to a possibly new rsb, called by requesting process */
3185
3186
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3187
{
3188
int error;
3189
3190
/* set_master: sets lkb nodeid from r */
3191
3192
error = set_master(r, lkb);
3193
if (error < 0)
3194
goto out;
3195
if (error) {
3196
error = 0;
3197
goto out;
3198
}
3199
3200
if (is_remote(r)) {
3201
/* receive_request() calls do_request() on remote node */
3202
error = send_request(r, lkb);
3203
} else {
3204
error = do_request(r, lkb);
3205
/* for remote locks the request_reply is sent
3206
between do_request and do_request_effects */
3207
do_request_effects(r, lkb, error);
3208
}
3209
out:
3210
return error;
3211
}
3212
3213
/* change some property of an existing lkb, e.g. mode */
3214
3215
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3216
{
3217
int error;
3218
3219
if (is_remote(r)) {
3220
/* receive_convert() calls do_convert() on remote node */
3221
error = send_convert(r, lkb);
3222
} else {
3223
error = do_convert(r, lkb);
3224
/* for remote locks the convert_reply is sent
3225
between do_convert and do_convert_effects */
3226
do_convert_effects(r, lkb, error);
3227
}
3228
3229
return error;
3230
}
3231
3232
/* remove an existing lkb from the granted queue */
3233
3234
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3235
{
3236
int error;
3237
3238
if (is_remote(r)) {
3239
/* receive_unlock() calls do_unlock() on remote node */
3240
error = send_unlock(r, lkb);
3241
} else {
3242
error = do_unlock(r, lkb);
3243
/* for remote locks the unlock_reply is sent
3244
between do_unlock and do_unlock_effects */
3245
do_unlock_effects(r, lkb, error);
3246
}
3247
3248
return error;
3249
}
3250
3251
/* remove an existing lkb from the convert or wait queue */
3252
3253
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3254
{
3255
int error;
3256
3257
if (is_remote(r)) {
3258
/* receive_cancel() calls do_cancel() on remote node */
3259
error = send_cancel(r, lkb);
3260
} else {
3261
error = do_cancel(r, lkb);
3262
/* for remote locks the cancel_reply is sent
3263
between do_cancel and do_cancel_effects */
3264
do_cancel_effects(r, lkb, error);
3265
}
3266
3267
return error;
3268
}
3269
3270
/*
3271
* Four stage 2 varieties:
3272
* request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3273
*/
3274
3275
static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3276
const void *name, int len,
3277
struct dlm_args *args)
3278
{
3279
struct dlm_rsb *r;
3280
int error;
3281
3282
error = validate_lock_args(ls, lkb, args);
3283
if (error)
3284
return error;
3285
3286
error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3287
if (error)
3288
return error;
3289
3290
lock_rsb(r);
3291
3292
attach_lkb(r, lkb);
3293
lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3294
3295
error = _request_lock(r, lkb);
3296
3297
unlock_rsb(r);
3298
put_rsb(r);
3299
return error;
3300
}
3301
3302
static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3303
struct dlm_args *args)
3304
{
3305
struct dlm_rsb *r;
3306
int error;
3307
3308
r = lkb->lkb_resource;
3309
3310
hold_rsb(r);
3311
lock_rsb(r);
3312
3313
error = validate_lock_args(ls, lkb, args);
3314
if (error)
3315
goto out;
3316
3317
error = _convert_lock(r, lkb);
3318
out:
3319
unlock_rsb(r);
3320
put_rsb(r);
3321
return error;
3322
}
3323
3324
static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3325
struct dlm_args *args)
3326
{
3327
struct dlm_rsb *r;
3328
int error;
3329
3330
r = lkb->lkb_resource;
3331
3332
hold_rsb(r);
3333
lock_rsb(r);
3334
3335
error = validate_unlock_args(lkb, args);
3336
if (error)
3337
goto out;
3338
3339
error = _unlock_lock(r, lkb);
3340
out:
3341
unlock_rsb(r);
3342
put_rsb(r);
3343
return error;
3344
}
3345
3346
static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3347
struct dlm_args *args)
3348
{
3349
struct dlm_rsb *r;
3350
int error;
3351
3352
r = lkb->lkb_resource;
3353
3354
hold_rsb(r);
3355
lock_rsb(r);
3356
3357
error = validate_unlock_args(lkb, args);
3358
if (error)
3359
goto out;
3360
3361
error = _cancel_lock(r, lkb);
3362
out:
3363
unlock_rsb(r);
3364
put_rsb(r);
3365
return error;
3366
}
3367
3368
/*
3369
* Two stage 1 varieties: dlm_lock() and dlm_unlock()
3370
*/
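/* A minimal caller sketch (not part of this file; "ls" is a lockspace handle
 * from dlm_new_lockspace(), and the callbacks, completion and resource name
 * are hypothetical):
 *
 *	static void my_ast(void *arg) { complete(arg); }
 *	static void my_bast(void *arg, int mode) { }
 *
 *	struct dlm_lksb lksb = {};
 *	DECLARE_COMPLETION_ONSTACK(done);
 *	int rv;
 *
 *	rv = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
 *		      my_ast, &done, my_bast);
 *	if (!rv) {
 *		wait_for_completion(&done);
 *		// lksb.sb_status holds the result, lksb.sb_lkid the lock id
 *	}
 *	...
 *	rv = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &done);
 *
 * The synchronous return value only reports argument/setup errors; grant,
 * -EAGAIN and -EDEADLK results are delivered through the ast callback.
 */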
3371
3372
int dlm_lock(dlm_lockspace_t *lockspace,
3373
int mode,
3374
struct dlm_lksb *lksb,
3375
uint32_t flags,
3376
const void *name,
3377
unsigned int namelen,
3378
uint32_t parent_lkid,
3379
void (*ast) (void *astarg),
3380
void *astarg,
3381
void (*bast) (void *astarg, int mode))
3382
{
3383
struct dlm_ls *ls;
3384
struct dlm_lkb *lkb;
3385
struct dlm_args args;
3386
int error, convert = flags & DLM_LKF_CONVERT;
3387
3388
ls = dlm_find_lockspace_local(lockspace);
3389
if (!ls)
3390
return -EINVAL;
3391
3392
dlm_lock_recovery(ls);
3393
3394
if (convert)
3395
error = find_lkb(ls, lksb->sb_lkid, &lkb);
3396
else
3397
error = create_lkb(ls, &lkb);
3398
3399
if (error)
3400
goto out;
3401
3402
trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3403
3404
error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3405
&args);
3406
if (error)
3407
goto out_put;
3408
3409
if (convert)
3410
error = convert_lock(ls, lkb, &args);
3411
else
3412
error = request_lock(ls, lkb, name, namelen, &args);
3413
3414
if (error == -EINPROGRESS)
3415
error = 0;
3416
out_put:
3417
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3418
3419
if (convert || error)
3420
__put_lkb(ls, lkb);
3421
if (error == -EAGAIN || error == -EDEADLK)
3422
error = 0;
3423
out:
3424
dlm_unlock_recovery(ls);
3425
dlm_put_lockspace(ls);
3426
return error;
3427
}
3428
3429
int dlm_unlock(dlm_lockspace_t *lockspace,
3430
uint32_t lkid,
3431
uint32_t flags,
3432
struct dlm_lksb *lksb,
3433
void *astarg)
3434
{
3435
struct dlm_ls *ls;
3436
struct dlm_lkb *lkb;
3437
struct dlm_args args;
3438
int error;
3439
3440
ls = dlm_find_lockspace_local(lockspace);
3441
if (!ls)
3442
return -EINVAL;
3443
3444
dlm_lock_recovery(ls);
3445
3446
error = find_lkb(ls, lkid, &lkb);
3447
if (error)
3448
goto out;
3449
3450
trace_dlm_unlock_start(ls, lkb, flags);
3451
3452
error = set_unlock_args(flags, astarg, &args);
3453
if (error)
3454
goto out_put;
3455
3456
if (flags & DLM_LKF_CANCEL)
3457
error = cancel_lock(ls, lkb, &args);
3458
else
3459
error = unlock_lock(ls, lkb, &args);
3460
3461
if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3462
error = 0;
3463
if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3464
error = 0;
3465
out_put:
3466
trace_dlm_unlock_end(ls, lkb, flags, error);
3467
3468
dlm_put_lkb(lkb);
3469
out:
3470
dlm_unlock_recovery(ls);
3471
dlm_put_lockspace(ls);
3472
return error;
3473
}
3474
3475
/*
3476
* send/receive routines for remote operations and replies
3477
*
3478
* send_args
3479
* send_common
3480
* send_request receive_request
3481
* send_convert receive_convert
3482
* send_unlock receive_unlock
3483
* send_cancel receive_cancel
3484
* send_grant receive_grant
3485
* send_bast receive_bast
3486
* send_lookup receive_lookup
3487
* send_remove receive_remove
3488
*
3489
* send_common_reply
3490
* receive_request_reply send_request_reply
3491
* receive_convert_reply send_convert_reply
3492
* receive_unlock_reply send_unlock_reply
3493
* receive_cancel_reply send_cancel_reply
3494
* receive_lookup_reply send_lookup_reply
3495
*/
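/* Of these, request, convert, unlock, cancel and lookup are tracked on the
   lockspace waiters list (add_to_waiters()) until the matching *_reply
   arrives; grant, bast and remove are one-way messages with no reply. */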
3496
3497
static int _create_message(struct dlm_ls *ls, int mb_len,
3498
int to_nodeid, int mstype,
3499
struct dlm_message **ms_ret,
3500
struct dlm_mhandle **mh_ret)
3501
{
3502
struct dlm_message *ms;
3503
struct dlm_mhandle *mh;
3504
char *mb;
3505
3506
/* dlm_midcomms_get_mhandle() gives us a message handle (mh) that we need to
3507
pass into dlm_midcomms_commit_mhandle() and a message buffer (mb) that we
3508
write our data into */
3509
3510
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3511
if (!mh)
3512
return -ENOBUFS;
3513
3514
ms = (struct dlm_message *) mb;
3515
3516
ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3517
ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3518
ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3519
ms->m_header.h_length = cpu_to_le16(mb_len);
3520
ms->m_header.h_cmd = DLM_MSG;
3521
3522
ms->m_type = cpu_to_le32(mstype);
3523
3524
*mh_ret = mh;
3525
*ms_ret = ms;
3526
return 0;
3527
}
3528
3529
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3530
int to_nodeid, int mstype,
3531
struct dlm_message **ms_ret,
3532
struct dlm_mhandle **mh_ret)
3533
{
3534
int mb_len = sizeof(struct dlm_message);
3535
3536
switch (mstype) {
3537
case DLM_MSG_REQUEST:
3538
case DLM_MSG_LOOKUP:
3539
case DLM_MSG_REMOVE:
3540
mb_len += r->res_length;
3541
break;
3542
case DLM_MSG_CONVERT:
3543
case DLM_MSG_UNLOCK:
3544
case DLM_MSG_REQUEST_REPLY:
3545
case DLM_MSG_CONVERT_REPLY:
3546
case DLM_MSG_GRANT:
3547
if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3548
mb_len += r->res_ls->ls_lvblen;
3549
break;
3550
}
3551
3552
return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3553
ms_ret, mh_ret);
3554
}
3555
3556
/* further lowcomms enhancements or alternate implementations may make
3557
the return value from this function useful at some point */
3558
3559
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3560
const void *name, int namelen)
3561
{
3562
dlm_midcomms_commit_mhandle(mh, name, namelen);
3563
return 0;
3564
}
3565
3566
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3567
struct dlm_message *ms)
3568
{
3569
ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3570
ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3571
ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3572
ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3573
ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3574
ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
3575
ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
3576
ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3577
ms->m_status = cpu_to_le32(lkb->lkb_status);
3578
ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3579
ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3580
ms->m_hash = cpu_to_le32(r->res_hash);
3581
3582
/* m_result and m_bastmode are set from function args,
3583
not from lkb fields */
3584
3585
if (lkb->lkb_bastfn)
3586
ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3587
if (lkb->lkb_astfn)
3588
ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3589
3590
/* compare with switch in create_message; send_remove() doesn't
3591
use send_args() */
3592
3593
switch (ms->m_type) {
3594
case cpu_to_le32(DLM_MSG_REQUEST):
3595
case cpu_to_le32(DLM_MSG_LOOKUP):
3596
memcpy(ms->m_extra, r->res_name, r->res_length);
3597
break;
3598
case cpu_to_le32(DLM_MSG_CONVERT):
3599
case cpu_to_le32(DLM_MSG_UNLOCK):
3600
case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3601
case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3602
case cpu_to_le32(DLM_MSG_GRANT):
3603
if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3604
break;
3605
memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3606
break;
3607
}
3608
}
3609
3610
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3611
{
3612
struct dlm_message *ms;
3613
struct dlm_mhandle *mh;
3614
int to_nodeid, error;
3615
3616
to_nodeid = r->res_nodeid;
3617
3618
add_to_waiters(lkb, mstype, to_nodeid);
3619
error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3620
if (error)
3621
goto fail;
3622
3623
send_args(r, lkb, ms);
3624
3625
error = send_message(mh, ms, r->res_name, r->res_length);
3626
if (error)
3627
goto fail;
3628
return 0;
3629
3630
fail:
3631
remove_from_waiters(lkb, msg_reply_type(mstype));
3632
return error;
3633
}
3634
3635
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3636
{
3637
return send_common(r, lkb, DLM_MSG_REQUEST);
3638
}
3639
3640
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3641
{
3642
int error;
3643
3644
error = send_common(r, lkb, DLM_MSG_CONVERT);
3645
3646
/* down conversions go without a reply from the master */
3647
if (!error && down_conversion(lkb)) {
3648
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3649
r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3650
r->res_ls->ls_local_ms.m_result = 0;
3651
__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3652
}
3653
3654
return error;
3655
}
3656
3657
/* FIXME: if this lkb is the only lock we hold on the rsb, then set
3658
MASTER_UNCERTAIN to force the next request on the rsb to confirm
3659
that the master is still correct. */
3660
3661
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3662
{
3663
return send_common(r, lkb, DLM_MSG_UNLOCK);
3664
}
3665
3666
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3667
{
3668
return send_common(r, lkb, DLM_MSG_CANCEL);
3669
}
3670
3671
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672
{
3673
struct dlm_message *ms;
3674
struct dlm_mhandle *mh;
3675
int to_nodeid, error;
3676
3677
to_nodeid = lkb->lkb_nodeid;
3678
3679
error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3680
if (error)
3681
goto out;
3682
3683
send_args(r, lkb, ms);
3684
3685
ms->m_result = 0;
3686
3687
error = send_message(mh, ms, r->res_name, r->res_length);
3688
out:
3689
return error;
3690
}
3691
3692
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3693
{
3694
struct dlm_message *ms;
3695
struct dlm_mhandle *mh;
3696
int to_nodeid, error;
3697
3698
to_nodeid = lkb->lkb_nodeid;
3699
3700
error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3701
if (error)
3702
goto out;
3703
3704
send_args(r, lkb, ms);
3705
3706
ms->m_bastmode = cpu_to_le32(mode);
3707
3708
error = send_message(mh, ms, r->res_name, r->res_length);
3709
out:
3710
return error;
3711
}
3712
3713
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3714
{
3715
struct dlm_message *ms;
3716
struct dlm_mhandle *mh;
3717
int to_nodeid, error;
3718
3719
to_nodeid = dlm_dir_nodeid(r);
3720
3721
add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3722
error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3723
if (error)
3724
goto fail;
3725
3726
send_args(r, lkb, ms);
3727
3728
error = send_message(mh, ms, r->res_name, r->res_length);
3729
if (error)
3730
goto fail;
3731
return 0;
3732
3733
fail:
3734
remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3735
return error;
3736
}
3737
3738
static int send_remove(struct dlm_rsb *r)
3739
{
3740
struct dlm_message *ms;
3741
struct dlm_mhandle *mh;
3742
int to_nodeid, error;
3743
3744
to_nodeid = dlm_dir_nodeid(r);
3745
3746
error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3747
if (error)
3748
goto out;
3749
3750
memcpy(ms->m_extra, r->res_name, r->res_length);
3751
ms->m_hash = cpu_to_le32(r->res_hash);
3752
3753
error = send_message(mh, ms, r->res_name, r->res_length);
3754
out:
3755
return error;
3756
}
3757
3758
static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3759
int mstype, int rv)
3760
{
3761
struct dlm_message *ms;
3762
struct dlm_mhandle *mh;
3763
int to_nodeid, error;
3764
3765
to_nodeid = lkb->lkb_nodeid;
3766
3767
error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3768
if (error)
3769
goto out;
3770
3771
send_args(r, lkb, ms);
3772
3773
ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3774
3775
error = send_message(mh, ms, r->res_name, r->res_length);
3776
out:
3777
return error;
3778
}
3779
3780
static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3781
{
3782
return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3783
}
3784
3785
static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3786
{
3787
return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3788
}
3789
3790
static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3791
{
3792
return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3793
}
3794
3795
static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3796
{
3797
return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3798
}
3799
3800
static int send_lookup_reply(struct dlm_ls *ls,
3801
const struct dlm_message *ms_in, int ret_nodeid,
3802
int rv)
3803
{
3804
struct dlm_rsb *r = &ls->ls_local_rsb;
3805
struct dlm_message *ms;
3806
struct dlm_mhandle *mh;
3807
int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3808
3809
error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3810
if (error)
3811
goto out;
3812
3813
ms->m_lkid = ms_in->m_lkid;
3814
ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3815
ms->m_nodeid = cpu_to_le32(ret_nodeid);
3816
3817
error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3818
out:
3819
return error;
3820
}
3821
3822
/* which args we save from a received message depends heavily on the type
3823
of message, unlike the send side where we can safely send everything about
3824
the lkb for any type of message */
3825
3826
static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3827
{
3828
lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3829
dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3830
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3831
}
3832
3833
static void receive_flags_reply(struct dlm_lkb *lkb,
3834
const struct dlm_message *ms,
3835
bool local)
3836
{
3837
if (local)
3838
return;
3839
3840
dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3841
dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3842
}
3843
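/* When "local" is set, the message is a stack-built fake reply created during
   recovery (see dlm_recover_waiters_pre() below); its flag fields are zeroed
   and must not overwrite the lkb's real flags, hence the early return above. */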
3844
static int receive_extralen(const struct dlm_message *ms)
3845
{
3846
return (le16_to_cpu(ms->m_header.h_length) -
3847
sizeof(struct dlm_message));
3848
}
3849
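/* m_extra is the variable-length tail of a message, carrying either a
   resource name or lvb data depending on the message type; its length is
   whatever the packet holds beyond the fixed-size struct dlm_message. */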
3850
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3851
const struct dlm_message *ms)
3852
{
3853
int len;
3854
3855
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3856
if (!lkb->lkb_lvbptr)
3857
lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3858
if (!lkb->lkb_lvbptr)
3859
return -ENOMEM;
3860
len = receive_extralen(ms);
3861
if (len > ls->ls_lvblen)
3862
len = ls->ls_lvblen;
3863
memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3864
}
3865
return 0;
3866
}
3867
3868
static void fake_bastfn(void *astparam, int mode)
3869
{
3870
log_print("fake_bastfn should not be called");
3871
}
3872
3873
static void fake_astfn(void *astparam)
3874
{
3875
log_print("fake_astfn should not be called");
3876
}
3877
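/* The fake ast/bast functions are placeholders installed on master-copy lkbs
   so that code checking for a registered callback sees a non-NULL pointer.
   The real callbacks run on the lock holder's node (the master sends grant or
   bast messages instead), so these stubs should never actually be called. */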
3878
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3879
const struct dlm_message *ms)
3880
{
3881
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3882
lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3883
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3884
lkb->lkb_grmode = DLM_LOCK_IV;
3885
lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3886
3887
lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3888
lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3889
3890
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3891
/* lkb was just created so there won't be an lvb yet */
3892
lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3893
if (!lkb->lkb_lvbptr)
3894
return -ENOMEM;
3895
}
3896
3897
return 0;
3898
}
3899
3900
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3901
const struct dlm_message *ms)
3902
{
3903
if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3904
return -EBUSY;
3905
3906
if (receive_lvb(ls, lkb, ms))
3907
return -ENOMEM;
3908
3909
lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3910
lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3911
3912
return 0;
3913
}
3914
3915
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3916
const struct dlm_message *ms)
3917
{
3918
if (receive_lvb(ls, lkb, ms))
3919
return -ENOMEM;
3920
return 0;
3921
}
3922
3923
/* We fill in the local-lkb fields with the info that send_xxxx_reply()
3924
uses to send a reply and that the remote end uses to process the reply. */
3925
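/* ls_local_rsb and ls_local_lkb are scratch structures embedded in the
   lockspace; the fail paths below use them to send an error reply for a
   message whose rsb/lkb could not be found or created. */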
3926
static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3927
{
3928
struct dlm_lkb *lkb = &ls->ls_local_lkb;
3929
lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3930
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3931
}
3932
3933
/* This is called after the rsb is locked so that we can safely inspect
3934
fields in the lkb. */
3935
3936
static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3937
{
3938
int from = le32_to_cpu(ms->m_header.h_nodeid);
3939
int error = 0;
3940
3941
/* currently, mixing user and kernel locks is not supported */
3942
if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3943
!test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3944
log_error(lkb->lkb_resource->res_ls,
3945
"got user dlm message for a kernel lock");
3946
error = -EINVAL;
3947
goto out;
3948
}
3949
3950
switch (ms->m_type) {
3951
case cpu_to_le32(DLM_MSG_CONVERT):
3952
case cpu_to_le32(DLM_MSG_UNLOCK):
3953
case cpu_to_le32(DLM_MSG_CANCEL):
3954
if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3955
error = -EINVAL;
3956
break;
3957
3958
case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3959
case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3960
case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3961
case cpu_to_le32(DLM_MSG_GRANT):
3962
case cpu_to_le32(DLM_MSG_BAST):
3963
if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3964
error = -EINVAL;
3965
break;
3966
3967
case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3968
if (!is_process_copy(lkb))
3969
error = -EINVAL;
3970
else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3971
error = -EINVAL;
3972
break;
3973
3974
default:
3975
error = -EINVAL;
3976
}
3977
3978
out:
3979
if (error)
3980
log_error(lkb->lkb_resource->res_ls,
3981
"ignore invalid message %d from %d %x %x %x %d",
3982
le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3983
lkb->lkb_remid, dlm_iflags_val(lkb),
3984
lkb->lkb_nodeid);
3985
return error;
3986
}
3987
3988
static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3989
{
3990
struct dlm_lkb *lkb;
3991
struct dlm_rsb *r;
3992
int from_nodeid;
3993
int error, namelen = 0;
3994
3995
from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3996
3997
error = create_lkb(ls, &lkb);
3998
if (error)
3999
goto fail;
4000
4001
receive_flags(lkb, ms);
4002
set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4003
error = receive_request_args(ls, lkb, ms);
4004
if (error) {
4005
__put_lkb(ls, lkb);
4006
goto fail;
4007
}
4008
4009
/* The dir node is the authority on whether we are the master
4010
for this rsb or not, so if the dir node sends us a request, we should
4011
recreate the rsb if we've destroyed it. This race happens when we
4012
send a remove message to the dir node at the same time that the dir
4013
node sends us a request for the rsb. */
4014
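/* Illustrative timing of that race (L = this node, the old master;
 * R = the dir node); the two messages cross on the wire:
 *
 *   L: send_remove(r)   ->  R: receive_remove(): rsb in use again, ignored
 *   R: send_request(r)  ->  L: recreates the rsb via find_rsb() below
 */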
4015
namelen = receive_extralen(ms);
4016
4017
error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4018
R_RECEIVE_REQUEST, &r);
4019
if (error) {
4020
__put_lkb(ls, lkb);
4021
goto fail;
4022
}
4023
4024
lock_rsb(r);
4025
4026
if (r->res_master_nodeid != dlm_our_nodeid()) {
4027
error = validate_master_nodeid(ls, r, from_nodeid);
4028
if (error) {
4029
unlock_rsb(r);
4030
put_rsb(r);
4031
__put_lkb(ls, lkb);
4032
goto fail;
4033
}
4034
}
4035
4036
attach_lkb(r, lkb);
4037
error = do_request(r, lkb);
4038
send_request_reply(r, lkb, error);
4039
do_request_effects(r, lkb, error);
4040
4041
unlock_rsb(r);
4042
put_rsb(r);
4043
4044
if (error == -EINPROGRESS)
4045
error = 0;
4046
if (error)
4047
dlm_put_lkb(lkb);
4048
return 0;
4049
4050
fail:
4051
/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4052
and do this receive_request again from process_lookup_list once
4053
we get the lookup reply. This would avoid many repeated
4054
ENOTBLK request failures when the lookup reply designating us
4055
as master is delayed. */
4056
4057
if (error != -ENOTBLK) {
4058
log_limit(ls, "receive_request %x from %d %d",
4059
le32_to_cpu(ms->m_lkid), from_nodeid, error);
4060
}
4061
4062
setup_local_lkb(ls, ms);
4063
send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4064
return error;
4065
}
4066
4067
static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4068
{
4069
struct dlm_lkb *lkb;
4070
struct dlm_rsb *r;
4071
int error, reply = 1;
4072
4073
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4074
if (error)
4075
goto fail;
4076
4077
if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4078
log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4079
"remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4080
(unsigned long long)lkb->lkb_recover_seq,
4081
le32_to_cpu(ms->m_header.h_nodeid),
4082
le32_to_cpu(ms->m_lkid));
4083
error = -ENOENT;
4084
dlm_put_lkb(lkb);
4085
goto fail;
4086
}
4087
4088
r = lkb->lkb_resource;
4089
4090
hold_rsb(r);
4091
lock_rsb(r);
4092
4093
error = validate_message(lkb, ms);
4094
if (error)
4095
goto out;
4096
4097
receive_flags(lkb, ms);
4098
4099
error = receive_convert_args(ls, lkb, ms);
4100
if (error) {
4101
send_convert_reply(r, lkb, error);
4102
goto out;
4103
}
4104
4105
reply = !down_conversion(lkb);
4106
4107
error = do_convert(r, lkb);
4108
if (reply)
4109
send_convert_reply(r, lkb, error);
4110
do_convert_effects(r, lkb, error);
4111
out:
4112
unlock_rsb(r);
4113
put_rsb(r);
4114
dlm_put_lkb(lkb);
4115
return 0;
4116
4117
fail:
4118
setup_local_lkb(ls, ms);
4119
send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4120
return error;
4121
}
4122
4123
static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4124
{
4125
struct dlm_lkb *lkb;
4126
struct dlm_rsb *r;
4127
int error;
4128
4129
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4130
if (error)
4131
goto fail;
4132
4133
if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4134
log_error(ls, "receive_unlock %x remid %x remote %d %x",
4135
lkb->lkb_id, lkb->lkb_remid,
4136
le32_to_cpu(ms->m_header.h_nodeid),
4137
le32_to_cpu(ms->m_lkid));
4138
error = -ENOENT;
4139
dlm_put_lkb(lkb);
4140
goto fail;
4141
}
4142
4143
r = lkb->lkb_resource;
4144
4145
hold_rsb(r);
4146
lock_rsb(r);
4147
4148
error = validate_message(lkb, ms);
4149
if (error)
4150
goto out;
4151
4152
receive_flags(lkb, ms);
4153
4154
error = receive_unlock_args(ls, lkb, ms);
4155
if (error) {
4156
send_unlock_reply(r, lkb, error);
4157
goto out;
4158
}
4159
4160
error = do_unlock(r, lkb);
4161
send_unlock_reply(r, lkb, error);
4162
do_unlock_effects(r, lkb, error);
4163
out:
4164
unlock_rsb(r);
4165
put_rsb(r);
4166
dlm_put_lkb(lkb);
4167
return 0;
4168
4169
fail:
4170
setup_local_lkb(ls, ms);
4171
send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4172
return error;
4173
}
4174
4175
static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4176
{
4177
struct dlm_lkb *lkb;
4178
struct dlm_rsb *r;
4179
int error;
4180
4181
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4182
if (error)
4183
goto fail;
4184
4185
receive_flags(lkb, ms);
4186
4187
r = lkb->lkb_resource;
4188
4189
hold_rsb(r);
4190
lock_rsb(r);
4191
4192
error = validate_message(lkb, ms);
4193
if (error)
4194
goto out;
4195
4196
error = do_cancel(r, lkb);
4197
send_cancel_reply(r, lkb, error);
4198
do_cancel_effects(r, lkb, error);
4199
out:
4200
unlock_rsb(r);
4201
put_rsb(r);
4202
dlm_put_lkb(lkb);
4203
return 0;
4204
4205
fail:
4206
setup_local_lkb(ls, ms);
4207
send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4208
return error;
4209
}
4210
4211
static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4212
{
4213
struct dlm_lkb *lkb;
4214
struct dlm_rsb *r;
4215
int error;
4216
4217
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4218
if (error)
4219
return error;
4220
4221
r = lkb->lkb_resource;
4222
4223
hold_rsb(r);
4224
lock_rsb(r);
4225
4226
error = validate_message(lkb, ms);
4227
if (error)
4228
goto out;
4229
4230
receive_flags_reply(lkb, ms, false);
4231
if (is_altmode(lkb))
4232
munge_altmode(lkb, ms);
4233
grant_lock_pc(r, lkb, ms);
4234
queue_cast(r, lkb, 0);
4235
out:
4236
unlock_rsb(r);
4237
put_rsb(r);
4238
dlm_put_lkb(lkb);
4239
return 0;
4240
}
4241
4242
static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4243
{
4244
struct dlm_lkb *lkb;
4245
struct dlm_rsb *r;
4246
int error;
4247
4248
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4249
if (error)
4250
return error;
4251
4252
r = lkb->lkb_resource;
4253
4254
hold_rsb(r);
4255
lock_rsb(r);
4256
4257
error = validate_message(lkb, ms);
4258
if (error)
4259
goto out;
4260
4261
queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4262
lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4263
out:
4264
unlock_rsb(r);
4265
put_rsb(r);
4266
dlm_put_lkb(lkb);
4267
return 0;
4268
}
4269
4270
static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4271
{
4272
int len, error, ret_nodeid, from_nodeid, our_nodeid;
4273
4274
from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4275
our_nodeid = dlm_our_nodeid();
4276
4277
len = receive_extralen(ms);
4278
4279
error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4280
&ret_nodeid, NULL);
4281
4282
/* Optimization: we're master so treat lookup as a request */
4283
if (!error && ret_nodeid == our_nodeid) {
4284
receive_request(ls, ms);
4285
return;
4286
}
4287
send_lookup_reply(ls, ms, ret_nodeid, error);
4288
}
4289
4290
static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4291
{
4292
char name[DLM_RESNAME_MAXLEN+1];
4293
struct dlm_rsb *r;
4294
int rv, len, dir_nodeid, from_nodeid;
4295
4296
from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4297
4298
len = receive_extralen(ms);
4299
4300
if (len > DLM_RESNAME_MAXLEN) {
4301
log_error(ls, "receive_remove from %d bad len %d",
4302
from_nodeid, len);
4303
return;
4304
}
4305
4306
dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4307
if (dir_nodeid != dlm_our_nodeid()) {
4308
log_error(ls, "receive_remove from %d bad nodeid %d",
4309
from_nodeid, dir_nodeid);
4310
return;
4311
}
4312
4313
/*
4314
* Look for an inactive rsb; if it's there, free it.
4315
* If the rsb is active, it's being used, and we should ignore this
4316
* message. This is an expected race between the dir node sending a
4317
* request to the master node at the same time as the master node sends
4318
* a remove to the dir node. The resolution to that race is for the
4319
* dir node to ignore the remove message, and the master node to
4320
* recreate the master rsb when it gets a request from the dir node for
4321
* an rsb it doesn't have.
4322
*/
4323
4324
memset(name, 0, sizeof(name));
4325
memcpy(name, ms->m_extra, len);
4326
4327
rcu_read_lock();
4328
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4329
if (rv) {
4330
rcu_read_unlock();
4331
/* should not happen */
4332
log_error(ls, "%s from %d not found %s", __func__,
4333
from_nodeid, name);
4334
return;
4335
}
4336
4337
write_lock_bh(&ls->ls_rsbtbl_lock);
4338
if (!rsb_flag(r, RSB_HASHED)) {
4339
rcu_read_unlock();
4340
write_unlock_bh(&ls->ls_rsbtbl_lock);
4341
/* should not happen */
4342
log_error(ls, "%s from %d got removed during removal %s",
4343
__func__, from_nodeid, name);
4344
return;
4345
}
4346
/* at this stage the rsb can only be freed here */
4347
rcu_read_unlock();
4348
4349
if (!rsb_flag(r, RSB_INACTIVE)) {
4350
if (r->res_master_nodeid != from_nodeid) {
4351
/* should not happen */
4352
log_error(ls, "receive_remove on active rsb from %d master %d",
4353
from_nodeid, r->res_master_nodeid);
4354
dlm_print_rsb(r);
4355
write_unlock_bh(&ls->ls_rsbtbl_lock);
4356
return;
4357
}
4358
4359
/* Ignore the remove message, see race comment above. */
4360
4361
log_debug(ls, "receive_remove from %d master %d first %x %s",
4362
from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4363
name);
4364
write_unlock_bh(&ls->ls_rsbtbl_lock);
4365
return;
4366
}
4367
4368
if (r->res_master_nodeid != from_nodeid) {
4369
log_error(ls, "receive_remove inactive from %d master %d",
4370
from_nodeid, r->res_master_nodeid);
4371
dlm_print_rsb(r);
4372
write_unlock_bh(&ls->ls_rsbtbl_lock);
4373
return;
4374
}
4375
4376
list_del(&r->res_slow_list);
4377
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4378
dlm_rhash_rsb_params);
4379
rsb_clear_flag(r, RSB_HASHED);
4380
write_unlock_bh(&ls->ls_rsbtbl_lock);
4381
4382
free_inactive_rsb(r);
4383
}
4384
4385
static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4386
{
4387
do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4388
}
4389
4390
static int receive_request_reply(struct dlm_ls *ls,
4391
const struct dlm_message *ms)
4392
{
4393
struct dlm_lkb *lkb;
4394
struct dlm_rsb *r;
4395
int error, mstype, result;
4396
int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4397
4398
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4399
if (error)
4400
return error;
4401
4402
r = lkb->lkb_resource;
4403
hold_rsb(r);
4404
lock_rsb(r);
4405
4406
error = validate_message(lkb, ms);
4407
if (error)
4408
goto out;
4409
4410
mstype = lkb->lkb_wait_type;
4411
error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4412
if (error) {
4413
log_error(ls, "receive_request_reply %x remote %d %x result %d",
4414
lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4415
from_dlm_errno(le32_to_cpu(ms->m_result)));
4416
dlm_dump_rsb(r);
4417
goto out;
4418
}
4419
4420
/* Optimization: the dir node was also the master, so it took our
4421
lookup as a request and sent request reply instead of lookup reply */
4422
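/* (the dir node side of this optimization is the receive_lookup() ->
   receive_request() path above) */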
if (mstype == DLM_MSG_LOOKUP) {
4423
r->res_master_nodeid = from_nodeid;
4424
r->res_nodeid = from_nodeid;
4425
lkb->lkb_nodeid = from_nodeid;
4426
}
4427
4428
/* this is the value returned from do_request() on the master */
4429
result = from_dlm_errno(le32_to_cpu(ms->m_result));
4430
4431
switch (result) {
4432
case -EAGAIN:
4433
/* request would block (be queued) on remote master */
4434
queue_cast(r, lkb, -EAGAIN);
4435
confirm_master(r, -EAGAIN);
4436
unhold_lkb(lkb); /* undoes create_lkb() */
4437
break;
4438
4439
case -EINPROGRESS:
4440
case 0:
4441
/* request was queued or granted on remote master */
4442
receive_flags_reply(lkb, ms, false);
4443
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4444
if (is_altmode(lkb))
4445
munge_altmode(lkb, ms);
4446
if (result) {
4447
add_lkb(r, lkb, DLM_LKSTS_WAITING);
4448
} else {
4449
grant_lock_pc(r, lkb, ms);
4450
queue_cast(r, lkb, 0);
4451
}
4452
confirm_master(r, result);
4453
break;
4454
4455
case -EBADR:
4456
case -ENOTBLK:
4457
/* find_rsb failed to find rsb or rsb wasn't master */
4458
log_limit(ls, "receive_request_reply %x from %d %d "
4459
"master %d dir %d first %x %s", lkb->lkb_id,
4460
from_nodeid, result, r->res_master_nodeid,
4461
r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4462
4463
if (r->res_dir_nodeid != dlm_our_nodeid() &&
4464
r->res_master_nodeid != dlm_our_nodeid()) {
4465
/* cause _request_lock->set_master->send_lookup */
4466
r->res_master_nodeid = 0;
4467
r->res_nodeid = -1;
4468
lkb->lkb_nodeid = -1;
4469
}
4470
4471
if (is_overlap(lkb)) {
4472
/* we'll ignore error in cancel/unlock reply */
4473
queue_cast_overlap(r, lkb);
4474
confirm_master(r, result);
4475
unhold_lkb(lkb); /* undoes create_lkb() */
4476
} else {
4477
_request_lock(r, lkb);
4478
4479
if (r->res_master_nodeid == dlm_our_nodeid())
4480
confirm_master(r, 0);
4481
}
4482
break;
4483
4484
default:
4485
log_error(ls, "receive_request_reply %x error %d",
4486
lkb->lkb_id, result);
4487
}
4488
4489
if ((result == 0 || result == -EINPROGRESS) &&
4490
test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4491
log_debug(ls, "receive_request_reply %x result %d unlock",
4492
lkb->lkb_id, result);
4493
clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4494
send_unlock(r, lkb);
4495
} else if ((result == -EINPROGRESS) &&
4496
test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4497
&lkb->lkb_iflags)) {
4498
log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4499
clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4500
send_cancel(r, lkb);
4501
} else {
4502
clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4503
clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4504
}
4505
out:
4506
unlock_rsb(r);
4507
put_rsb(r);
4508
dlm_put_lkb(lkb);
4509
return 0;
4510
}
4511
4512
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4513
const struct dlm_message *ms, bool local)
4514
{
4515
/* this is the value returned from do_convert() on the master */
4516
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4517
case -EAGAIN:
4518
/* convert would block (be queued) on remote master */
4519
queue_cast(r, lkb, -EAGAIN);
4520
break;
4521
4522
case -EDEADLK:
4523
receive_flags_reply(lkb, ms, local);
4524
revert_lock_pc(r, lkb);
4525
queue_cast(r, lkb, -EDEADLK);
4526
break;
4527
4528
case -EINPROGRESS:
4529
/* convert was queued on remote master */
4530
receive_flags_reply(lkb, ms, local);
4531
if (is_demoted(lkb))
4532
munge_demoted(lkb);
4533
del_lkb(r, lkb);
4534
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4535
break;
4536
4537
case 0:
4538
/* convert was granted on remote master */
4539
receive_flags_reply(lkb, ms, local);
4540
if (is_demoted(lkb))
4541
munge_demoted(lkb);
4542
grant_lock_pc(r, lkb, ms);
4543
queue_cast(r, lkb, 0);
4544
break;
4545
4546
default:
4547
log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4548
lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4549
le32_to_cpu(ms->m_lkid),
4550
from_dlm_errno(le32_to_cpu(ms->m_result)));
4551
dlm_print_rsb(r);
4552
dlm_print_lkb(lkb);
4553
}
4554
}
4555
4556
static void _receive_convert_reply(struct dlm_lkb *lkb,
4557
const struct dlm_message *ms, bool local)
4558
{
4559
struct dlm_rsb *r = lkb->lkb_resource;
4560
int error;
4561
4562
hold_rsb(r);
4563
lock_rsb(r);
4564
4565
error = validate_message(lkb, ms);
4566
if (error)
4567
goto out;
4568
4569
error = remove_from_waiters_ms(lkb, ms, local);
4570
if (error)
4571
goto out;
4572
4573
__receive_convert_reply(r, lkb, ms, local);
4574
out:
4575
unlock_rsb(r);
4576
put_rsb(r);
4577
}
4578
4579
static int receive_convert_reply(struct dlm_ls *ls,
4580
const struct dlm_message *ms)
4581
{
4582
struct dlm_lkb *lkb;
4583
int error;
4584
4585
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4586
if (error)
4587
return error;
4588
4589
_receive_convert_reply(lkb, ms, false);
4590
dlm_put_lkb(lkb);
4591
return 0;
4592
}
4593
4594
static void _receive_unlock_reply(struct dlm_lkb *lkb,
4595
const struct dlm_message *ms, bool local)
4596
{
4597
struct dlm_rsb *r = lkb->lkb_resource;
4598
int error;
4599
4600
hold_rsb(r);
4601
lock_rsb(r);
4602
4603
error = validate_message(lkb, ms);
4604
if (error)
4605
goto out;
4606
4607
error = remove_from_waiters_ms(lkb, ms, local);
4608
if (error)
4609
goto out;
4610
4611
/* this is the value returned from do_unlock() on the master */
4612
4613
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4614
case -DLM_EUNLOCK:
4615
receive_flags_reply(lkb, ms, local);
4616
remove_lock_pc(r, lkb);
4617
queue_cast(r, lkb, -DLM_EUNLOCK);
4618
break;
4619
case -ENOENT:
4620
break;
4621
default:
4622
log_error(r->res_ls, "receive_unlock_reply %x error %d",
4623
lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4624
}
4625
out:
4626
unlock_rsb(r);
4627
put_rsb(r);
4628
}
4629
4630
static int receive_unlock_reply(struct dlm_ls *ls,
4631
const struct dlm_message *ms)
4632
{
4633
struct dlm_lkb *lkb;
4634
int error;
4635
4636
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4637
if (error)
4638
return error;
4639
4640
_receive_unlock_reply(lkb, ms, false);
4641
dlm_put_lkb(lkb);
4642
return 0;
4643
}
4644
4645
static void _receive_cancel_reply(struct dlm_lkb *lkb,
4646
const struct dlm_message *ms, bool local)
4647
{
4648
struct dlm_rsb *r = lkb->lkb_resource;
4649
int error;
4650
4651
hold_rsb(r);
4652
lock_rsb(r);
4653
4654
error = validate_message(lkb, ms);
4655
if (error)
4656
goto out;
4657
4658
error = remove_from_waiters_ms(lkb, ms, local);
4659
if (error)
4660
goto out;
4661
4662
/* this is the value returned from do_cancel() on the master */
4663
4664
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4665
case -DLM_ECANCEL:
4666
receive_flags_reply(lkb, ms, local);
4667
revert_lock_pc(r, lkb);
4668
queue_cast(r, lkb, -DLM_ECANCEL);
4669
break;
4670
case 0:
4671
break;
4672
default:
4673
log_error(r->res_ls, "receive_cancel_reply %x error %d",
4674
lkb->lkb_id,
4675
from_dlm_errno(le32_to_cpu(ms->m_result)));
4676
}
4677
out:
4678
unlock_rsb(r);
4679
put_rsb(r);
4680
}
4681
4682
static int receive_cancel_reply(struct dlm_ls *ls,
4683
const struct dlm_message *ms)
4684
{
4685
struct dlm_lkb *lkb;
4686
int error;
4687
4688
error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4689
if (error)
4690
return error;
4691
4692
_receive_cancel_reply(lkb, ms, false);
4693
dlm_put_lkb(lkb);
4694
return 0;
4695
}
4696
4697
static void receive_lookup_reply(struct dlm_ls *ls,
4698
const struct dlm_message *ms)
4699
{
4700
struct dlm_lkb *lkb;
4701
struct dlm_rsb *r;
4702
int error, ret_nodeid;
4703
int do_lookup_list = 0;
4704
4705
error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4706
if (error) {
4707
log_error(ls, "%s no lkid %x", __func__,
4708
le32_to_cpu(ms->m_lkid));
4709
return;
4710
}
4711
4712
/* ms->m_result is the value returned by dlm_master_lookup on dir node
4713
FIXME: will a non-zero error ever be returned? */
4714
4715
r = lkb->lkb_resource;
4716
hold_rsb(r);
4717
lock_rsb(r);
4718
4719
error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4720
if (error)
4721
goto out;
4722
4723
ret_nodeid = le32_to_cpu(ms->m_nodeid);
4724
4725
/* We sometimes receive a request from the dir node for this
4726
rsb before we've received the dir node's lookup_reply for it.
4727
The request from the dir node implies we're the master, so we set
4728
ourselves as master in receive_request_reply, and verify here that
4729
we are indeed the master. */
4730
4731
if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4732
/* This should never happen */
4733
log_error(ls, "receive_lookup_reply %x from %d ret %d "
4734
"master %d dir %d our %d first %x %s",
4735
lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4736
ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4737
dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4738
}
4739
4740
if (ret_nodeid == dlm_our_nodeid()) {
4741
r->res_master_nodeid = ret_nodeid;
4742
r->res_nodeid = 0;
4743
do_lookup_list = 1;
4744
r->res_first_lkid = 0;
4745
} else if (ret_nodeid == -1) {
4746
/* the remote node doesn't believe it's the dir node */
4747
log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4748
lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4749
r->res_master_nodeid = 0;
4750
r->res_nodeid = -1;
4751
lkb->lkb_nodeid = -1;
4752
} else {
4753
/* set_master() will set lkb_nodeid from r */
4754
r->res_master_nodeid = ret_nodeid;
4755
r->res_nodeid = ret_nodeid;
4756
}
4757
4758
if (is_overlap(lkb)) {
4759
log_debug(ls, "receive_lookup_reply %x unlock %x",
4760
lkb->lkb_id, dlm_iflags_val(lkb));
4761
queue_cast_overlap(r, lkb);
4762
unhold_lkb(lkb); /* undoes create_lkb() */
4763
goto out_list;
4764
}
4765
4766
_request_lock(r, lkb);
4767
4768
out_list:
4769
if (do_lookup_list)
4770
process_lookup_list(r);
4771
out:
4772
unlock_rsb(r);
4773
put_rsb(r);
4774
dlm_put_lkb(lkb);
4775
}
4776
4777
static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4778
uint32_t saved_seq)
4779
{
4780
int error = 0, noent = 0;
4781
4782
if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4783
log_limit(ls, "receive %d from non-member %d %x %x %d",
4784
le32_to_cpu(ms->m_type),
4785
le32_to_cpu(ms->m_header.h_nodeid),
4786
le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4787
from_dlm_errno(le32_to_cpu(ms->m_result)));
4788
return;
4789
}
4790
4791
switch (ms->m_type) {
4792
4793
/* messages sent to a master node */
4794
4795
case cpu_to_le32(DLM_MSG_REQUEST):
4796
error = receive_request(ls, ms);
4797
break;
4798
4799
case cpu_to_le32(DLM_MSG_CONVERT):
4800
error = receive_convert(ls, ms);
4801
break;
4802
4803
case cpu_to_le32(DLM_MSG_UNLOCK):
4804
error = receive_unlock(ls, ms);
4805
break;
4806
4807
case cpu_to_le32(DLM_MSG_CANCEL):
4808
noent = 1;
4809
error = receive_cancel(ls, ms);
4810
break;
4811
4812
/* messages sent from a master node (replies to above) */
4813
4814
case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4815
error = receive_request_reply(ls, ms);
4816
break;
4817
4818
case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4819
error = receive_convert_reply(ls, ms);
4820
break;
4821
4822
case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4823
error = receive_unlock_reply(ls, ms);
4824
break;
4825
4826
case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4827
error = receive_cancel_reply(ls, ms);
4828
break;
4829
4830
/* messages sent from a master node (only two types of async msg) */
4831
4832
case cpu_to_le32(DLM_MSG_GRANT):
4833
noent = 1;
4834
error = receive_grant(ls, ms);
4835
break;
4836
4837
case cpu_to_le32(DLM_MSG_BAST):
4838
noent = 1;
4839
error = receive_bast(ls, ms);
4840
break;
4841
4842
/* messages sent to a dir node */
4843
4844
case cpu_to_le32(DLM_MSG_LOOKUP):
4845
receive_lookup(ls, ms);
4846
break;
4847
4848
case cpu_to_le32(DLM_MSG_REMOVE):
4849
receive_remove(ls, ms);
4850
break;
4851
4852
/* messages sent from a dir node (remove has no reply) */
4853
4854
case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4855
receive_lookup_reply(ls, ms);
4856
break;
4857
4858
/* other messages */
4859
4860
case cpu_to_le32(DLM_MSG_PURGE):
4861
receive_purge(ls, ms);
4862
break;
4863
4864
default:
4865
log_error(ls, "unknown message type %d",
4866
le32_to_cpu(ms->m_type));
4867
}
4868
4869
/*
4870
* When checking for ENOENT, we're checking the result of
4871
* find_lkb(m_remid):
4872
*
4873
* The lock id referenced in the message wasn't found. This may
4874
* happen in normal usage for the async messages and cancel, so
4875
* only use log_debug for them.
4876
*
4877
* Some errors are expected and normal.
4878
*/
4879
4880
if (error == -ENOENT && noent) {
4881
log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4882
le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4883
le32_to_cpu(ms->m_header.h_nodeid),
4884
le32_to_cpu(ms->m_lkid), saved_seq);
4885
} else if (error == -ENOENT) {
4886
log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4887
le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4888
le32_to_cpu(ms->m_header.h_nodeid),
4889
le32_to_cpu(ms->m_lkid), saved_seq);
4890
4891
if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4892
dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4893
}
4894
4895
if (error == -EINVAL) {
4896
log_error(ls, "receive %d inval from %d lkid %x remid %x "
4897
"saved_seq %u",
4898
le32_to_cpu(ms->m_type),
4899
le32_to_cpu(ms->m_header.h_nodeid),
4900
le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4901
saved_seq);
4902
}
4903
}
4904
4905
/* If the lockspace is in recovery mode (locking stopped), then normal
4906
messages are saved on the requestqueue for processing after recovery is
4907
done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4908
messages off the requestqueue before we process new ones. This occurs right
4909
after recovery completes when we transition from saving all messages on
4910
requestqueue, to processing all the saved messages, to processing new
4911
messages as they arrive. */
4912
4913
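/* ls_requestqueue_lock is taken for read in the common case below; when a
   message has to be saved, it is dropped and retaken for write, and
   LSFL_RECV_MSG_BLOCKED is rechecked since it may have been cleared in
   between. */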
static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4914
int nodeid)
4915
{
4916
try_again:
4917
read_lock_bh(&ls->ls_requestqueue_lock);
4918
if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4919
/* If we were a member of this lockspace, left, and rejoined,
4920
other nodes may still be sending us messages from the
4921
lockspace generation before we left. */
4922
if (WARN_ON_ONCE(!ls->ls_generation)) {
4923
read_unlock_bh(&ls->ls_requestqueue_lock);
4924
log_limit(ls, "receive %d from %d ignore old gen",
4925
le32_to_cpu(ms->m_type), nodeid);
4926
return;
4927
}
4928
4929
read_unlock_bh(&ls->ls_requestqueue_lock);
4930
write_lock_bh(&ls->ls_requestqueue_lock);
4931
/* recheck because we hold writelock now */
4932
if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4933
write_unlock_bh(&ls->ls_requestqueue_lock);
4934
goto try_again;
4935
}
4936
4937
dlm_add_requestqueue(ls, nodeid, ms);
4938
write_unlock_bh(&ls->ls_requestqueue_lock);
4939
} else {
4940
_receive_message(ls, ms, 0);
4941
read_unlock_bh(&ls->ls_requestqueue_lock);
4942
}
4943
}
4944
4945
/* This is called by dlm_recoverd to process messages that were saved on
4946
the requestqueue. */
4947
4948
void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4949
uint32_t saved_seq)
4950
{
4951
_receive_message(ls, ms, saved_seq);
4952
}
4953
4954
/* This is called by the midcomms layer when something is received for
4955
the lockspace. It could be either a MSG (normal message sent as part of
4956
standard locking activity) or an RCOM (recovery message sent as part of
4957
lockspace recovery). */
4958
4959
void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4960
{
4961
const struct dlm_header *hd = &p->header;
4962
struct dlm_ls *ls;
4963
int type = 0;
4964
4965
switch (hd->h_cmd) {
4966
case DLM_MSG:
4967
type = le32_to_cpu(p->message.m_type);
4968
break;
4969
case DLM_RCOM:
4970
type = le32_to_cpu(p->rcom.rc_type);
4971
break;
4972
default:
4973
log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4974
return;
4975
}
4976
4977
if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4978
log_print("invalid h_nodeid %d from %d lockspace %x",
4979
le32_to_cpu(hd->h_nodeid), nodeid,
4980
le32_to_cpu(hd->u.h_lockspace));
4981
return;
4982
}
4983
4984
ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4985
if (!ls) {
4986
if (dlm_config.ci_log_debug) {
4987
printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4988
"%u from %d cmd %d type %d\n",
4989
le32_to_cpu(hd->u.h_lockspace), nodeid,
4990
hd->h_cmd, type);
4991
}
4992
4993
if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4994
dlm_send_ls_not_ready(nodeid, &p->rcom);
4995
return;
4996
}
4997
4998
/* this rwlock allows dlm_ls_stop() to wait for all dlm_recv threads to
4999
be inactive (in this ls) before transitioning to recovery mode */
5000
5001
read_lock_bh(&ls->ls_recv_active);
5002
if (hd->h_cmd == DLM_MSG)
5003
dlm_receive_message(ls, &p->message, nodeid);
5004
else if (hd->h_cmd == DLM_RCOM)
5005
dlm_receive_rcom(ls, &p->rcom, nodeid);
5006
else
5007
log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5008
hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5009
read_unlock_bh(&ls->ls_recv_active);
5010
5011
dlm_put_lockspace(ls);
5012
}
5013
5014
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5015
struct dlm_message *ms_local)
5016
{
5017
if (middle_conversion(lkb)) {
5018
log_rinfo(ls, "%s %x middle convert in progress", __func__,
5019
lkb->lkb_id);
5020
5021
/* We sent this lock to the new master. The new master will
5022
* tell us when it's granted. We no longer need a reply, so
5023
* use a fake reply to put the lkb into the right state.
5024
*/
5025
hold_lkb(lkb);
5026
memset(ms_local, 0, sizeof(struct dlm_message));
5027
ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5028
ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5029
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5030
_receive_convert_reply(lkb, ms_local, true);
5031
unhold_lkb(lkb);
5032
5033
} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5034
set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5035
}
5036
5037
/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5038
conversions are async; there's no reply from the remote master */
5039
}
5040
5041
/* A waiting lkb needs recovery if the master node has failed, or
5042
the master node is changing (only when no directory is used) */
5043
5044
static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5045
int dir_nodeid)
5046
{
5047
if (dlm_no_directory(ls))
5048
return 1;
5049
5050
if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5051
return 1;
5052
5053
return 0;
5054
}
5055
5056
/* Recovery for locks that are waiting for replies from nodes that are now
5057
gone. We can just complete unlocks and cancels by faking a reply from the
5058
dead node. Requests and up-conversions we flag to be resent after
5059
recovery. Down-conversions can just be completed with a fake reply like
5060
unlocks. Conversions between PR and CW need special attention. */
5061
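/* Summary of how each outstanding wait_type is handled below: LOOKUP and
   REQUEST are flagged for resend after recovery; CONVERT goes through
   recover_convert_waiter() (fake reply or resend); UNLOCK and CANCEL are
   completed immediately with a fake reply (normally -DLM_EUNLOCK or
   -DLM_ECANCEL, adjusted for overlap cases). */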
5062
void dlm_recover_waiters_pre(struct dlm_ls *ls)
5063
{
5064
struct dlm_lkb *lkb, *safe;
5065
struct dlm_message *ms_local;
5066
int wait_type, local_unlock_result, local_cancel_result;
5067
int dir_nodeid;
5068
5069
ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5070
if (!ms_local)
5071
return;
5072
5073
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5074
5075
dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5076
5077
/* exclude debug messages about unlocks because there can be so
5078
many and they aren't very interesting */
5079
5080
if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5081
log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5082
"lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5083
lkb->lkb_id,
5084
lkb->lkb_remid,
5085
lkb->lkb_wait_type,
5086
lkb->lkb_resource->res_nodeid,
5087
lkb->lkb_nodeid,
5088
lkb->lkb_wait_nodeid,
5089
dir_nodeid);
5090
}
5091
5092
/* all outstanding lookups, regardless of destination will be
5093
resent after recovery is done */
5094
5095
if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5096
set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5097
continue;
5098
}
5099
5100
if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5101
continue;
5102
5103
wait_type = lkb->lkb_wait_type;
5104
local_unlock_result = -DLM_EUNLOCK;
5105
local_cancel_result = -DLM_ECANCEL;
5106
5107
/* Main reply may have been received leaving a zero wait_type,
5108
but a reply for the overlapping op may not have been
5109
received. In that case we need to fake the appropriate
5110
reply for the overlap op. */
5111
5112
if (!wait_type) {
5113
if (is_overlap_cancel(lkb)) {
5114
wait_type = DLM_MSG_CANCEL;
5115
if (lkb->lkb_grmode == DLM_LOCK_IV)
5116
local_cancel_result = 0;
5117
}
5118
if (is_overlap_unlock(lkb)) {
5119
wait_type = DLM_MSG_UNLOCK;
5120
if (lkb->lkb_grmode == DLM_LOCK_IV)
5121
local_unlock_result = -ENOENT;
5122
}
5123
5124
log_debug(ls, "rwpre overlap %x %x %d %d %d",
5125
lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5126
local_cancel_result, local_unlock_result);
5127
}
5128
5129
switch (wait_type) {
5130
5131
case DLM_MSG_REQUEST:
5132
set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5133
break;
5134
5135
case DLM_MSG_CONVERT:
5136
recover_convert_waiter(ls, lkb, ms_local);
5137
break;
5138
5139
case DLM_MSG_UNLOCK:
5140
hold_lkb(lkb);
5141
memset(ms_local, 0, sizeof(struct dlm_message));
5142
ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5143
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5144
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5145
_receive_unlock_reply(lkb, ms_local, true);
5146
dlm_put_lkb(lkb);
5147
break;
5148
5149
case DLM_MSG_CANCEL:
5150
hold_lkb(lkb);
5151
memset(ms_local, 0, sizeof(struct dlm_message));
5152
ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5153
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5154
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5155
_receive_cancel_reply(lkb, ms_local, true);
5156
dlm_put_lkb(lkb);
5157
break;
5158
5159
default:
5160
log_error(ls, "invalid lkb wait_type %d %d",
5161
lkb->lkb_wait_type, wait_type);
5162
}
5163
schedule();
5164
}
5165
kfree(ms_local);
5166
}
5167
5168
static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5169
{
5170
struct dlm_lkb *lkb = NULL, *iter;
5171
5172
spin_lock_bh(&ls->ls_waiters_lock);
5173
list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5174
if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5175
hold_lkb(iter);
5176
lkb = iter;
5177
break;
5178
}
5179
}
5180
spin_unlock_bh(&ls->ls_waiters_lock);
5181
5182
return lkb;
5183
}
5184
5185
/*
5186
* Forced state reset for locks that were in the middle of remote operations
5187
* when recovery happened (i.e. lkbs that were on the waiters list, waiting
5188
* for a reply from a remote operation.) The lkbs remaining on the waiters
5189
* list need to be reevaluated; some may need resending to a different node
5190
* than previously, and some may now need local handling rather than remote.
5191
*
5192
* First, the lkb state for the voided remote operation is forcibly reset,
5193
* equivalent to what remove_from_waiters() would normally do:
5194
* . lkb removed from ls_waiters list
5195
* . lkb wait_type cleared
5196
* . lkb waiters_count cleared
5197
* . lkb ref count decremented for each waiters_count (almost always 1,
5198
* but possibly 2 in case of cancel/unlock overlapping, which means
5199
* two remote replies were being expected for the lkb.)
5200
*
5201
* Second, the lkb is reprocessed like an original operation would be,
5202
* by passing it to _request_lock or _convert_lock, which will either
5203
* process the lkb operation locally, or send it to a remote node again
5204
* and put the lkb back onto the waiters list.
5205
*
5206
* When reprocessing the lkb, we may find that it's flagged for an overlapping
5207
* force-unlock or cancel, either from before recovery began, or after recovery
5208
* finished. If this is the case, the unlock/cancel is done directly, and the
5209
* original operation is not initiated again (no _request_lock/_convert_lock.)
5210
*/
5211
5212
int dlm_recover_waiters_post(struct dlm_ls *ls)
5213
{
5214
struct dlm_lkb *lkb;
5215
struct dlm_rsb *r;
5216
int error = 0, mstype, err, oc, ou;
5217
5218
while (1) {
5219
if (dlm_locking_stopped(ls)) {
5220
log_debug(ls, "recover_waiters_post aborted");
5221
error = -EINTR;
5222
break;
5223
}
5224
5225
/*
5226
* Find an lkb from the waiters list that's been affected by
5227
* recovery node changes, and needs to be reprocessed. Does
5228
* hold_lkb(), adding a refcount.
5229
*/
5230
lkb = find_resend_waiter(ls);
5231
if (!lkb)
5232
break;
5233
5234
r = lkb->lkb_resource;
5235
hold_rsb(r);
5236
lock_rsb(r);
5237
5238
/*
5239
* If the lkb has been flagged for a force unlock or cancel,
5240
* then the reprocessing below will be replaced by just doing
5241
* the unlock/cancel directly.
5242
*/
5243
mstype = lkb->lkb_wait_type;
5244
oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5245
&lkb->lkb_iflags);
5246
ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5247
&lkb->lkb_iflags);
5248
err = 0;
5249
5250
log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5251
"lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5252
"overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5253
r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5254
dlm_dir_nodeid(r), oc, ou);
5255
5256
/*
5257
* No reply to the pre-recovery operation will now be received,
5258
* so a forced equivalent of remove_from_waiters() is needed to
5259
* reset the waiters state that was in place before recovery.
5260
*/
5261
5262
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5263
5264
/* Forcibly clear wait_type */
5265
lkb->lkb_wait_type = 0;
5266
5267
/*
5268
* Forcibly reset wait_count and associated refcount. The
5269
* wait_count will almost always be 1, but in case of an
5270
* overlapping unlock/cancel it could be 2: see where
5271
* add_to_waiters() finds the lkb is already on the waiters
5272
* list and does lkb_wait_count++; hold_lkb().
5273
*/
5274
while (lkb->lkb_wait_count) {
5275
lkb->lkb_wait_count--;
5276
unhold_lkb(lkb);
5277
}
5278
5279
/* Forcibly remove from waiters list */
5280
spin_lock_bh(&ls->ls_waiters_lock);
5281
list_del_init(&lkb->lkb_wait_reply);
5282
spin_unlock_bh(&ls->ls_waiters_lock);
5283
5284
/*
5285
* The lkb is now clear of all prior waiters state and can be
5286
* processed locally, or sent to remote node again, or directly
5287
* cancelled/unlocked.
5288
*/
5289
5290
if (oc || ou) {
5291
/* do an unlock or cancel instead of resending */
5292
switch (mstype) {
5293
case DLM_MSG_LOOKUP:
5294
case DLM_MSG_REQUEST:
5295
queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5296
-DLM_ECANCEL);
5297
unhold_lkb(lkb); /* undoes create_lkb() */
5298
break;
5299
case DLM_MSG_CONVERT:
5300
if (oc) {
5301
queue_cast(r, lkb, -DLM_ECANCEL);
5302
} else {
5303
lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5304
_unlock_lock(r, lkb);
5305
}
5306
break;
5307
default:
5308
err = 1;
5309
}
5310
} else {
5311
switch (mstype) {
5312
case DLM_MSG_LOOKUP:
5313
case DLM_MSG_REQUEST:
5314
_request_lock(r, lkb);
5315
if (r->res_nodeid != -1 && is_master(r))
5316
confirm_master(r, 0);
5317
break;
5318
case DLM_MSG_CONVERT:
5319
_convert_lock(r, lkb);
5320
break;
5321
default:
5322
err = 1;
5323
}
5324
}
5325
5326
if (err) {
5327
log_error(ls, "waiter %x msg %d r_nodeid %d "
5328
"dir_nodeid %d overlap %d %d",
5329
lkb->lkb_id, mstype, r->res_nodeid,
5330
dlm_dir_nodeid(r), oc, ou);
5331
}
5332
unlock_rsb(r);
5333
put_rsb(r);
5334
dlm_put_lkb(lkb);
5335
}
5336
5337
return error;
5338
}
5339
5340
static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5341
struct list_head *list)
5342
{
5343
struct dlm_lkb *lkb, *safe;
5344
5345
list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5346
if (!is_master_copy(lkb))
5347
continue;
5348
5349
/* don't purge lkbs we've added in recover_master_copy for
5350
the current recovery seq */
5351
5352
if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5353
continue;
5354
5355
del_lkb(r, lkb);
5356
5357
/* this put should free the lkb */
5358
if (!dlm_put_lkb(lkb))
5359
log_error(ls, "purged mstcpy lkb not released");
5360
}
5361
}
5362
5363
void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5364
{
5365
struct dlm_ls *ls = r->res_ls;
5366
5367
purge_mstcpy_list(ls, r, &r->res_grantqueue);
5368
purge_mstcpy_list(ls, r, &r->res_convertqueue);
5369
purge_mstcpy_list(ls, r, &r->res_waitqueue);
5370
}
5371
5372
static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5373
struct list_head *list,
5374
int nodeid_gone, unsigned int *count)
5375
{
5376
struct dlm_lkb *lkb, *safe;
5377
5378
list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5379
if (!is_master_copy(lkb))
5380
continue;
5381
5382
if ((lkb->lkb_nodeid == nodeid_gone) ||
5383
dlm_is_removed(ls, lkb->lkb_nodeid)) {
5384
5385
/* tell recover_lvb to invalidate the lvb
5386
because a node holding EX/PW failed */
5387
if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5388
(lkb->lkb_grmode >= DLM_LOCK_PW)) {
5389
rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5390
}
5391
5392
del_lkb(r, lkb);
5393
5394
/* this put should free the lkb */
5395
if (!dlm_put_lkb(lkb))
5396
log_error(ls, "purged dead lkb not released");
5397
5398
rsb_set_flag(r, RSB_RECOVER_GRANT);
5399
5400
(*count)++;
5401
}
5402
}
5403
}
5404
5405
/* Get rid of locks held by nodes that are gone. */
5406
5407
void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5408
{
5409
struct dlm_rsb *r;
5410
struct dlm_member *memb;
5411
int nodes_count = 0;
5412
int nodeid_gone = 0;
5413
unsigned int lkb_count = 0;
5414
5415
/* cache one removed nodeid to optimize the common
5416
case of a single node removed */
5417
5418
list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5419
nodes_count++;
5420
nodeid_gone = memb->nodeid;
5421
}
5422
5423
if (!nodes_count)
5424
return;
5425
5426
list_for_each_entry(r, root_list, res_root_list) {
5427
lock_rsb(r);
5428
if (r->res_nodeid != -1 && is_master(r)) {
5429
purge_dead_list(ls, r, &r->res_grantqueue,
5430
nodeid_gone, &lkb_count);
5431
purge_dead_list(ls, r, &r->res_convertqueue,
5432
nodeid_gone, &lkb_count);
5433
purge_dead_list(ls, r, &r->res_waitqueue,
5434
nodeid_gone, &lkb_count);
5435
}
5436
unlock_rsb(r);
5437
5438
cond_resched();
5439
}
5440
5441
if (lkb_count)
5442
log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5443
lkb_count, nodes_count);
5444
}
5445
5446
static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5447
{
5448
struct dlm_rsb *r;
5449
5450
read_lock_bh(&ls->ls_rsbtbl_lock);
5451
list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5452
if (!rsb_flag(r, RSB_RECOVER_GRANT))
5453
continue;
5454
if (!is_master(r)) {
5455
rsb_clear_flag(r, RSB_RECOVER_GRANT);
5456
continue;
5457
}
5458
hold_rsb(r);
5459
read_unlock_bh(&ls->ls_rsbtbl_lock);
5460
return r;
5461
}
5462
read_unlock_bh(&ls->ls_rsbtbl_lock);
5463
return NULL;
5464
}
5465
5466
/*
5467
* Attempt to grant locks on resources that we are the master of.
5468
* Locks may have become grantable during recovery because locks
5469
* from departed nodes have been purged (or not rebuilt), allowing
5470
* previously blocked locks to now be granted. The subset of rsb's
5471
* we are interested in are those with lkb's on either the convert or
5472
* waiting queues.
5473
*
5474
* Simplest would be to go through each master rsb and check for non-empty
5475
* convert or waiting queues, and attempt to grant on those rsbs.
5476
* Checking the queues requires lock_rsb, though, for which we'd need
5477
* to release the rsbtbl lock. This would make iterating through all
5478
* rsb's very inefficient. So, we rely on earlier recovery routines
5479
* to set RECOVER_GRANT on any rsb's that we should attempt to grant
5480
* locks for.
5481
*/
5482
5483
void dlm_recover_grant(struct dlm_ls *ls)
5484
{
5485
struct dlm_rsb *r;
5486
unsigned int count = 0;
5487
unsigned int rsb_count = 0;
5488
unsigned int lkb_count = 0;
5489
5490
while (1) {
5491
r = find_grant_rsb(ls);
5492
if (!r)
5493
break;
5494
5495
rsb_count++;
5496
count = 0;
5497
lock_rsb(r);
5498
/* the RECOVER_GRANT flag is checked in the grant path */
5499
grant_pending_locks(r, &count);
5500
rsb_clear_flag(r, RSB_RECOVER_GRANT);
5501
lkb_count += count;
5502
confirm_master(r, 0);
5503
unlock_rsb(r);
5504
put_rsb(r);
5505
cond_resched();
5506
}
5507
5508
if (lkb_count)
5509
log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5510
lkb_count, rsb_count);
5511
}
5512
5513
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5514
uint32_t remid)
5515
{
5516
struct dlm_lkb *lkb;
5517
5518
list_for_each_entry(lkb, head, lkb_statequeue) {
5519
if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5520
return lkb;
5521
}
5522
return NULL;
5523
}
5524
5525
static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5526
uint32_t remid)
5527
{
5528
struct dlm_lkb *lkb;
5529
5530
lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5531
if (lkb)
5532
return lkb;
5533
lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5534
if (lkb)
5535
return lkb;
5536
lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5537
if (lkb)
5538
return lkb;
5539
return NULL;
5540
}
5541
5542
/* needs at least dlm_rcom + rcom_lock */
5543
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5544
struct dlm_rsb *r, const struct dlm_rcom *rc)
5545
{
5546
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5547
5548
lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5549
lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5550
lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5551
lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5552
dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5553
set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5554
lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5555
lkb->lkb_rqmode = rl->rl_rqmode;
5556
lkb->lkb_grmode = rl->rl_grmode;
5557
/* don't set lkb_status because add_lkb wants to set it itself */
5558
5559
lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5560
lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5561
5562
if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5563
int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5564
sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5565
if (lvblen > ls->ls_lvblen)
5566
return -EINVAL;
5567
lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5568
if (!lkb->lkb_lvbptr)
5569
return -ENOMEM;
5570
memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5571
}
5572
5573
/* Conversions between PR and CW (middle modes) need special handling.
5574
The real granted mode of these converting locks cannot be determined
5575
until all locks have been rebuilt on the rsb (recover_conversion) */
5576
5577
if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5578
/* We may need to adjust grmode depending on other granted locks. */
5579
log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5580
__func__, lkb->lkb_id, lkb->lkb_grmode,
5581
lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5582
rsb_set_flag(r, RSB_RECOVER_CONVERT);
5583
}
5584
5585
return 0;
5586
}
5587
5588
/* This lkb may have been recovered in a previous aborted recovery so we need
5589
to check if the rsb already has an lkb with the given remote nodeid/lkid.
5590
If so we just send back a standard reply. If not, we create a new lkb with
5591
the given values and send back our lkid. We send back our lkid by sending
5592
back the rcom_lock struct we got but with the remid field filled in. */
5593
5594
/* needs at least dlm_rcom + rcom_lock */
5595
int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5596
__le32 *rl_remid, __le32 *rl_result)
5597
{
5598
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5599
struct dlm_rsb *r;
5600
struct dlm_lkb *lkb;
5601
uint32_t remid = 0;
5602
int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5603
int error;
5604
5605
/* init rl_remid with rcom lock rl_remid */
5606
*rl_remid = rl->rl_remid;
5607
5608
if (rl->rl_parent_lkid) {
5609
error = -EOPNOTSUPP;
5610
goto out;
5611
}
5612
5613
remid = le32_to_cpu(rl->rl_lkid);
5614
5615
/* In general we expect the rsb returned to be R_MASTER, but we don't
5616
have to require it. Recovery of masters on one node can overlap
5617
recovery of locks on another node, so one node can send us MSTCPY
5618
locks before we've made ourselves master of this rsb. We can still
5619
add new MSTCPY locks that we receive here without any harm; when
5620
we make ourselves master, dlm_recover_masters() won't touch the
5621
MSTCPY locks we've received early. */
5622
5623
error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5624
from_nodeid, R_RECEIVE_RECOVER, &r);
5625
if (error)
5626
goto out;
5627
5628
lock_rsb(r);
5629
5630
if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5631
log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5632
from_nodeid, remid);
5633
error = -EBADR;
5634
goto out_unlock;
5635
}
5636
5637
lkb = search_remid(r, from_nodeid, remid);
5638
if (lkb) {
5639
error = -EEXIST;
5640
goto out_remid;
5641
}
5642
5643
error = create_lkb(ls, &lkb);
5644
if (error)
5645
goto out_unlock;
5646
5647
error = receive_rcom_lock_args(ls, lkb, r, rc);
5648
if (error) {
5649
__put_lkb(ls, lkb);
5650
goto out_unlock;
5651
}
5652
5653
attach_lkb(r, lkb);
5654
add_lkb(r, lkb, rl->rl_status);
5655
ls->ls_recover_locks_in++;
5656
5657
if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5658
rsb_set_flag(r, RSB_RECOVER_GRANT);
5659
5660
out_remid:
5661
/* this is the new value returned to the lock holder for
5662
saving in its process-copy lkb */
5663
*rl_remid = cpu_to_le32(lkb->lkb_id);
5664
5665
lkb->lkb_recover_seq = ls->ls_recover_seq;
5666
5667
out_unlock:
5668
unlock_rsb(r);
5669
put_rsb(r);
5670
out:
5671
if (error && error != -EEXIST)
5672
log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5673
from_nodeid, remid, error);
5674
*rl_result = cpu_to_le32(error);
5675
return error;
5676
}
5677
5678
/* needs at least dlm_rcom + rcom_lock */
5679
int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5680
uint64_t seq)
5681
{
5682
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5683
struct dlm_rsb *r;
5684
struct dlm_lkb *lkb;
5685
uint32_t lkid, remid;
5686
int error, result;
5687
5688
lkid = le32_to_cpu(rl->rl_lkid);
5689
remid = le32_to_cpu(rl->rl_remid);
5690
result = le32_to_cpu(rl->rl_result);
5691
5692
error = find_lkb(ls, lkid, &lkb);
5693
if (error) {
5694
log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5695
lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5696
result);
5697
return error;
5698
}
5699
5700
r = lkb->lkb_resource;
5701
hold_rsb(r);
5702
lock_rsb(r);
5703
5704
if (!is_process_copy(lkb)) {
5705
log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5706
lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707
result);
5708
dlm_dump_rsb(r);
5709
unlock_rsb(r);
5710
put_rsb(r);
5711
dlm_put_lkb(lkb);
5712
return -EINVAL;
5713
}
5714
5715
switch (result) {
5716
case -EBADR:
5717
/* There's a chance the new master received our lock before
5718
dlm_recover_master_reply(); this wouldn't happen if we did
5719
a barrier between recover_masters and recover_locks. */
5720
5721
log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5722
lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5723
result);
5724
5725
dlm_send_rcom_lock(r, lkb, seq);
5726
goto out;
5727
case -EEXIST:
5728
case 0:
5729
lkb->lkb_remid = remid;
5730
break;
5731
default:
5732
log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5733
lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5734
result);
5735
}
5736
5737
/* an ack for dlm_recover_locks() which waits for replies to
5738
all the locks it sends to new masters */
5739
dlm_recovered_lock(r);
5740
out:
5741
unlock_rsb(r);
5742
put_rsb(r);
5743
dlm_put_lkb(lkb);
5744
5745
return 0;
5746
}
5747
5748
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5749
int mode, uint32_t flags, void *name, unsigned int namelen)
5750
{
5751
struct dlm_lkb *lkb;
5752
struct dlm_args args;
5753
bool do_put = true;
5754
int error;
5755
5756
dlm_lock_recovery(ls);
5757
5758
error = create_lkb(ls, &lkb);
5759
if (error) {
5760
kfree(ua);
5761
goto out;
5762
}
5763
5764
trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5765
5766
if (flags & DLM_LKF_VALBLK) {
5767
ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5768
if (!ua->lksb.sb_lvbptr) {
5769
kfree(ua);
5770
error = -ENOMEM;
5771
goto out_put;
5772
}
5773
}
5774
error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5775
fake_bastfn, &args);
5776
if (error) {
5777
kfree(ua->lksb.sb_lvbptr);
5778
ua->lksb.sb_lvbptr = NULL;
5779
kfree(ua);
5780
goto out_put;
5781
}
5782
5783
/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5784
When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5785
lock and that lkb_astparam is the dlm_user_args structure. */
5786
set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5787
error = request_lock(ls, lkb, name, namelen, &args);
5788
5789
switch (error) {
5790
case 0:
5791
break;
5792
case -EINPROGRESS:
5793
error = 0;
5794
break;
5795
case -EAGAIN:
5796
error = 0;
5797
fallthrough;
5798
default:
5799
goto out_put;
5800
}
5801
5802
/* add this new lkb to the per-process list of locks */
5803
spin_lock_bh(&ua->proc->locks_spin);
5804
hold_lkb(lkb);
5805
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5806
spin_unlock_bh(&ua->proc->locks_spin);
5807
do_put = false;
5808
out_put:
5809
trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5810
if (do_put)
5811
__put_lkb(ls, lkb);
5812
out:
5813
dlm_unlock_recovery(ls);
5814
return error;
5815
}
5816
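/* A minimal, hypothetical sketch of a caller (example_request() and the
   literal name/mode values are illustrative assumptions, not dlm code).
   It shows the ownership rule noted above: once dlm_user_request() is
   called, the dlm_user_args is either attached to the lkb (and later
   freed by dlm_free_lkb()) or freed on the error paths inside
   dlm_user_request(), so the caller must not free it again. */

static int example_request(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_user_args *ua;
	int error;

	ua = kzalloc(sizeof(*ua), GFP_NOFS);
	if (!ua)
		return -ENOMEM;
	ua->proc = proc;

	/* 0 covers both an immediate grant and -EINPROGRESS, which
	   dlm_user_request() folds into 0; completion arrives later
	   as a callback */
	error = dlm_user_request(ls, ua, DLM_LOCK_EX, DLM_LKF_VALBLK,
				 "example-res", 11);

	/* do not kfree(ua) here: it is owned by the lock code now */
	return error;
}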
5817
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5818
int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5819
{
5820
struct dlm_lkb *lkb;
5821
struct dlm_args args;
5822
struct dlm_user_args *ua;
5823
int error;
5824
5825
dlm_lock_recovery(ls);
5826
5827
error = find_lkb(ls, lkid, &lkb);
5828
if (error)
5829
goto out;
5830
5831
trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5832
5833
/* The user can change the params on its lock when it converts it, or
5834
add an lvb that didn't exist before */
5835
5836
ua = lkb->lkb_ua;
5837
5838
if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5839
ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5840
if (!ua->lksb.sb_lvbptr) {
5841
error = -ENOMEM;
5842
goto out_put;
5843
}
5844
}
5845
if (lvb_in && ua->lksb.sb_lvbptr)
5846
memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5847
5848
ua->xid = ua_tmp->xid;
5849
ua->castparam = ua_tmp->castparam;
5850
ua->castaddr = ua_tmp->castaddr;
5851
ua->bastparam = ua_tmp->bastparam;
5852
ua->bastaddr = ua_tmp->bastaddr;
5853
ua->user_lksb = ua_tmp->user_lksb;
5854
5855
error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5856
fake_bastfn, &args);
5857
if (error)
5858
goto out_put;
5859
5860
error = convert_lock(ls, lkb, &args);
5861
5862
if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5863
error = 0;
5864
out_put:
5865
trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5866
dlm_put_lkb(lkb);
5867
out:
5868
dlm_unlock_recovery(ls);
5869
kfree(ua_tmp);
5870
return error;
5871
}
5872
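/* A small, hypothetical sketch (the function name and values are
   illustrative assumptions): a convert may introduce an lvb that the
   original request never had, because dlm_user_convert() allocates
   sb_lvbptr on demand when DLM_LKF_VALBLK is passed. */

static int example_convert_add_lvb(struct dlm_ls *ls,
				   struct dlm_user_args *ua_tmp, uint32_t lkid)
{
	char lvb[DLM_USER_LVB_LEN] = { 0x01 };

	/* ua_tmp is consumed (kfree'd) by dlm_user_convert() */
	return dlm_user_convert(ls, ua_tmp, DLM_LOCK_PW,
				DLM_LKF_CONVERT | DLM_LKF_VALBLK, lkid, lvb);
}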
5873
/*
5874
* The caller asks for an orphan lock on a given resource with a given mode.
5875
* If a matching lock exists, it's moved to the owner's list of locks and
5876
* the lkid is returned.
5877
*/
5878
5879
int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5880
int mode, uint32_t flags, void *name, unsigned int namelen,
5881
uint32_t *lkid)
5882
{
5883
struct dlm_lkb *lkb = NULL, *iter;
5884
struct dlm_user_args *ua;
5885
int found_other_mode = 0;
5886
int rv = 0;
5887
5888
spin_lock_bh(&ls->ls_orphans_lock);
5889
list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5890
if (iter->lkb_resource->res_length != namelen)
5891
continue;
5892
if (memcmp(iter->lkb_resource->res_name, name, namelen))
5893
continue;
5894
if (iter->lkb_grmode != mode) {
5895
found_other_mode = 1;
5896
continue;
5897
}
5898
5899
lkb = iter;
5900
list_del_init(&iter->lkb_ownqueue);
5901
clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5902
*lkid = iter->lkb_id;
5903
break;
5904
}
5905
spin_unlock_bh(&ls->ls_orphans_lock);
5906
5907
if (!lkb && found_other_mode) {
5908
rv = -EAGAIN;
5909
goto out;
5910
}
5911
5912
if (!lkb) {
5913
rv = -ENOENT;
5914
goto out;
5915
}
5916
5917
lkb->lkb_exflags = flags;
5918
lkb->lkb_ownpid = (int) current->pid;
5919
5920
ua = lkb->lkb_ua;
5921
5922
ua->proc = ua_tmp->proc;
5923
ua->xid = ua_tmp->xid;
5924
ua->castparam = ua_tmp->castparam;
5925
ua->castaddr = ua_tmp->castaddr;
5926
ua->bastparam = ua_tmp->bastparam;
5927
ua->bastaddr = ua_tmp->bastaddr;
5928
ua->user_lksb = ua_tmp->user_lksb;
5929
5930
/*
5931
* The lkb reference from the ls_orphans list was not
5932
* removed above, and is now considered the reference
5933
* for the proc locks list.
5934
*/
5935
5936
spin_lock_bh(&ua->proc->locks_spin);
5937
list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5938
spin_unlock_bh(&ua->proc->locks_spin);
5939
out:
5940
kfree(ua_tmp);
5941
return rv;
5942
}
5943
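/* The matching rule above, restated as a stand-alone predicate for
   clarity. orphan_matches() is a hypothetical helper, not part of this
   file; dlm_user_adopt_orphan() open-codes the same three tests while
   walking ls_orphans: same resource name length, same name bytes, and
   the same granted mode. */

static bool orphan_matches(const struct dlm_lkb *iter, int mode,
			   const void *name, unsigned int namelen)
{
	return iter->lkb_resource->res_length == namelen &&
	       !memcmp(iter->lkb_resource->res_name, name, namelen) &&
	       iter->lkb_grmode == mode;
}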
5944
int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5945
uint32_t flags, uint32_t lkid, char *lvb_in)
5946
{
5947
struct dlm_lkb *lkb;
5948
struct dlm_args args;
5949
struct dlm_user_args *ua;
5950
int error;
5951
5952
dlm_lock_recovery(ls);
5953
5954
error = find_lkb(ls, lkid, &lkb);
5955
if (error)
5956
goto out;
5957
5958
trace_dlm_unlock_start(ls, lkb, flags);
5959
5960
ua = lkb->lkb_ua;
5961
5962
if (lvb_in && ua->lksb.sb_lvbptr)
5963
memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5964
if (ua_tmp->castparam)
5965
ua->castparam = ua_tmp->castparam;
5966
ua->user_lksb = ua_tmp->user_lksb;
5967
5968
error = set_unlock_args(flags, ua, &args);
5969
if (error)
5970
goto out_put;
5971
5972
error = unlock_lock(ls, lkb, &args);
5973
5974
if (error == -DLM_EUNLOCK)
5975
error = 0;
5976
/* from validate_unlock_args() */
5977
if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5978
error = 0;
5979
if (error)
5980
goto out_put;
5981
5982
spin_lock_bh(&ua->proc->locks_spin);
5983
/* dlm_user_add_cb() may have already taken lkb off the proc list */
5984
if (!list_empty(&lkb->lkb_ownqueue))
5985
list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5986
spin_unlock_bh(&ua->proc->locks_spin);
5987
out_put:
5988
trace_dlm_unlock_end(ls, lkb, flags, error);
5989
dlm_put_lkb(lkb);
5990
out:
5991
dlm_unlock_recovery(ls);
5992
kfree(ua_tmp);
5993
return error;
5994
}
5995
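/* Hypothetical usage sketch (the function name is an assumption): with
   DLM_LKF_FORCEUNLOCK the unlock proceeds even if the lock was never
   granted, and dlm_user_unlock() folds the -DLM_EUNLOCK and forced
   -EBUSY results into 0. */

static int example_force_unlock(struct dlm_ls *ls,
				struct dlm_user_args *ua_tmp, uint32_t lkid)
{
	/* ua_tmp is consumed (kfree'd) by dlm_user_unlock() */
	return dlm_user_unlock(ls, ua_tmp, DLM_LKF_FORCEUNLOCK, lkid, NULL);
}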
5996
int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5997
uint32_t flags, uint32_t lkid)
5998
{
5999
struct dlm_lkb *lkb;
6000
struct dlm_args args;
6001
struct dlm_user_args *ua;
6002
int error;
6003
6004
dlm_lock_recovery(ls);
6005
6006
error = find_lkb(ls, lkid, &lkb);
6007
if (error)
6008
goto out;
6009
6010
trace_dlm_unlock_start(ls, lkb, flags);
6011
6012
ua = lkb->lkb_ua;
6013
if (ua_tmp->castparam)
6014
ua->castparam = ua_tmp->castparam;
6015
ua->user_lksb = ua_tmp->user_lksb;
6016
6017
error = set_unlock_args(flags, ua, &args);
6018
if (error)
6019
goto out_put;
6020
6021
error = cancel_lock(ls, lkb, &args);
6022
6023
if (error == -DLM_ECANCEL)
6024
error = 0;
6025
/* from validate_unlock_args() */
6026
if (error == -EBUSY)
6027
error = 0;
6028
out_put:
6029
trace_dlm_unlock_end(ls, lkb, flags, error);
6030
dlm_put_lkb(lkb);
6031
out:
6032
dlm_unlock_recovery(ls);
6033
kfree(ua_tmp);
6034
return error;
6035
}
6036
6037
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6038
{
6039
struct dlm_lkb *lkb;
6040
struct dlm_args args;
6041
struct dlm_user_args *ua;
6042
struct dlm_rsb *r;
6043
int error;
6044
6045
dlm_lock_recovery(ls);
6046
6047
error = find_lkb(ls, lkid, &lkb);
6048
if (error)
6049
goto out;
6050
6051
trace_dlm_unlock_start(ls, lkb, flags);
6052
6053
ua = lkb->lkb_ua;
6054
6055
error = set_unlock_args(flags, ua, &args);
6056
if (error)
6057
goto out_put;
6058
6059
/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6060
6061
r = lkb->lkb_resource;
6062
hold_rsb(r);
6063
lock_rsb(r);
6064
6065
error = validate_unlock_args(lkb, &args);
6066
if (error)
6067
goto out_r;
6068
set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6069
6070
error = _cancel_lock(r, lkb);
6071
out_r:
6072
unlock_rsb(r);
6073
put_rsb(r);
6074
6075
if (error == -DLM_ECANCEL)
6076
error = 0;
6077
/* from validate_unlock_args() */
6078
if (error == -EBUSY)
6079
error = 0;
6080
out_put:
6081
trace_dlm_unlock_end(ls, lkb, flags, error);
6082
dlm_put_lkb(lkb);
6083
out:
6084
dlm_unlock_recovery(ls);
6085
return error;
6086
}
6087
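/* Hypothetical usage sketch (the function name is an assumption):
   userspace deadlock handling cancels one of its own blocked requests by
   lock id alone; unlike dlm_user_cancel() no dlm_user_args are passed in,
   and DLM_IFL_DEADLOCK_CANCEL_BIT marks the cancel so its completion can
   be told apart from an ordinary cancel. */

static int example_deadlock_cancel(struct dlm_ls *ls, uint32_t lkid)
{
	return dlm_user_deadlock(ls, 0, lkid);
}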
6088
/* lkb's that are removed from the waiters list by revert are just left on the
6089
orphans list with the granted orphan locks, to be freed by purge */
6090
6091
static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6092
{
6093
struct dlm_args args;
6094
int error;
6095
6096
hold_lkb(lkb); /* reference for the ls_orphans list */
6097
spin_lock_bh(&ls->ls_orphans_lock);
6098
list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6099
spin_unlock_bh(&ls->ls_orphans_lock);
6100
6101
set_unlock_args(0, lkb->lkb_ua, &args);
6102
6103
error = cancel_lock(ls, lkb, &args);
6104
if (error == -DLM_ECANCEL)
6105
error = 0;
6106
return error;
6107
}
6108
6109
/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6110
granted. Regardless of what rsb queue the lock is on, it's removed and
6111
freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6112
if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6113
6114
static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6115
{
6116
struct dlm_args args;
6117
int error;
6118
6119
set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6120
lkb->lkb_ua, &args);
6121
6122
error = unlock_lock(ls, lkb, &args);
6123
if (error == -DLM_EUNLOCK)
6124
error = 0;
6125
return error;
6126
}
6127
6128
/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6129
(which does lock_rsb) due to deadlock with receiving a message that does
6130
lock_rsb followed by dlm_user_add_cb() */
6131
6132
static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6133
struct dlm_user_proc *proc)
6134
{
6135
struct dlm_lkb *lkb = NULL;
6136
6137
spin_lock_bh(&ls->ls_clear_proc_locks);
6138
if (list_empty(&proc->locks))
6139
goto out;
6140
6141
lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6142
list_del_init(&lkb->lkb_ownqueue);
6143
6144
if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6145
set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6146
else
6147
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6148
out:
6149
spin_unlock_bh(&ls->ls_clear_proc_locks);
6150
return lkb;
6151
}
6152
6153
/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6154
1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6155
which we clear here. */
6156
6157
/* proc CLOSING flag is set so no more device_reads should look at proc->asts
6158
list, and no more device_writes should add lkb's to proc->locks list; so we
6159
shouldn't need to take asts_spin or locks_spin here. This assumes that
6160
device reads/writes/closes are serialized -- FIXME: we may need to serialize
6161
them ourselves. */
6162
6163
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6164
{
6165
struct dlm_callback *cb, *cb_safe;
6166
struct dlm_lkb *lkb, *safe;
6167
6168
dlm_lock_recovery(ls);
6169
6170
while (1) {
6171
lkb = del_proc_lock(ls, proc);
6172
if (!lkb)
6173
break;
6174
if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6175
orphan_proc_lock(ls, lkb);
6176
else
6177
unlock_proc_lock(ls, lkb);
6178
6179
/* this removes the reference for the proc->locks list
6180
added by dlm_user_request, it may result in the lkb
6181
being freed */
6182
6183
dlm_put_lkb(lkb);
6184
}
6185
6186
spin_lock_bh(&ls->ls_clear_proc_locks);
6187
6188
/* in-progress unlocks */
6189
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6190
list_del_init(&lkb->lkb_ownqueue);
6191
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6192
dlm_put_lkb(lkb);
6193
}
6194
6195
list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6196
list_del(&cb->list);
6197
dlm_free_cb(cb);
6198
}
6199
6200
spin_unlock_bh(&ls->ls_clear_proc_locks);
6201
dlm_unlock_recovery(ls);
6202
}
6203
6204
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6205
{
6206
struct dlm_callback *cb, *cb_safe;
6207
struct dlm_lkb *lkb, *safe;
6208
6209
while (1) {
6210
lkb = NULL;
6211
spin_lock_bh(&proc->locks_spin);
6212
if (!list_empty(&proc->locks)) {
6213
lkb = list_entry(proc->locks.next, struct dlm_lkb,
6214
lkb_ownqueue);
6215
list_del_init(&lkb->lkb_ownqueue);
6216
}
6217
spin_unlock_bh(&proc->locks_spin);
6218
6219
if (!lkb)
6220
break;
6221
6222
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6223
unlock_proc_lock(ls, lkb);
6224
dlm_put_lkb(lkb); /* ref from proc->locks list */
6225
}
6226
6227
spin_lock_bh(&proc->locks_spin);
6228
list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6229
list_del_init(&lkb->lkb_ownqueue);
6230
set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6231
dlm_put_lkb(lkb);
6232
}
6233
spin_unlock_bh(&proc->locks_spin);
6234
6235
spin_lock_bh(&proc->asts_spin);
6236
list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6237
list_del(&cb->list);
6238
dlm_free_cb(cb);
6239
}
6240
spin_unlock_bh(&proc->asts_spin);
6241
}
6242
6243
/* pid of 0 means purge all orphans */
6244
6245
static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6246
{
6247
struct dlm_lkb *lkb, *safe;
6248
6249
spin_lock_bh(&ls->ls_orphans_lock);
6250
list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6251
if (pid && lkb->lkb_ownpid != pid)
6252
continue;
6253
unlock_proc_lock(ls, lkb);
6254
list_del_init(&lkb->lkb_ownqueue);
6255
dlm_put_lkb(lkb);
6256
}
6257
spin_unlock_bh(&ls->ls_orphans_lock);
6258
}
6259
6260
static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6261
{
6262
struct dlm_message *ms;
6263
struct dlm_mhandle *mh;
6264
int error;
6265
6266
error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6267
DLM_MSG_PURGE, &ms, &mh);
6268
if (error)
6269
return error;
6270
ms->m_nodeid = cpu_to_le32(nodeid);
6271
ms->m_pid = cpu_to_le32(pid);
6272
6273
return send_message(mh, ms, NULL, 0);
6274
}
6275
6276
int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6277
int nodeid, int pid)
6278
{
6279
int error = 0;
6280
6281
if (nodeid && (nodeid != dlm_our_nodeid())) {
6282
error = send_purge(ls, nodeid, pid);
6283
} else {
6284
dlm_lock_recovery(ls);
6285
if (pid == current->pid)
6286
purge_proc_locks(ls, proc);
6287
else
6288
do_purge(ls, nodeid, pid);
6289
dlm_unlock_recovery(ls);
6290
}
6291
return error;
6292
}
6293
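/* Hypothetical usage sketch (node and pid values are assumptions): ask
   node 2 to purge the orphan locks left behind there by pid 1234. If
   node 2 is remote this becomes a DLM_MSG_PURGE message that runs
   do_purge() on that node; locally it runs do_purge() directly (or
   purge_proc_locks() when the pid is the caller's own). */

static int example_purge_orphans(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	return dlm_user_purge(ls, proc, 2, 1234);
}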
6294
/* debug functionality */
6295
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6296
int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6297
{
6298
struct dlm_lksb *lksb;
6299
struct dlm_lkb *lkb;
6300
struct dlm_rsb *r;
6301
int error;
6302
6303
/* we currently can't set a valid user lock */
6304
if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6305
return -EOPNOTSUPP;
6306
6307
lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6308
if (!lksb)
6309
return -ENOMEM;
6310
6311
error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6312
if (error) {
6313
kfree(lksb);
6314
return error;
6315
}
6316
6317
dlm_set_dflags_val(lkb, lkb_dflags);
6318
lkb->lkb_nodeid = lkb_nodeid;
6319
lkb->lkb_lksb = lksb;
6320
/* user specific pointer, just don't have it NULL for kernel locks */
6321
if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6322
lkb->lkb_astparam = (void *)0xDEADBEEF;
6323
6324
error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6325
if (error) {
6326
kfree(lksb);
6327
__put_lkb(ls, lkb);
6328
return error;
6329
}
6330
6331
lock_rsb(r);
6332
attach_lkb(r, lkb);
6333
add_lkb(r, lkb, lkb_status);
6334
unlock_rsb(r);
6335
put_rsb(r);
6336
6337
return 0;
6338
}
6339
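/* Hypothetical debug sketch (the ids, name and status are illustrative
   assumptions): fabricate an lkb with id 0x100 on resource "test1" and
   place it on the granted queue, as a test harness driving
   dlm_debug_add_lkb() might do. */

static int example_debug_inject(struct dlm_ls *ls)
{
	return dlm_debug_add_lkb(ls, 0x100, "test1", 5, 0, 0,
				 DLM_LKSTS_GRANTED);
}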
6340
int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6341
int mstype, int to_nodeid)
6342
{
6343
struct dlm_lkb *lkb;
6344
int error;
6345
6346
error = find_lkb(ls, lkb_id, &lkb);
6347
if (error)
6348
return error;
6349
6350
add_to_waiters(lkb, mstype, to_nodeid);
6351
dlm_put_lkb(lkb);
6352
return 0;
6353
}
6354
6355
6356