Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/dlm/rcom.c
26282 views
1
// SPDX-License-Identifier: GPL-2.0-only
2
/******************************************************************************
3
*******************************************************************************
4
**
5
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6
** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
7
**
8
**
9
*******************************************************************************
10
******************************************************************************/
11
12
#include "dlm_internal.h"
13
#include "lockspace.h"
14
#include "member.h"
15
#include "lowcomms.h"
16
#include "midcomms.h"
17
#include "rcom.h"
18
#include "recover.h"
19
#include "dir.h"
20
#include "config.h"
21
#include "memory.h"
22
#include "lock.h"
23
#include "util.h"
24
25
static int rcom_response(struct dlm_ls *ls)
26
{
27
return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28
}
29
30
static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31
struct dlm_rcom **rc_ret, char *mb, int mb_len,
32
uint64_t seq)
33
{
34
struct dlm_rcom *rc;
35
36
rc = (struct dlm_rcom *) mb;
37
38
rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
39
rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
40
rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
41
rc->rc_header.h_length = cpu_to_le16(mb_len);
42
rc->rc_header.h_cmd = DLM_RCOM;
43
44
rc->rc_type = cpu_to_le32(type);
45
rc->rc_seq = cpu_to_le64(seq);
46
47
*rc_ret = rc;
48
}
49
50
static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
51
struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret,
52
uint64_t seq)
53
{
54
int mb_len = sizeof(struct dlm_rcom) + len;
55
struct dlm_mhandle *mh;
56
char *mb;
57
58
mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
59
if (!mh) {
60
log_print("%s to %d type %d len %d ENOBUFS",
61
__func__, to_nodeid, type, len);
62
return -ENOBUFS;
63
}
64
65
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
66
*mh_ret = mh;
67
return 0;
68
}
69
70
static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
71
int len, struct dlm_rcom **rc_ret,
72
struct dlm_msg **msg_ret, uint64_t seq)
73
{
74
int mb_len = sizeof(struct dlm_rcom) + len;
75
struct dlm_msg *msg;
76
char *mb;
77
78
msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
79
if (!msg) {
80
log_print("create_rcom to %d type %d len %d ENOBUFS",
81
to_nodeid, type, len);
82
return -ENOBUFS;
83
}
84
85
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
86
*msg_ret = msg;
87
return 0;
88
}
89
90
static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
91
{
92
dlm_midcomms_commit_mhandle(mh, NULL, 0);
93
}
94
95
static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
96
{
97
dlm_lowcomms_commit_msg(msg);
98
dlm_lowcomms_put_msg(msg);
99
}
100
101
static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
102
uint32_t flags)
103
{
104
rs->rs_flags = cpu_to_le32(flags);
105
}
106
107
/* When replying to a status request, a node also sends back its
108
configuration values. The requesting node then checks that the remote
109
node is configured the same way as itself. */
110
111
static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
112
uint32_t num_slots)
113
{
114
rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
115
rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
116
117
rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
118
rf->rf_num_slots = cpu_to_le16(num_slots);
119
rf->rf_generation = cpu_to_le32(ls->ls_generation);
120
}
121
122
static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
123
{
124
struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
125
126
if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) {
127
log_error(ls, "version mismatch: %x nodeid %d: %x",
128
DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
129
le32_to_cpu(rc->rc_header.h_version));
130
return -EPROTO;
131
}
132
133
if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
134
le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
135
log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
136
ls->ls_lvblen, ls->ls_exflags, nodeid,
137
le32_to_cpu(rf->rf_lvblen),
138
le32_to_cpu(rf->rf_lsflags));
139
return -EPROTO;
140
}
141
return 0;
142
}
143
144
static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
145
{
146
spin_lock_bh(&ls->ls_rcom_spin);
147
*new_seq = cpu_to_le64(++ls->ls_rcom_seq);
148
set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
149
spin_unlock_bh(&ls->ls_rcom_spin);
150
}
151
152
static void disallow_sync_reply(struct dlm_ls *ls)
153
{
154
spin_lock_bh(&ls->ls_rcom_spin);
155
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
156
clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
157
spin_unlock_bh(&ls->ls_rcom_spin);
158
}
159
160
/*
161
* low nodeid gathers one slot value at a time from each node.
162
* it sets need_slots=0, and saves rf_our_slot returned from each
163
* rcom_config.
164
*
165
* other nodes gather all slot values at once from the low nodeid.
166
* they set need_slots=1, and ignore the rf_our_slot returned from each
167
* rcom_config. they use the rf_num_slots returned from the low
168
* node's rcom_config.
169
*/
170
171
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
172
uint64_t seq)
173
{
174
struct dlm_rcom *rc;
175
struct dlm_msg *msg;
176
int error = 0;
177
178
ls->ls_recover_nodeid = nodeid;
179
180
if (nodeid == dlm_our_nodeid()) {
181
rc = ls->ls_recover_buf;
182
rc->rc_result = cpu_to_le32(dlm_recover_status(ls));
183
goto out;
184
}
185
186
retry:
187
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
188
sizeof(struct rcom_status), &rc, &msg,
189
seq);
190
if (error)
191
goto out;
192
193
set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
194
195
allow_sync_reply(ls, &rc->rc_id);
196
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
197
198
send_rcom_stateless(msg, rc);
199
200
error = dlm_wait_function(ls, &rcom_response);
201
disallow_sync_reply(ls);
202
if (error == -ETIMEDOUT)
203
goto retry;
204
if (error)
205
goto out;
206
207
rc = ls->ls_recover_buf;
208
209
if (rc->rc_result == cpu_to_le32(-ESRCH)) {
210
/* we pretend the remote lockspace exists with 0 status */
211
log_debug(ls, "remote node %d not ready", nodeid);
212
rc->rc_result = 0;
213
error = 0;
214
} else {
215
error = check_rcom_config(ls, rc, nodeid);
216
}
217
218
/* the caller looks at rc_result for the remote recovery status */
219
out:
220
return error;
221
}
222
223
static void receive_rcom_status(struct dlm_ls *ls,
224
const struct dlm_rcom *rc_in,
225
uint64_t seq)
226
{
227
struct dlm_rcom *rc;
228
struct rcom_status *rs;
229
uint32_t status;
230
int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
231
int len = sizeof(struct rcom_config);
232
struct dlm_msg *msg;
233
int num_slots = 0;
234
int error;
235
236
if (!dlm_slots_version(&rc_in->rc_header)) {
237
status = dlm_recover_status(ls);
238
goto do_create;
239
}
240
241
rs = (struct rcom_status *)rc_in->rc_buf;
242
243
if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
244
status = dlm_recover_status(ls);
245
goto do_create;
246
}
247
248
spin_lock_bh(&ls->ls_recover_lock);
249
status = ls->ls_recover_status;
250
num_slots = ls->ls_num_slots;
251
spin_unlock_bh(&ls->ls_recover_lock);
252
len += num_slots * sizeof(struct rcom_slot);
253
254
do_create:
255
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
256
len, &rc, &msg, seq);
257
if (error)
258
return;
259
260
rc->rc_id = rc_in->rc_id;
261
rc->rc_seq_reply = rc_in->rc_seq;
262
rc->rc_result = cpu_to_le32(status);
263
264
set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
265
266
if (!num_slots)
267
goto do_send;
268
269
spin_lock_bh(&ls->ls_recover_lock);
270
if (ls->ls_num_slots != num_slots) {
271
spin_unlock_bh(&ls->ls_recover_lock);
272
log_debug(ls, "receive_rcom_status num_slots %d to %d",
273
num_slots, ls->ls_num_slots);
274
rc->rc_result = 0;
275
set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
276
goto do_send;
277
}
278
279
dlm_slots_copy_out(ls, rc);
280
spin_unlock_bh(&ls->ls_recover_lock);
281
282
do_send:
283
send_rcom_stateless(msg, rc);
284
}
285
286
static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
287
{
288
spin_lock_bh(&ls->ls_rcom_spin);
289
if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
290
le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
291
log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
292
le32_to_cpu(rc_in->rc_type),
293
le32_to_cpu(rc_in->rc_header.h_nodeid),
294
(unsigned long long)le64_to_cpu(rc_in->rc_id),
295
(unsigned long long)ls->ls_rcom_seq);
296
goto out;
297
}
298
memcpy(ls->ls_recover_buf, rc_in,
299
le16_to_cpu(rc_in->rc_header.h_length));
300
set_bit(LSFL_RCOM_READY, &ls->ls_flags);
301
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
302
wake_up(&ls->ls_wait_general);
303
out:
304
spin_unlock_bh(&ls->ls_rcom_spin);
305
}
306
307
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
308
int last_len, uint64_t seq)
309
{
310
struct dlm_mhandle *mh;
311
struct dlm_rcom *rc;
312
int error = 0;
313
314
ls->ls_recover_nodeid = nodeid;
315
316
retry:
317
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len,
318
&rc, &mh, seq);
319
if (error)
320
goto out;
321
memcpy(rc->rc_buf, last_name, last_len);
322
323
allow_sync_reply(ls, &rc->rc_id);
324
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
325
326
send_rcom(mh, rc);
327
328
error = dlm_wait_function(ls, &rcom_response);
329
disallow_sync_reply(ls);
330
if (error == -ETIMEDOUT)
331
goto retry;
332
out:
333
return error;
334
}
335
336
static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
337
uint64_t seq)
338
{
339
struct dlm_mhandle *mh;
340
struct dlm_rcom *rc;
341
int error, inlen, outlen, nodeid;
342
343
nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
344
inlen = le16_to_cpu(rc_in->rc_header.h_length) -
345
sizeof(struct dlm_rcom);
346
outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
347
348
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
349
&rc, &mh, seq);
350
if (error)
351
return;
352
rc->rc_id = rc_in->rc_id;
353
rc->rc_seq_reply = rc_in->rc_seq;
354
355
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
356
nodeid);
357
send_rcom(mh, rc);
358
}
359
360
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
361
{
362
struct dlm_rcom *rc;
363
struct dlm_mhandle *mh;
364
struct dlm_ls *ls = r->res_ls;
365
int error;
366
367
error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
368
&rc, &mh, seq);
369
if (error)
370
goto out;
371
memcpy(rc->rc_buf, r->res_name, r->res_length);
372
rc->rc_id = cpu_to_le64(r->res_id);
373
374
send_rcom(mh, rc);
375
out:
376
return error;
377
}
378
379
static void receive_rcom_lookup(struct dlm_ls *ls,
380
const struct dlm_rcom *rc_in, uint64_t seq)
381
{
382
struct dlm_rcom *rc;
383
struct dlm_mhandle *mh;
384
int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
385
int len = le16_to_cpu(rc_in->rc_header.h_length) -
386
sizeof(struct dlm_rcom);
387
388
/* Old code would send this special id to trigger a debug dump. */
389
if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) {
390
log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
391
dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
392
return;
393
}
394
395
error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh,
396
seq);
397
if (error)
398
return;
399
400
error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
401
DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
402
if (error)
403
ret_nodeid = error;
404
rc->rc_result = cpu_to_le32(ret_nodeid);
405
rc->rc_id = rc_in->rc_id;
406
rc->rc_seq_reply = rc_in->rc_seq;
407
408
send_rcom(mh, rc);
409
}
410
411
static void receive_rcom_lookup_reply(struct dlm_ls *ls,
412
const struct dlm_rcom *rc_in)
413
{
414
dlm_recover_master_reply(ls, rc_in);
415
}
416
417
static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
418
struct rcom_lock *rl)
419
{
420
memset(rl, 0, sizeof(*rl));
421
422
rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
423
rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
424
rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
425
rl->rl_flags = cpu_to_le32(dlm_dflags_val(lkb));
426
rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
427
rl->rl_rqmode = lkb->lkb_rqmode;
428
rl->rl_grmode = lkb->lkb_grmode;
429
rl->rl_status = lkb->lkb_status;
430
rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
431
432
if (lkb->lkb_bastfn)
433
rl->rl_asts |= DLM_CB_BAST;
434
if (lkb->lkb_astfn)
435
rl->rl_asts |= DLM_CB_CAST;
436
437
rl->rl_namelen = cpu_to_le16(r->res_length);
438
memcpy(rl->rl_name, r->res_name, r->res_length);
439
440
/* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
441
If so, receive_rcom_lock_args() won't take this copy. */
442
443
if (lkb->lkb_lvbptr)
444
memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
445
}
446
447
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
448
{
449
struct dlm_ls *ls = r->res_ls;
450
struct dlm_rcom *rc;
451
struct dlm_mhandle *mh;
452
struct rcom_lock *rl;
453
int error, len = sizeof(struct rcom_lock);
454
455
if (lkb->lkb_lvbptr)
456
len += ls->ls_lvblen;
457
458
error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
459
seq);
460
if (error)
461
goto out;
462
463
rl = (struct rcom_lock *) rc->rc_buf;
464
pack_rcom_lock(r, lkb, rl);
465
rc->rc_id = cpu_to_le64((uintptr_t)r);
466
467
send_rcom(mh, rc);
468
out:
469
return error;
470
}
471
472
/* needs at least dlm_rcom + rcom_lock */
473
static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
474
uint64_t seq)
475
{
476
__le32 rl_remid, rl_result;
477
struct rcom_lock *rl;
478
struct dlm_rcom *rc;
479
struct dlm_mhandle *mh;
480
int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
481
482
dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result);
483
484
error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
485
sizeof(struct rcom_lock), &rc, &mh, seq);
486
if (error)
487
return;
488
489
memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
490
rl = (struct rcom_lock *)rc->rc_buf;
491
/* set rl_remid and rl_result from dlm_recover_master_copy() */
492
rl->rl_remid = rl_remid;
493
rl->rl_result = rl_result;
494
495
rc->rc_id = rc_in->rc_id;
496
rc->rc_seq_reply = rc_in->rc_seq;
497
498
send_rcom(mh, rc);
499
}
500
501
/* If the lockspace doesn't exist then still send a status message
502
back; it's possible that it just doesn't have its global_id yet. */
503
504
int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
505
{
506
struct dlm_rcom *rc;
507
struct rcom_config *rf;
508
struct dlm_mhandle *mh;
509
char *mb;
510
int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
511
512
mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
513
if (!mh)
514
return -ENOBUFS;
515
516
rc = (struct dlm_rcom *) mb;
517
518
rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
519
rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
520
rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
521
rc->rc_header.h_length = cpu_to_le16(mb_len);
522
rc->rc_header.h_cmd = DLM_RCOM;
523
524
rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY);
525
rc->rc_id = rc_in->rc_id;
526
rc->rc_seq_reply = rc_in->rc_seq;
527
rc->rc_result = cpu_to_le32(-ESRCH);
528
529
rf = (struct rcom_config *) rc->rc_buf;
530
rf->rf_lvblen = cpu_to_le32(~0U);
531
532
dlm_midcomms_commit_mhandle(mh, NULL, 0);
533
534
return 0;
535
}
536
537
/*
538
* Ignore messages for stage Y before we set
539
* recover_status bit for stage X:
540
*
541
* recover_status = 0
542
*
543
* dlm_recover_members()
544
* - send nothing
545
* - recv nothing
546
* - ignore NAMES, NAMES_REPLY
547
* - ignore LOOKUP, LOOKUP_REPLY
548
* - ignore LOCK, LOCK_REPLY
549
*
550
* recover_status |= NODES
551
*
552
* dlm_recover_members_wait()
553
*
554
* dlm_recover_directory()
555
* - send NAMES
556
* - recv NAMES_REPLY
557
* - ignore LOOKUP, LOOKUP_REPLY
558
* - ignore LOCK, LOCK_REPLY
559
*
560
* recover_status |= DIR
561
*
562
* dlm_recover_directory_wait()
563
*
564
* dlm_recover_masters()
565
* - send LOOKUP
566
* - recv LOOKUP_REPLY
567
*
568
* dlm_recover_locks()
569
* - send LOCKS
570
* - recv LOCKS_REPLY
571
*
572
* recover_status |= LOCKS
573
*
574
* dlm_recover_locks_wait()
575
*
576
* recover_status |= DONE
577
*/
578
579
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
580
recovery-only comms are sent through here. */
581
582
void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
583
{
584
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
585
int stop, reply = 0, names = 0, lookup = 0, lock = 0;
586
uint32_t status;
587
uint64_t seq;
588
589
switch (rc->rc_type) {
590
case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
591
reply = 1;
592
break;
593
case cpu_to_le32(DLM_RCOM_NAMES):
594
names = 1;
595
break;
596
case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
597
names = 1;
598
reply = 1;
599
break;
600
case cpu_to_le32(DLM_RCOM_LOOKUP):
601
lookup = 1;
602
break;
603
case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
604
lookup = 1;
605
reply = 1;
606
break;
607
case cpu_to_le32(DLM_RCOM_LOCK):
608
lock = 1;
609
break;
610
case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
611
lock = 1;
612
reply = 1;
613
break;
614
}
615
616
spin_lock_bh(&ls->ls_recover_lock);
617
status = ls->ls_recover_status;
618
stop = dlm_recovery_stopped(ls);
619
seq = ls->ls_recover_seq;
620
spin_unlock_bh(&ls->ls_recover_lock);
621
622
if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
623
goto ignore;
624
625
if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq))
626
goto ignore;
627
628
if (!(status & DLM_RS_NODES) && (names || lookup || lock))
629
goto ignore;
630
631
if (!(status & DLM_RS_DIR) && (lookup || lock))
632
goto ignore;
633
634
switch (rc->rc_type) {
635
case cpu_to_le32(DLM_RCOM_STATUS):
636
receive_rcom_status(ls, rc, seq);
637
break;
638
639
case cpu_to_le32(DLM_RCOM_NAMES):
640
receive_rcom_names(ls, rc, seq);
641
break;
642
643
case cpu_to_le32(DLM_RCOM_LOOKUP):
644
receive_rcom_lookup(ls, rc, seq);
645
break;
646
647
case cpu_to_le32(DLM_RCOM_LOCK):
648
if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
649
goto Eshort;
650
receive_rcom_lock(ls, rc, seq);
651
break;
652
653
case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
654
receive_sync_reply(ls, rc);
655
break;
656
657
case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
658
receive_sync_reply(ls, rc);
659
break;
660
661
case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
662
receive_rcom_lookup_reply(ls, rc);
663
break;
664
665
case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
666
if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
667
goto Eshort;
668
dlm_recover_process_copy(ls, rc, seq);
669
break;
670
671
default:
672
log_error(ls, "receive_rcom bad type %d",
673
le32_to_cpu(rc->rc_type));
674
}
675
return;
676
677
ignore:
678
log_limit(ls, "dlm_receive_rcom ignore msg %d "
679
"from %d %llu %llu recover seq %llu sts %x gen %u",
680
le32_to_cpu(rc->rc_type),
681
nodeid,
682
(unsigned long long)le64_to_cpu(rc->rc_seq),
683
(unsigned long long)le64_to_cpu(rc->rc_seq_reply),
684
(unsigned long long)seq,
685
status, ls->ls_generation);
686
return;
687
Eshort:
688
log_error(ls, "recovery message %d from %d is too short",
689
le32_to_cpu(rc->rc_type), nodeid);
690
}
691
692
693