Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/dir.c
26285 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6
** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7
**
8
**
9
*******************************************************************************
10
******************************************************************************/
11
12
#include "dlm_internal.h"
13
#include "lockspace.h"
14
#include "member.h"
15
#include "lowcomms.h"
16
#include "rcom.h"
17
#include "config.h"
18
#include "memory.h"
19
#include "recover.h"
20
#include "util.h"
21
#include "lock.h"
22
#include "dir.h"
23
24
/*
25
* We use the upper 16 bits of the hash value to select the directory node.
26
* Low bits are used for distribution of rsb's among hash buckets on each node.
27
*
28
* To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29
* num_nodes to the hash value. This value in the desired range is used as an
30
* offset into the sorted list of nodeid's to give the particular nodeid.
31
*/
32
33
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34
{
35
uint32_t node;
36
37
if (ls->ls_num_nodes == 1)
38
return dlm_our_nodeid();
39
else {
40
node = (hash >> 16) % ls->ls_total_weight;
41
return ls->ls_node_array[node];
42
}
43
}
44
45
int dlm_dir_nodeid(struct dlm_rsb *r)
46
{
47
return r->res_dir_nodeid;
48
}
49
50
void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
51
{
52
struct dlm_rsb *r;
53
54
list_for_each_entry(r, root_list, res_root_list) {
55
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
56
}
57
}
58
59
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
60
{
61
struct dlm_member *memb;
62
char *b, *last_name = NULL;
63
int error = -ENOMEM, last_len, nodeid, result;
64
uint16_t namelen;
65
unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
66
67
log_rinfo(ls, "dlm_recover_directory");
68
69
if (dlm_no_directory(ls))
70
goto out_status;
71
72
last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
73
if (!last_name)
74
goto out;
75
76
list_for_each_entry(memb, &ls->ls_nodes, list) {
77
if (memb->nodeid == dlm_our_nodeid())
78
continue;
79
80
memset(last_name, 0, DLM_RESNAME_MAXLEN);
81
last_len = 0;
82
83
for (;;) {
84
int left;
85
if (dlm_recovery_stopped(ls)) {
86
error = -EINTR;
87
goto out_free;
88
}
89
90
error = dlm_rcom_names(ls, memb->nodeid,
91
last_name, last_len, seq);
92
if (error)
93
goto out_free;
94
95
cond_resched();
96
97
/*
98
* pick namelen/name pairs out of received buffer
99
*/
100
101
b = ls->ls_recover_buf->rc_buf;
102
left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
103
left -= sizeof(struct dlm_rcom);
104
105
for (;;) {
106
__be16 v;
107
108
error = -EINVAL;
109
if (left < sizeof(__be16))
110
goto out_free;
111
112
memcpy(&v, b, sizeof(__be16));
113
namelen = be16_to_cpu(v);
114
b += sizeof(__be16);
115
left -= sizeof(__be16);
116
117
/* namelen of 0xFFFFF marks end of names for
118
this node; namelen of 0 marks end of the
119
buffer */
120
121
if (namelen == 0xFFFF)
122
goto done;
123
if (!namelen)
124
break;
125
126
if (namelen > left)
127
goto out_free;
128
129
if (namelen > DLM_RESNAME_MAXLEN)
130
goto out_free;
131
132
error = dlm_master_lookup(ls, memb->nodeid,
133
b, namelen,
134
DLM_LU_RECOVER_DIR,
135
&nodeid, &result);
136
if (error) {
137
log_error(ls, "recover_dir lookup %d",
138
error);
139
goto out_free;
140
}
141
142
/* The name was found in rsbtbl, but the
143
* master nodeid is different from
144
* memb->nodeid which says it is the master.
145
* This should not happen. */
146
147
if (result == DLM_LU_MATCH &&
148
nodeid != memb->nodeid) {
149
count_bad++;
150
log_error(ls, "recover_dir lookup %d "
151
"nodeid %d memb %d bad %u",
152
result, nodeid, memb->nodeid,
153
count_bad);
154
print_hex_dump_bytes("dlm_recover_dir ",
155
DUMP_PREFIX_NONE,
156
b, namelen);
157
}
158
159
/* The name was found in rsbtbl, and the
160
* master nodeid matches memb->nodeid. */
161
162
if (result == DLM_LU_MATCH &&
163
nodeid == memb->nodeid) {
164
count_match++;
165
}
166
167
/* The name was not found in rsbtbl and was
168
* added with memb->nodeid as the master. */
169
170
if (result == DLM_LU_ADD) {
171
count_add++;
172
}
173
174
last_len = namelen;
175
memcpy(last_name, b, namelen);
176
b += namelen;
177
left -= namelen;
178
count++;
179
}
180
}
181
done:
182
;
183
}
184
185
out_status:
186
error = 0;
187
dlm_set_recover_status(ls, DLM_RS_DIR);
188
189
log_rinfo(ls, "dlm_recover_directory %u in %u new",
190
count, count_add);
191
out_free:
192
kfree(last_name);
193
out:
194
return error;
195
}
196
197
static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
198
int len)
199
{
200
struct dlm_rsb *r;
201
int rv;
202
203
read_lock_bh(&ls->ls_rsbtbl_lock);
204
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
205
read_unlock_bh(&ls->ls_rsbtbl_lock);
206
if (!rv)
207
return r;
208
209
list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
210
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
211
log_debug(ls, "find_rsb_root revert to root_list %s",
212
r->res_name);
213
return r;
214
}
215
}
216
return NULL;
217
}
218
219
struct dlm_dir_dump {
220
/* init values to match if whole
221
* dump fits to one seq. Sanity check only.
222
*/
223
uint64_t seq_init;
224
uint64_t nodeid_init;
225
/* compare local pointer with last lookup,
226
* just a sanity check.
227
*/
228
struct list_head *last;
229
230
unsigned int sent_res; /* for log info */
231
unsigned int sent_msg; /* for log info */
232
233
struct list_head list;
234
};
235
236
static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
237
{
238
struct dlm_dir_dump *dd, *safe;
239
240
write_lock_bh(&ls->ls_dir_dump_lock);
241
list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
242
if (dd->nodeid_init == nodeid) {
243
log_error(ls, "drop dump seq %llu",
244
(unsigned long long)dd->seq_init);
245
list_del(&dd->list);
246
kfree(dd);
247
}
248
}
249
write_unlock_bh(&ls->ls_dir_dump_lock);
250
}
251
252
static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
253
{
254
struct dlm_dir_dump *iter, *dd = NULL;
255
256
read_lock_bh(&ls->ls_dir_dump_lock);
257
list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
258
if (iter->nodeid_init == nodeid) {
259
dd = iter;
260
break;
261
}
262
}
263
read_unlock_bh(&ls->ls_dir_dump_lock);
264
265
return dd;
266
}
267
268
static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
269
{
270
struct dlm_dir_dump *dd;
271
272
dd = lookup_dir_dump(ls, nodeid);
273
if (dd) {
274
log_error(ls, "found ongoing dir dump for node %d, will drop it",
275
nodeid);
276
drop_dir_ctx(ls, nodeid);
277
}
278
279
dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
280
if (!dd)
281
return NULL;
282
283
dd->seq_init = ls->ls_recover_seq;
284
dd->nodeid_init = nodeid;
285
286
write_lock_bh(&ls->ls_dir_dump_lock);
287
list_add(&dd->list, &ls->ls_dir_dump_list);
288
write_unlock_bh(&ls->ls_dir_dump_lock);
289
290
return dd;
291
}
292
293
/* Find the rsb where we left off (or start again), then send rsb names
294
for rsb's we're master of and whose directory node matches the requesting
295
node. inbuf is the rsb name last sent, inlen is the name's length */
296
297
void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
298
char *outbuf, int outlen, int nodeid)
299
{
300
struct list_head *list;
301
struct dlm_rsb *r;
302
int offset = 0, dir_nodeid;
303
struct dlm_dir_dump *dd;
304
__be16 be_namelen;
305
306
read_lock_bh(&ls->ls_masters_lock);
307
308
if (inlen > 1) {
309
dd = lookup_dir_dump(ls, nodeid);
310
if (!dd) {
311
log_error(ls, "failed to lookup dir dump context nodeid: %d",
312
nodeid);
313
goto out;
314
}
315
316
/* next chunk in dump */
317
r = find_rsb_root(ls, inbuf, inlen);
318
if (!r) {
319
log_error(ls, "copy_master_names from %d start %d %.*s",
320
nodeid, inlen, inlen, inbuf);
321
goto out;
322
}
323
list = r->res_masters_list.next;
324
325
/* sanity checks */
326
if (dd->last != &r->res_masters_list ||
327
dd->seq_init != ls->ls_recover_seq) {
328
log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
329
(unsigned long long)dd->seq_init,
330
(unsigned long long)ls->ls_recover_seq);
331
goto out;
332
}
333
} else {
334
dd = init_dir_dump(ls, nodeid);
335
if (!dd) {
336
log_error(ls, "failed to allocate dir dump context");
337
goto out;
338
}
339
340
/* start dump */
341
list = ls->ls_masters_list.next;
342
dd->last = list;
343
}
344
345
for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
346
r = list_entry(list, struct dlm_rsb, res_masters_list);
347
dir_nodeid = dlm_dir_nodeid(r);
348
if (dir_nodeid != nodeid)
349
continue;
350
351
/*
352
* The block ends when we can't fit the following in the
353
* remaining buffer space:
354
* namelen (uint16_t) +
355
* name (r->res_length) +
356
* end-of-block record 0x0000 (uint16_t)
357
*/
358
359
if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
360
/* Write end-of-block record */
361
be_namelen = cpu_to_be16(0);
362
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
363
offset += sizeof(__be16);
364
dd->sent_msg++;
365
goto out;
366
}
367
368
be_namelen = cpu_to_be16(r->res_length);
369
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
370
offset += sizeof(__be16);
371
memcpy(outbuf + offset, r->res_name, r->res_length);
372
offset += r->res_length;
373
dd->sent_res++;
374
dd->last = list;
375
}
376
377
/*
378
* If we've reached the end of the list (and there's room) write a
379
* terminating record.
380
*/
381
382
if ((list == &ls->ls_masters_list) &&
383
(offset + sizeof(uint16_t) <= outlen)) {
384
/* end dump */
385
be_namelen = cpu_to_be16(0xFFFF);
386
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
387
offset += sizeof(__be16);
388
dd->sent_msg++;
389
log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
390
nodeid, dd->sent_res, dd->sent_msg);
391
392
write_lock_bh(&ls->ls_dir_dump_lock);
393
list_del_init(&dd->list);
394
write_unlock_bh(&ls->ls_dir_dump_lock);
395
kfree(dd);
396
}
397
out:
398
read_unlock_bh(&ls->ls_masters_lock);
399
}
400
401
402