Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/requestqueue.c
26282 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
6
**
7
**
8
*******************************************************************************
9
******************************************************************************/
10
11
#include "dlm_internal.h"
12
#include "member.h"
13
#include "lock.h"
14
#include "dir.h"
15
#include "config.h"
16
#include "requestqueue.h"
17
#include "util.h"
18
19
struct rq_entry {
20
struct list_head list;
21
uint32_t recover_seq;
22
int nodeid;
23
struct dlm_message request;
24
};
25
26
/*
27
* Requests received while the lockspace is in recovery get added to the
28
* request queue and processed when recovery is complete. This happens when
29
* the lockspace is suspended on some nodes before it is on others, or the
30
* lockspace is enabled on some while still suspended on others.
31
*/
32
33
void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
34
const struct dlm_message *ms)
35
{
36
struct rq_entry *e;
37
int length = le16_to_cpu(ms->m_header.h_length) -
38
sizeof(struct dlm_message);
39
40
e = kmalloc(sizeof(struct rq_entry) + length, GFP_ATOMIC);
41
if (!e) {
42
log_print("dlm_add_requestqueue: out of memory len %d", length);
43
return;
44
}
45
46
e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
47
e->nodeid = nodeid;
48
memcpy(&e->request, ms, sizeof(*ms));
49
memcpy(&e->request.m_extra, ms->m_extra, length);
50
51
list_add_tail(&e->list, &ls->ls_requestqueue);
52
}
53
54
/*
55
* Called by dlm_recoverd to process normal messages saved while recovery was
56
* happening. Normal locking has been enabled before this is called. dlm_recv
57
* upon receiving a message, will wait for all saved messages to be drained
58
* here before processing the message it got. If a new dlm_ls_stop() arrives
59
* while we're processing these saved messages, it may block trying to suspend
60
* dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that
61
* case, we don't abort since locking_stopped is still 0. If dlm_recv is not
62
* waiting for us, then this processing may be aborted due to locking_stopped.
63
*/
64
65
int dlm_process_requestqueue(struct dlm_ls *ls)
66
{
67
struct rq_entry *e;
68
struct dlm_message *ms;
69
int error = 0;
70
71
write_lock_bh(&ls->ls_requestqueue_lock);
72
for (;;) {
73
if (list_empty(&ls->ls_requestqueue)) {
74
clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
75
error = 0;
76
break;
77
}
78
e = list_first_entry(&ls->ls_requestqueue, struct rq_entry, list);
79
80
ms = &e->request;
81
82
log_limit(ls, "dlm_process_requestqueue msg %d from %d "
83
"lkid %x remid %x result %d seq %u",
84
le32_to_cpu(ms->m_type),
85
le32_to_cpu(ms->m_header.h_nodeid),
86
le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
87
from_dlm_errno(le32_to_cpu(ms->m_result)),
88
e->recover_seq);
89
90
dlm_receive_message_saved(ls, &e->request, e->recover_seq);
91
list_del(&e->list);
92
kfree(e);
93
94
if (dlm_locking_stopped(ls)) {
95
log_debug(ls, "process_requestqueue abort running");
96
error = -EINTR;
97
break;
98
}
99
write_unlock_bh(&ls->ls_requestqueue_lock);
100
schedule();
101
write_lock_bh(&ls->ls_requestqueue_lock);
102
}
103
write_unlock_bh(&ls->ls_requestqueue_lock);
104
105
return error;
106
}
107
108
static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
109
{
110
__le32 type = ms->m_type;
111
112
/* the ls is being cleaned up and freed by release_lockspace */
113
if (!atomic_read(&ls->ls_count))
114
return 1;
115
116
if (dlm_is_removed(ls, nodeid))
117
return 1;
118
119
/* directory operations are always purged because the directory is
120
always rebuilt during recovery and the lookups resent */
121
122
if (type == cpu_to_le32(DLM_MSG_REMOVE) ||
123
type == cpu_to_le32(DLM_MSG_LOOKUP) ||
124
type == cpu_to_le32(DLM_MSG_LOOKUP_REPLY))
125
return 1;
126
127
if (!dlm_no_directory(ls))
128
return 0;
129
130
return 1;
131
}
132
133
void dlm_purge_requestqueue(struct dlm_ls *ls)
134
{
135
struct dlm_message *ms;
136
struct rq_entry *e, *safe;
137
138
write_lock_bh(&ls->ls_requestqueue_lock);
139
list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
140
ms = &e->request;
141
142
if (purge_request(ls, ms, e->nodeid)) {
143
list_del(&e->list);
144
kfree(e);
145
}
146
}
147
write_unlock_bh(&ls->ls_requestqueue_lock);
148
}
149
150
151