GitHub Repository: awilliam/linux-vfio
Path: blob/master/fs/dlm/lock.c

/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
#include <linux/types.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};
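
/* Example of reading the matrix: a granted PR lock is compatible with
   another PR request (__dlm_compat_matrix[PR+1][PR+1] == 1) but blocks
   PW and EX requests; a granted EX lock conflicts with every real mode
   except NL. */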

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
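
/* Example: a brand new request (grmode UN) in any real mode returns the
   resource's LVB to the caller (row UN); a down-conversion from EX,
   e.g. EX->NL, writes the caller's LVB to the resource (row EX); and a
   PR->CR conversion is -1, so the LVB is left untouched. */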

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}
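
/* PR and CW are incompatible with each other (see __dlm_compat_matrix)
   even though CW is the numerically lower mode, so a PR<->CW conversion
   cannot be treated as a simple down-conversion; down_conversion()
   below explicitly excludes it. */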

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = dlm_allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL, *tmp;
	uint32_t hash, bucket;
	int error = -EINVAL;

	if (namelen > DLM_RESNAME_MAXLEN)
		goto out;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	error = 0;
	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		dlm_free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
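
/* Typical caller pattern: take a reference with find_rsb() (or
   hold_rsb() when a valid reference already exists), serialize access
   with lock_rsb(), manipulate the rsb's queues, then unlock_rsb() and
   drop the reference with put_rsb()/unhold_rsb() -- dlm_scan_timeout()
   below shows the hold/lock/unlock/unhold sequence. */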

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_astqueue);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
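
/* Example: bucket 3 with counter value 10 yields lkid 0x0003000a; the
   bucket is always recoverable from the id alone as (lkid >> 16), which
   is what __find_lkb() and find_lkb() below rely on. */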

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
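
/* lkb_add_ordered() inserts the new entry in front of the first entry
   whose rqmode is below the given mode; if no entry qualifies, the loop
   leaves lkb pointing at the list head and __list_add() appends at the
   tail.  add_lkb() uses this to keep granted locks ordered from highest
   to lowest mode. */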

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	ktime_t zero = ktime_set(0, 0);
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_equal(lkb->lkb_wait_time, zero))
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = zero;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kmalloc(num_nodes * sizeof(int), GFP_KERNEL);
			if (warned)
				memset(warned, 0, num_nodes * sizeof(int));
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (warned)
		kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
		  lkb->lkb_id, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't received a
	   reply to the op that was in progress prior to the unlock/cancel;
	   we give up on any reply to the earlier op.  FIXME: not sure
	   when/how this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
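
/* toss_rsb() adds entries at the head of the toss list via list_move(),
   so the reverse walk above visits the oldest (longest-tossed) entries
   first; once one entry is not yet due, none of the newer ones can be
   either, which is what the FIXME above is getting at -- the scan could
   stop at the first non-due entry. */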

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb))
		return;

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
							lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut.  Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode.  It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}

/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list.  Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* Returns the highest requested mode of all blocked conversions; sets
   cw if there's a blocked conversion to DLM_LOCK_CW. */

static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0, NULL))
			grant_lock_pending(r, lkb);
		else {
			high = max_t(int, lkb->lkb_rqmode, high);
			if (lkb->lkb_rqmode == DLM_LOCK_CW)
				*cw = 1;
		}
	}

	return high;
}

/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
   on either the convert or waiting queue.
   high is the largest rqmode of all locks blocked on the convert or
   waiting queue. */

static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
{
	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
		if (gr->lkb_highbast < DLM_LOCK_EX)
			return 1;
		return 0;
	}

	if (gr->lkb_highbast < high &&
	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
		return 1;
	return 0;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;
	int cw = 0;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high, &cw);
	high = grant_pending_wait(r, high, &cw);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
			if (cw && high == DLM_LOCK_PR &&
			    lkb->lkb_grmode == DLM_LOCK_PR)
				queue_bast(r, lkb, DLM_LOCK_CW);
			else
				queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1903
1904
static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1905
{
1906
if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1907
(gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1908
if (gr->lkb_highbast < DLM_LOCK_EX)
1909
return 1;
1910
return 0;
1911
}
1912
1913
if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1914
return 1;
1915
return 0;
1916
}
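
/* The PR/CW special case above (and in lock_requires_bast) exists because
   PR and CW are mutually incompatible but neither mode dominates the other
   in the compat matrix, so lkb_highbast alone cannot tell that a holder
   basted for one of the pair still needs a bast for the other.  This is an
   inference from the code, not an original comment. */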

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		/* skip self when sending basts to convertqueue */
		if (gr == lkb)
			continue;
		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, for us then to
		   create a new one, look up the master locally, and find
		   it already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
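
/* A sketch of how callers consume the 0/1 return value (this is how
   _request_lock() below uses it; repeated here only for orientation):

	error = set_master(r, lkb);
	if (error < 0)
		goto out;	// hard error from the local dir lookup
	if (error) {
		error = 0;	// lkb parked on a lookup/wait list; the
		goto out;	// operation resumes when the lookup
	}			// reply arrives
	// error == 0: lkb_nodeid is valid, send or perform the request
*/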

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		}
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, unsigned long timeout_cs,
			 void (*ast) (void *astparam),
			 void *astparam,
			 void (*bast) (void *astparam, int mode),
			 struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astfn = ast;
	args->astparam = astparam;
	args->bastfn = bast;
	args->timeout = timeout_cs;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}

static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
		return -EINVAL;

	args->flags = flags;
	args->astparam = astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astfn = args->astfn;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastfn = args->bastfn;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
			  lkb->lkb_status, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}

/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
   for success */

/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/unlockf on it */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}

/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */
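
/* Return conventions shared by the do_xxxx() functions below, as used by
   their callers in this file: 0 means granted/done immediately,
   -EINPROGRESS means the lkb was queued and the caller waits for an ast,
   -EAGAIN means a NOQUEUE request could not be granted, and the -DLM_E*
   values are "successful" completions reported through the ast. */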

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1, NULL)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}

static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	switch (error) {
	case -EAGAIN:
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
		break;
	case -EINPROGRESS:
		send_blocking_asts(r, lkb);
		break;
	}
}

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV, NULL);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}

static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	switch (error) {
	case 0:
		grant_pending_locks(r);
		/* grant_pending_locks also sends basts */
		break;
	case -EAGAIN:
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
		break;
	case -EINPROGRESS:
		send_blocking_asts(r, lkb);
		break;
	}
}

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	return -DLM_EUNLOCK;
}

static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r);
}

/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = revert_lock(r, lkb);
	if (error) {
		queue_cast(r, lkb, -DLM_ECANCEL);
		return -DLM_ECANCEL;
	}
	return 0;
}

static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (error)
		grant_pending_locks(r);
}

/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r)) {
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	} else {
		error = do_request(r, lkb);
		/* for remote locks the request_reply is sent
		   between do_request and do_request_effects */
		do_request_effects(r, lkb, error);
	}
 out:
	return error;
}

/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	} else {
		error = do_convert(r, lkb);
		/* for remote locks the convert_reply is sent
		   between do_convert and do_convert_effects */
		do_convert_effects(r, lkb, error);
	}

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	} else {
		error = do_unlock(r, lkb);
		/* for remote locks the unlock_reply is sent
		   between do_unlock and do_unlock_effects */
		do_unlock_effects(r, lkb, error);
	}

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	} else {
		error = do_cancel(r, lkb);
		/* for remote locks the cancel_reply is sent
		   between do_cancel and do_cancel_effects */
		do_cancel_effects(r, lkb, error);
	}

	return error;
}

/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

/*
 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
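
/* A minimal caller-side sketch (not from this file; the caller's own
   variable names are illustrative).  A typical user acquires an EX lock
   on a named resource and waits for the completion ast:

	static void my_ast(void *arg)
	{
		struct my_lock *ml = arg;	// hypothetical caller type
		complete(&ml->done);		// ml->lksb.sb_status holds
	}					// the result

	error = dlm_lock(ls, DLM_LOCK_EX, &ml->lksb, 0,
			 "my-resource", strlen("my-resource"), 0,
			 my_ast, ml, NULL);

   dlm_lock() returning 0 only means the request was accepted; the grant
   (or an -EAGAIN/-EDEADLK status) is delivered through the ast via
   lksb.sb_status, since those errors are squashed to 0 above. */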

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
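
/* Caller-side sketch to match the dlm_lock() example above (again
   illustrative, not from this file): release the lock by its lkid and
   wait for the completion ast, which reports -DLM_EUNLOCK in sb_status:

	error = dlm_unlock(ls, ml->lksb.sb_lkid, 0, &ml->lksb, ml);

   Passing DLM_LKF_CANCEL instead cancels an in-flight request, and the
   ast then reports -DLM_ECANCEL. */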

/*
 * send/receive routines for remote operations and replies
 *
 * send_args
 * send_common
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * send_common_reply
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */

static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}

/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid = lkb->lkb_nodeid;
	ms->m_pid = lkb->lkb_ownpid;
	ms->m_lkid = lkb->lkb_id;
	ms->m_remid = lkb->lkb_remid;
	ms->m_exflags = lkb->lkb_exflags;
	ms->m_sbflags = lkb->lkb_sbflags;
	ms->m_flags = lkb->lkb_flags;
	ms->m_lvbseq = lkb->lkb_lvbseq;
	ms->m_status = lkb->lkb_status;
	ms->m_grmode = lkb->lkb_grmode;
	ms->m_rqmode = lkb->lkb_rqmode;
	ms->m_hash = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastfn)
		ms->m_asts |= DLM_CB_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= DLM_CB_CAST;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = r->res_nodeid;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	if (error)
		return error;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}

static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
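
/* Why the stub reply above is safe (an inference from the code, not an
   original comment): a down-conversion is always granted immediately on
   the master, so the outcome is already known here; faking a local
   DLM_MSG_CONVERT_REPLY with m_result 0 saves the network round trip,
   and DLM_IFL_STUB_MS tells receive_flags_reply() to ignore the stub's
   empty flag fields. */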

/* FIXME: if this lkb is the only lock we hold on the rsb, then set
   MASTER_UNCERTAIN to force the next request on the rsb to confirm
   that the master is still correct. */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}

static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = 0;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_bastmode = mode;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
	if (error)
		return error;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}

static int send_remove(struct dlm_rsb *r)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
	if (error)
		goto out;

	memcpy(ms->m_extra, r->res_name, r->res_length);
	ms->m_hash = r->res_hash;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
			     int mstype, int rv)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = rv;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}

static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
			     int ret_nodeid, int rv)
{
	struct dlm_rsb *r = &ls->ls_stub_rsb;
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error, nodeid = ms_in->m_header.h_nodeid;

	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
	if (error)
		goto out;

	ms->m_lkid = ms_in->m_lkid;
	ms->m_result = rv;
	ms->m_nodeid = ret_nodeid;

	error = send_message(mh, ms);
 out:
	return error;
}

/* which args we save from a received message depends heavily on the type
   of message, unlike the send side where we can safely send everything about
   the lkb for any type of message */

static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_flags == DLM_IFL_STUB_MS)
		return;

	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static int receive_extralen(struct dlm_message *ms)
{
	return (ms->m_header.h_length - sizeof(struct dlm_message));
}
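
/* The fixed struct dlm_message is followed by a variable-length m_extra
   area; create_message() sized the buffer for either a resource name or
   an lvb, so the receive side recovers that extra length as h_length
   minus the fixed struct size, as above. */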

static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_message *ms)
{
	int len;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		if (!lkb->lkb_lvbptr)
			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
	}
	return 0;
}

static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}

static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}

static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;

	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;
	return 0;
}

/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
   uses to send a reply and that the remote end uses to process the reply. */

static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_remid = ms->m_lkid;
}

/* This is called after the rsb is locked so that we can safely inspect
   fields in the lkb. */

static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	int from = ms->m_header.h_nodeid;
	int error = 0;

	switch (ms->m_type) {
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
	case DLM_MSG_BAST:
		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_REQUEST_REPLY:
		if (!is_process_copy(lkb))
			error = -EINVAL;
		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	default:
		error = -EINVAL;
	}

	if (error)
		log_error(lkb->lkb_resource->res_ls,
			  "ignore invalid message %d from %d %x %x %x %d",
			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
			  lkb->lkb_flags, lkb->lkb_nodeid);
	return error;
}
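
/* The checks above encode message direction: operations (convert/unlock/
   cancel) must land on the master copy of the lkb, while replies, grants
   and basts must land on the process copy, and in both cases the sender
   must be the node the lkb already points at.  REQUEST_REPLY also allows
   lkb_nodeid == -1 because the reply can arrive while the master is still
   unknown (the lookup-as-request optimization handled below). */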

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);
	do_request_effects(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);

	error = receive_convert_args(ls, lkb, ms);
	if (error) {
		send_convert_reply(r, lkb, error);
		goto out;
	}

	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
	if (reply)
		send_convert_reply(r, lkb, error);
	do_convert_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);

	error = receive_unlock_args(ls, lkb, ms);
	if (error) {
		send_unlock_reply(r, lkb, error);
		goto out;
	}

	error = do_unlock(r, lkb);
	send_unlock_reply(r, lkb, error);
	do_unlock_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);
	do_cancel_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}

static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_grant from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}

static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_bast from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	queue_bast(r, lkb, ms->m_bastmode);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}

static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're master so treat lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}

static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, dir_nodeid, from_nodeid;

	from_nodeid = ms->m_header.h_nodeid;

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "remove dir entry dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		return;
	}

	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
}

static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}

static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_request_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result) {
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
			add_timeout(lkb);
		} else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			confirm_master(r, result);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
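
/* The trailing overlap handling above resolves a cancel/unlock the user
   issued while the original request was still in flight: once the request
   outcome is known, the deferred op is finally sent (an unlock if the lock
   was granted or queued, a cancel only if it was queued), otherwise the
   overlap flags are simply cleared because there is nothing left to undo. */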

static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EDEADLK:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}

static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_convert_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	_receive_convert_reply(lkb, ms);
	dlm_put_lkb(lkb);
}

static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	_receive_unlock_reply(lkb, ms);
	dlm_put_lkb(lkb);
}

static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	_receive_cancel_reply(lkb, ms);
	dlm_put_lkb(lkb);
}

static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}

static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
			  ms->m_remid, ms->m_result);
		return;
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	dlm_astd_wake();
}

/* If the lockspace is in recovery mode (locking stopped), then normal
   messages are saved on the requestqueue for processing after recovery is
   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
   messages off the requestqueue before we process new ones.  This occurs
   right after recovery completes when we transition from saving all messages
   on requestqueue, to processing all the saved messages, to processing new
   messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
	}
}

/* This is called by dlm_recoverd to process messages that were saved on
   the requestqueue. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}

/* This is called by the midcomms layer when something is received for
   the lockspace.  It could be either a MSG (normal message sent as part of
   standard locking activity) or an RCOM (recovery message sent as part of
   lockspace recovery). */

void dlm_receive_buffer(union dlm_packet *p, int nodeid)
{
	struct dlm_header *hd = &p->header;
	struct dlm_ls *ls;
	int type = 0;

	switch (hd->h_cmd) {
	case DLM_MSG:
		dlm_message_in(&p->message);
		type = p->message.m_type;
		break;
	case DLM_RCOM:
		dlm_rcom_in(&p->rcom);
		type = p->rcom.rc_type;
		break;
	default:
		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
		return;
	}

	if (hd->h_nodeid != nodeid) {
		log_print("invalid h_nodeid %d from %d lockspace %x",
			  hd->h_nodeid, nodeid, hd->h_lockspace);
		return;
	}

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		if (dlm_config.ci_log_debug)
			log_print("invalid lockspace %x from %d cmd %d type %d",
				  hd->h_lockspace, nodeid, hd->h_cmd, type);

		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
			dlm_send_ls_not_ready(nodeid, &p->rcom);
		return;
	}

	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
	   be inactive (in this ls) before transitioning to recovery mode */

	down_read(&ls->ls_recv_active);
	if (hd->h_cmd == DLM_MSG)
		dlm_receive_message(ls, &p->message, nodeid);
	else
		dlm_receive_rcom(ls, &p->rcom, nodeid);
	up_read(&ls->ls_recv_active);

	dlm_put_lockspace(ls);
}
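
/* Receive path in one picture (derived from the functions above):

	midcomms
	  -> dlm_receive_buffer()	validates header, finds the lockspace
	       -> dlm_receive_message()	DLM_MSG: queue during recovery
	            -> _receive_message()	or dispatch on m_type
	       -> dlm_receive_rcom()	DLM_RCOM: recovery traffic
*/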

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
				   struct dlm_message *ms_stub)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		memset(ms_stub, 0, sizeof(struct dlm_message));
		ms_stub->m_flags = DLM_IFL_STUB_MS;
		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
		ms_stub->m_result = -EINPROGRESS;
		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, ms_stub);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
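
/* The DLM_IFL_STUB_MS pattern above fabricates a reply locally so the
   normal _receive_xxxx_reply() paths can complete the operation without
   a message from the dead master.  The receive paths check this flag and
   skip fields that only a real network message would carry. */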

/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (dlm_is_removed(ls, lkb->lkb_nodeid))
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
		return 1;

	return 0;
}

/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	struct dlm_message *ms_stub;
	int wait_type, stub_unlock_result, stub_cancel_result;

	ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
	if (!ms_stub) {
		log_error(ls, "dlm_recover_waiters_pre no mem");
		return;
	}

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {

		/* exclude debug messages about unlocks because there can be so
		   many and they aren't very interesting */

		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
			log_debug(ls, "recover_waiter %x nodeid %d "
				  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
				  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
		}

		/* all outstanding lookups, regardless of destination, will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		wait_type = lkb->lkb_wait_type;
		stub_unlock_result = -DLM_EUNLOCK;
		stub_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received.  In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_cancel_result = 0;
			}
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, lkb->lkb_flags, wait_type,
				  stub_cancel_result, stub_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb, ms_stub);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			memset(ms_stub, 0, sizeof(struct dlm_message));
			ms_stub->m_flags = DLM_IFL_STUB_MS;
			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
			ms_stub->m_result = stub_unlock_result;
			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, ms_stub);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			memset(ms_stub, 0, sizeof(struct dlm_message));
			ms_stub->m_flags = DLM_IFL_STUB_MS;
			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
			ms_stub->m_result = stub_cancel_result;
			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, ms_stub);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(ms_stub);
}
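
/* Summary of what the function above does per wait_type (drawn from the
   code itself):

	DLM_MSG_LOOKUP  -> flagged RESEND, resent after recovery
	DLM_MSG_REQUEST -> flagged RESEND, resent after recovery
	DLM_MSG_CONVERT -> stub CONVERT_REPLY (-EINPROGRESS) for middle
	                   conversions, else flagged RESEND
	DLM_MSG_UNLOCK  -> stub UNLOCK_REPLY (-DLM_EUNLOCK)
	DLM_MSG_CANCEL  -> stub CANCEL_REPLY (-DLM_ECANCEL) */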

static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	int found = 0;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			hold_lkb(lkb);
			found = 1;
			break;
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (!found)
		lkb = NULL;
	return lkb;
}
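
/* find_resend_waiter() returns the first RESEND-flagged lkb with an
   extra reference held; dlm_recover_waiters_post() below clears the flag
   and drops the reference, so repeated calls walk the waiters list to
   completion. */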

/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being placed
   back on waiters. */

/* We do this after normal locking has been enabled and any saved messages
   (in requestqueue) have been processed.  We should be confident that at
   this point we won't get or process a reply to any of these waiting
   operations.  But, new ops may be coming in on the rsbs/locks here from
   userspace or remotely. */

/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}

static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}

static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
}

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
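
/* purge_queue() takes the purge policy as a predicate, so adding a new
   rule is one small function.  A hypothetical example (not part of the
   DLM), with 'target_nodeid' standing in for a real parameter, might
   look like:

	static int purge_node_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
	{
		return is_master_copy(lkb) &&
		       lkb->lkb_nodeid == target_nodeid;
	}
*/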

static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}

void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}

/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}

static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
	struct dlm_rsb *r, *r_ret = NULL;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
		if (!rsb_flag(r, RSB_LOCKS_PURGED))
			continue;
		hold_rsb(r);
		rsb_clear_flag(r, RSB_LOCKS_PURGED);
		r_ret = r;
		break;
	}
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	return r_ret;
}

void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}

static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
					 uint32_t remid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
			return lkb;
	}
	return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
				    uint32_t remid)
{
	struct dlm_lkb *lkb;

	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
	if (lkb)
		return lkb;
	return NULL;
}
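
/* (nodeid, remid) names a lock by the id it has on its owning node,
   which stays stable across recovery; checking the grant, convert and
   wait queues in turn covers every state an lkb can occupy on this
   rsb. */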

/* needs at least dlm_rcom + rcom_lock */

static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to do that itself */

	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		if (lvblen > ls->ls_lvblen)
			return -EINVAL;
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
	    middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
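
/* The LVB length above is not carried explicitly; it is implied by the
   packet size.  For example, with a 32-byte LVB the sender's h_length is
   sizeof(struct dlm_rcom) + sizeof(struct rcom_lock) + 32, so the
   subtraction recovers 32. */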

/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

/* needs at least dlm_rcom + rcom_lock */

int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = cpu_to_le32(lkb->lkb_id);

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_debug(ls, "recover_master_copy %d %x", error,
			  le32_to_cpu(rl->rl_lkid));
	rl->rl_result = cpu_to_le32(error);
	return error;
}
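
/* Note that -EEXIST is not a failure here: dlm_recover_process_copy()
   below treats it like success and still records rl_remid, so a lock
   recovered twice (after an aborted recovery) converges on a single
   master copy. */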

/* needs at least dlm_rcom + rcom_lock */

int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x",
			  le32_to_cpu(rl->rl_lkid));
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = le32_to_cpu(rl->rl_result);

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	lkb->lkb_flags |= DLM_IFL_USER;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
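
/* The dlm_user_* entry points here are reached from the misc-device
   write path in fs/dlm/user.c (an assumption about the caller, not
   visible in this file).  'ua' is allocated by that caller and ownership
   transfers to the lkb on success, which is why the error paths above
   free it; completions flow back to userspace via fake_astfn/fake_bastfn
   and the proc->asts queue. */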

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	hold_lkb(lkb);
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, lkb->lkb_ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_ast() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourself. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
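
/* dlm_clear_proc_locks() and purge_proc_locks() differ mainly in policy:
   the former runs when a process closes the device and keeps
   DLM_LKF_PERSISTENT locks alive as orphans, while the latter is an
   explicit purge that force-unlocks everything and takes
   locks_spin/asts_spin because the proc may still be active. */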

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}
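
/* A purge aimed at a remote node travels as DLM_MSG_PURGE and is handled
   by receive_purge() in the message switch near the top of this section,
   which ends up in do_purge() on that node, so local and remote purges
   converge on the same orphan sweep. */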