GitHub Repository: awilliam/linux-vfio
Path: blob/master/fs/ceph/mds_client.c
1
#include <linux/ceph/ceph_debug.h>
2
3
#include <linux/fs.h>
4
#include <linux/wait.h>
5
#include <linux/slab.h>
6
#include <linux/sched.h>
7
#include <linux/debugfs.h>
8
#include <linux/seq_file.h>
9
10
#include "super.h"
11
#include "mds_client.h"
12
13
#include <linux/ceph/messenger.h>
14
#include <linux/ceph/decode.h>
15
#include <linux/ceph/pagelist.h>
16
#include <linux/ceph/auth.h>
17
#include <linux/ceph/debugfs.h>
18
19
/*
20
* A cluster of MDS (metadata server) daemons is responsible for
21
* managing the file system namespace (the directory hierarchy and
22
* inodes) and for coordinating shared access to storage. Metadata is
23
* partitioned hierarchically across a number of servers, and that
24
* partition varies over time as the cluster adjusts the distribution
25
* in order to balance load.
26
*
27
* The MDS client is primarily responsible for managing synchronous
28
* metadata requests for operations like open, unlink, and so forth.
29
* If there is an MDS failure, we find out about it when we (possibly
30
* request and) receive a new MDS map, and can resubmit affected
31
* requests.
32
*
33
* For the most part, though, we take advantage of a lossless
34
* communications channel to the MDS, and do not need to worry about
35
* timing out or resubmitting requests.
36
*
37
* We maintain a stateful "session" with each MDS we interact with.
38
* Within each session, we send periodic heartbeat messages to ensure
39
* any capabilities or leases we have been issued remain valid. If
40
* the session times out and goes stale, our leases and capabilities
41
* are no longer valid.
42
*/
43
44
struct ceph_reconnect_state {
45
struct ceph_pagelist *pagelist;
46
bool flock;
47
};
48
49
static void __wake_requests(struct ceph_mds_client *mdsc,
50
struct list_head *head);
51
52
static const struct ceph_connection_operations mds_con_ops;
53
54
55
/*
56
* mds reply parsing
57
*/
58
59
/*
60
* parse individual inode info
61
*/
62
static int parse_reply_info_in(void **p, void *end,
63
struct ceph_mds_reply_info_in *info,
64
int features)
65
{
66
int err = -EIO;
67
68
info->in = *p;
69
*p += sizeof(struct ceph_mds_reply_inode) +
70
sizeof(*info->in->fragtree.splits) *
71
le32_to_cpu(info->in->fragtree.nsplits);
72
73
ceph_decode_32_safe(p, end, info->symlink_len, bad);
74
ceph_decode_need(p, end, info->symlink_len, bad);
75
info->symlink = *p;
76
*p += info->symlink_len;
77
78
if (features & CEPH_FEATURE_DIRLAYOUTHASH)
79
ceph_decode_copy_safe(p, end, &info->dir_layout,
80
sizeof(info->dir_layout), bad);
81
else
82
memset(&info->dir_layout, 0, sizeof(info->dir_layout));
83
84
ceph_decode_32_safe(p, end, info->xattr_len, bad);
85
ceph_decode_need(p, end, info->xattr_len, bad);
86
info->xattr_data = *p;
87
*p += info->xattr_len;
88
return 0;
89
bad:
90
return err;
91
}
92
93
/*
94
* parse a normal reply, which may contain a (dir+)dentry and/or a
95
* target inode.
96
*/
97
static int parse_reply_info_trace(void **p, void *end,
98
struct ceph_mds_reply_info_parsed *info,
99
int features)
100
{
101
int err;
102
103
if (info->head->is_dentry) {
104
err = parse_reply_info_in(p, end, &info->diri, features);
105
if (err < 0)
106
goto out_bad;
107
108
if (unlikely(*p + sizeof(*info->dirfrag) > end))
109
goto bad;
110
info->dirfrag = *p;
111
*p += sizeof(*info->dirfrag) +
112
sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
113
if (unlikely(*p > end))
114
goto bad;
115
116
ceph_decode_32_safe(p, end, info->dname_len, bad);
117
ceph_decode_need(p, end, info->dname_len, bad);
118
info->dname = *p;
119
*p += info->dname_len;
120
info->dlease = *p;
121
*p += sizeof(*info->dlease);
122
}
123
124
if (info->head->is_target) {
125
err = parse_reply_info_in(p, end, &info->targeti, features);
126
if (err < 0)
127
goto out_bad;
128
}
129
130
if (unlikely(*p != end))
131
goto bad;
132
return 0;
133
134
bad:
135
err = -EIO;
136
out_bad:
137
pr_err("problem parsing mds trace %d\n", err);
138
return err;
139
}
140
141
/*
142
* parse readdir results
143
*/
144
static int parse_reply_info_dir(void **p, void *end,
145
struct ceph_mds_reply_info_parsed *info,
146
int features)
147
{
148
u32 num, i = 0;
149
int err;
150
151
info->dir_dir = *p;
152
if (*p + sizeof(*info->dir_dir) > end)
153
goto bad;
154
*p += sizeof(*info->dir_dir) +
155
sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
156
if (*p > end)
157
goto bad;
158
159
ceph_decode_need(p, end, sizeof(num) + 2, bad);
160
num = ceph_decode_32(p);
161
info->dir_end = ceph_decode_8(p);
162
info->dir_complete = ceph_decode_8(p);
163
if (num == 0)
164
goto done;
165
166
/* alloc large array */
167
info->dir_nr = num;
168
info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
169
sizeof(*info->dir_dname) +
170
sizeof(*info->dir_dname_len) +
171
sizeof(*info->dir_dlease),
172
GFP_NOFS);
173
if (info->dir_in == NULL) {
174
err = -ENOMEM;
175
goto out_bad;
176
}
177
info->dir_dname = (void *)(info->dir_in + num);
178
info->dir_dname_len = (void *)(info->dir_dname + num);
179
info->dir_dlease = (void *)(info->dir_dname_len + num);
180
181
while (num) {
182
/* dentry */
183
ceph_decode_need(p, end, sizeof(u32)*2, bad);
184
info->dir_dname_len[i] = ceph_decode_32(p);
185
ceph_decode_need(p, end, info->dir_dname_len[i], bad);
186
info->dir_dname[i] = *p;
187
*p += info->dir_dname_len[i];
188
dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
189
info->dir_dname[i]);
190
info->dir_dlease[i] = *p;
191
*p += sizeof(struct ceph_mds_reply_lease);
192
193
/* inode */
194
err = parse_reply_info_in(p, end, &info->dir_in[i], features);
195
if (err < 0)
196
goto out_bad;
197
i++;
198
num--;
199
}
200
201
done:
202
if (*p != end)
203
goto bad;
204
return 0;
205
206
bad:
207
err = -EIO;
208
out_bad:
209
pr_err("problem parsing dir contents %d\n", err);
210
return err;
211
}
212
213
/*
214
* parse fcntl F_GETLK results
215
*/
216
static int parse_reply_info_filelock(void **p, void *end,
217
struct ceph_mds_reply_info_parsed *info,
218
int features)
219
{
220
if (*p + sizeof(*info->filelock_reply) > end)
221
goto bad;
222
223
info->filelock_reply = *p;
224
*p += sizeof(*info->filelock_reply);
225
226
if (unlikely(*p != end))
227
goto bad;
228
return 0;
229
230
bad:
231
return -EIO;
232
}
233
234
/*
235
* parse extra results
236
*/
237
static int parse_reply_info_extra(void **p, void *end,
238
struct ceph_mds_reply_info_parsed *info,
239
int features)
240
{
241
if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
242
return parse_reply_info_filelock(p, end, info, features);
243
else
244
return parse_reply_info_dir(p, end, info, features);
245
}
246
247
/*
248
* parse entire mds reply
249
*/
250
static int parse_reply_info(struct ceph_msg *msg,
251
struct ceph_mds_reply_info_parsed *info,
252
int features)
253
{
254
void *p, *end;
255
u32 len;
256
int err;
257
258
info->head = msg->front.iov_base;
259
p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
260
end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
261
262
/* trace */
263
ceph_decode_32_safe(&p, end, len, bad);
264
if (len > 0) {
265
err = parse_reply_info_trace(&p, p+len, info, features);
266
if (err < 0)
267
goto out_bad;
268
}
269
270
/* extra */
271
ceph_decode_32_safe(&p, end, len, bad);
272
if (len > 0) {
273
err = parse_reply_info_extra(&p, p+len, info, features);
274
if (err < 0)
275
goto out_bad;
276
}
277
278
/* snap blob */
279
ceph_decode_32_safe(&p, end, len, bad);
280
info->snapblob_len = len;
281
info->snapblob = p;
282
p += len;
283
284
if (p != end)
285
goto bad;
286
return 0;
287
288
bad:
289
err = -EIO;
290
out_bad:
291
pr_err("mds parse_reply err %d\n", err);
292
return err;
293
}
294
295
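/*
* free the readdir entry array allocated in parse_reply_info_dir()
*/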
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
296
{
297
kfree(info->dir_in);
298
}
299
300
301
/*
302
* sessions
303
*/
304
static const char *session_state_name(int s)
305
{
306
switch (s) {
307
case CEPH_MDS_SESSION_NEW: return "new";
308
case CEPH_MDS_SESSION_OPENING: return "opening";
309
case CEPH_MDS_SESSION_OPEN: return "open";
310
case CEPH_MDS_SESSION_HUNG: return "hung";
311
case CEPH_MDS_SESSION_CLOSING: return "closing";
312
case CEPH_MDS_SESSION_RESTARTING: return "restarting";
313
case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
314
default: return "???";
315
}
316
}
317
318
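/*
* take a reference on a session, unless its refcount has already
* dropped to zero (in which case return NULL)
*/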
static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
319
{
320
if (atomic_inc_not_zero(&s->s_ref)) {
321
dout("mdsc get_session %p %d -> %d\n", s,
322
atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
323
return s;
324
} else {
325
dout("mdsc get_session %p 0 -- FAIL", s);
326
return NULL;
327
}
328
}
329
330
void ceph_put_mds_session(struct ceph_mds_session *s)
331
{
332
dout("mdsc put_session %p %d -> %d\n", s,
333
atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
334
if (atomic_dec_and_test(&s->s_ref)) {
335
if (s->s_authorizer)
336
s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
337
s->s_mdsc->fsc->client->monc.auth,
338
s->s_authorizer);
339
kfree(s);
340
}
341
}
342
343
/*
344
* called under mdsc->mutex
345
*/
346
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
347
int mds)
348
{
349
struct ceph_mds_session *session;
350
351
if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
352
return NULL;
353
session = mdsc->sessions[mds];
354
dout("lookup_mds_session %p %d\n", session,
355
atomic_read(&session->s_ref));
356
get_session(session);
357
return session;
358
}
359
360
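/*
* true if we have a registered session for the given mds.
* called under mdsc->mutex.
*/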
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
361
{
362
if (mds >= mdsc->max_sessions)
363
return false;
364
return mdsc->sessions[mds];
365
}
366
367
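/*
* check that a session is still registered in mdsc->sessions.
* called under mdsc->mutex.
*/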
static int __verify_registered_session(struct ceph_mds_client *mdsc,
368
struct ceph_mds_session *s)
369
{
370
if (s->s_mds >= mdsc->max_sessions ||
371
mdsc->sessions[s->s_mds] != s)
372
return -ENOENT;
373
return 0;
374
}
375
376
/*
377
* create+register a new session for given mds.
378
* called under mdsc->mutex.
379
*/
380
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
381
int mds)
382
{
383
struct ceph_mds_session *s;
384
385
s = kzalloc(sizeof(*s), GFP_NOFS);
386
if (!s)
387
return ERR_PTR(-ENOMEM);
388
s->s_mdsc = mdsc;
389
s->s_mds = mds;
390
s->s_state = CEPH_MDS_SESSION_NEW;
391
s->s_ttl = 0;
392
s->s_seq = 0;
393
mutex_init(&s->s_mutex);
394
395
ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
396
s->s_con.private = s;
397
s->s_con.ops = &mds_con_ops;
398
s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
399
s->s_con.peer_name.num = cpu_to_le64(mds);
400
401
spin_lock_init(&s->s_cap_lock);
402
s->s_cap_gen = 0;
403
s->s_cap_ttl = 0;
404
s->s_renew_requested = 0;
405
s->s_renew_seq = 0;
406
INIT_LIST_HEAD(&s->s_caps);
407
s->s_nr_caps = 0;
408
s->s_trim_caps = 0;
409
atomic_set(&s->s_ref, 1);
410
INIT_LIST_HEAD(&s->s_waiting);
411
INIT_LIST_HEAD(&s->s_unsafe);
412
s->s_num_cap_releases = 0;
413
s->s_cap_iterator = NULL;
414
INIT_LIST_HEAD(&s->s_cap_releases);
415
INIT_LIST_HEAD(&s->s_cap_releases_done);
416
INIT_LIST_HEAD(&s->s_cap_flushing);
417
INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
418
419
dout("register_session mds%d\n", mds);
420
if (mds >= mdsc->max_sessions) {
421
int newmax = 1 << get_count_order(mds+1);
422
struct ceph_mds_session **sa;
423
424
dout("register_session realloc to %d\n", newmax);
425
sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
426
if (sa == NULL)
427
goto fail_realloc;
428
if (mdsc->sessions) {
429
memcpy(sa, mdsc->sessions,
430
mdsc->max_sessions * sizeof(void *));
431
kfree(mdsc->sessions);
432
}
433
mdsc->sessions = sa;
434
mdsc->max_sessions = newmax;
435
}
436
mdsc->sessions[mds] = s;
437
atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
438
439
ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
440
441
return s;
442
443
fail_realloc:
444
kfree(s);
445
return ERR_PTR(-ENOMEM);
446
}
447
448
/*
449
* called under mdsc->mutex
450
*/
451
static void __unregister_session(struct ceph_mds_client *mdsc,
452
struct ceph_mds_session *s)
453
{
454
dout("__unregister_session mds%d %p\n", s->s_mds, s);
455
BUG_ON(mdsc->sessions[s->s_mds] != s);
456
mdsc->sessions[s->s_mds] = NULL;
457
ceph_con_close(&s->s_con);
458
ceph_put_mds_session(s);
459
}
460
461
/*
462
* drop session refs in request.
463
*
464
* should be last request ref, or hold mdsc->mutex
465
*/
466
static void put_request_session(struct ceph_mds_request *req)
467
{
468
if (req->r_session) {
469
ceph_put_mds_session(req->r_session);
470
req->r_session = NULL;
471
}
472
}
473
474
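/*
* drop all references held by a request; called via kref_put when
* the last request reference goes away
*/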
void ceph_mdsc_release_request(struct kref *kref)
475
{
476
struct ceph_mds_request *req = container_of(kref,
477
struct ceph_mds_request,
478
r_kref);
479
if (req->r_request)
480
ceph_msg_put(req->r_request);
481
if (req->r_reply) {
482
ceph_msg_put(req->r_reply);
483
destroy_reply_info(&req->r_reply_info);
484
}
485
if (req->r_inode) {
486
ceph_put_cap_refs(ceph_inode(req->r_inode),
487
CEPH_CAP_PIN);
488
iput(req->r_inode);
489
}
490
if (req->r_locked_dir)
491
ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
492
CEPH_CAP_PIN);
493
if (req->r_target_inode)
494
iput(req->r_target_inode);
495
if (req->r_dentry)
496
dput(req->r_dentry);
497
if (req->r_old_dentry) {
498
ceph_put_cap_refs(
499
ceph_inode(req->r_old_dentry->d_parent->d_inode),
500
CEPH_CAP_PIN);
501
dput(req->r_old_dentry);
502
}
503
kfree(req->r_path1);
504
kfree(req->r_path2);
505
put_request_session(req);
506
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
507
kfree(req);
508
}
509
510
/*
511
* lookup request, bump ref if found.
512
*
513
* called under mdsc->mutex.
514
*/
515
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
516
u64 tid)
517
{
518
struct ceph_mds_request *req;
519
struct rb_node *n = mdsc->request_tree.rb_node;
520
521
while (n) {
522
req = rb_entry(n, struct ceph_mds_request, r_node);
523
if (tid < req->r_tid)
524
n = n->rb_left;
525
else if (tid > req->r_tid)
526
n = n->rb_right;
527
else {
528
ceph_mdsc_get_request(req);
529
return req;
530
}
531
}
532
return NULL;
533
}
534
535
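/*
* insert a request into the tid-ordered request rbtree.
* called under mdsc->mutex.
*/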
static void __insert_request(struct ceph_mds_client *mdsc,
536
struct ceph_mds_request *new)
537
{
538
struct rb_node **p = &mdsc->request_tree.rb_node;
539
struct rb_node *parent = NULL;
540
struct ceph_mds_request *req = NULL;
541
542
while (*p) {
543
parent = *p;
544
req = rb_entry(parent, struct ceph_mds_request, r_node);
545
if (new->r_tid < req->r_tid)
546
p = &(*p)->rb_left;
547
else if (new->r_tid > req->r_tid)
548
p = &(*p)->rb_right;
549
else
550
BUG();
551
}
552
553
rb_link_node(&new->r_node, parent, p);
554
rb_insert_color(&new->r_node, &mdsc->request_tree);
555
}
556
557
/*
558
* Register an in-flight request, and assign a tid. Link to the directory
559
* we are modifying (if any).
560
*
561
* Called under mdsc->mutex.
562
*/
563
static void __register_request(struct ceph_mds_client *mdsc,
564
struct ceph_mds_request *req,
565
struct inode *dir)
566
{
567
req->r_tid = ++mdsc->last_tid;
568
if (req->r_num_caps)
569
ceph_reserve_caps(mdsc, &req->r_caps_reservation,
570
req->r_num_caps);
571
dout("__register_request %p tid %lld\n", req, req->r_tid);
572
ceph_mdsc_get_request(req);
573
__insert_request(mdsc, req);
574
575
req->r_uid = current_fsuid();
576
req->r_gid = current_fsgid();
577
578
if (dir) {
579
struct ceph_inode_info *ci = ceph_inode(dir);
580
581
ihold(dir);
582
spin_lock(&ci->i_unsafe_lock);
583
req->r_unsafe_dir = dir;
584
list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
585
spin_unlock(&ci->i_unsafe_lock);
586
}
587
}
588
589
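/*
* remove a request from the request tree and drop its unsafe-dir
* reference, if any. called under mdsc->mutex.
*/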
static void __unregister_request(struct ceph_mds_client *mdsc,
590
struct ceph_mds_request *req)
591
{
592
dout("__unregister_request %p tid %lld\n", req, req->r_tid);
593
rb_erase(&req->r_node, &mdsc->request_tree);
594
RB_CLEAR_NODE(&req->r_node);
595
596
if (req->r_unsafe_dir) {
597
struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
598
599
spin_lock(&ci->i_unsafe_lock);
600
list_del_init(&req->r_unsafe_dir_item);
601
spin_unlock(&ci->i_unsafe_lock);
602
603
iput(req->r_unsafe_dir);
604
req->r_unsafe_dir = NULL;
605
}
606
607
ceph_mdsc_put_request(req);
608
}
609
610
/*
611
* Choose mds to send request to next. If there is a hint set in the
612
* request (e.g., due to a prior forward hint from the mds), use that.
613
* Otherwise, consult frag tree and/or caps to identify the
614
* appropriate mds. If all else fails, choose randomly.
615
*
616
* Called under mdsc->mutex.
617
*/
618
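/*
* walk up the dentry tree until we hit the root or a non-snapped
* ancestor
*/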
struct dentry *get_nonsnap_parent(struct dentry *dentry)
619
{
620
while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
621
dentry = dentry->d_parent;
622
return dentry;
623
}
624
625
static int __choose_mds(struct ceph_mds_client *mdsc,
626
struct ceph_mds_request *req)
627
{
628
struct inode *inode;
629
struct ceph_inode_info *ci;
630
struct ceph_cap *cap;
631
int mode = req->r_direct_mode;
632
int mds = -1;
633
u32 hash = req->r_direct_hash;
634
bool is_hash = req->r_direct_is_hash;
635
636
/*
637
* is there a specific mds we should try? ignore hint if we have
638
* no session and the mds is not up (active or recovering).
639
*/
640
if (req->r_resend_mds >= 0 &&
641
(__have_session(mdsc, req->r_resend_mds) ||
642
ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
643
dout("choose_mds using resend_mds mds%d\n",
644
req->r_resend_mds);
645
return req->r_resend_mds;
646
}
647
648
if (mode == USE_RANDOM_MDS)
649
goto random;
650
651
inode = NULL;
652
if (req->r_inode) {
653
inode = req->r_inode;
654
} else if (req->r_dentry) {
655
struct inode *dir = req->r_dentry->d_parent->d_inode;
656
657
if (dir->i_sb != mdsc->fsc->sb) {
658
/* not this fs! */
659
inode = req->r_dentry->d_inode;
660
} else if (ceph_snap(dir) != CEPH_NOSNAP) {
661
/* direct snapped/virtual snapdir requests
662
* based on parent dir inode */
663
struct dentry *dn =
664
get_nonsnap_parent(req->r_dentry->d_parent);
665
inode = dn->d_inode;
666
dout("__choose_mds using nonsnap parent %p\n", inode);
667
} else if (req->r_dentry->d_inode) {
668
/* dentry target */
669
inode = req->r_dentry->d_inode;
670
} else {
671
/* dir + name */
672
inode = dir;
673
hash = ceph_dentry_hash(req->r_dentry);
674
is_hash = true;
675
}
676
}
677
678
dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
679
(int)hash, mode);
680
if (!inode)
681
goto random;
682
ci = ceph_inode(inode);
683
684
if (is_hash && S_ISDIR(inode->i_mode)) {
685
struct ceph_inode_frag frag;
686
int found;
687
688
ceph_choose_frag(ci, hash, &frag, &found);
689
if (found) {
690
if (mode == USE_ANY_MDS && frag.ndist > 0) {
691
u8 r;
692
693
/* choose a random replica */
694
get_random_bytes(&r, 1);
695
r %= frag.ndist;
696
mds = frag.dist[r];
697
dout("choose_mds %p %llx.%llx "
698
"frag %u mds%d (%d/%d)\n",
699
inode, ceph_vinop(inode),
700
frag.frag, mds,
701
(int)r, frag.ndist);
702
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
703
CEPH_MDS_STATE_ACTIVE)
704
return mds;
705
}
706
707
/* since this file/dir wasn't known to be
708
* replicated, we want to look for the
709
* authoritative mds. */
710
mode = USE_AUTH_MDS;
711
if (frag.mds >= 0) {
712
/* choose auth mds */
713
mds = frag.mds;
714
dout("choose_mds %p %llx.%llx "
715
"frag %u mds%d (auth)\n",
716
inode, ceph_vinop(inode), frag.frag, mds);
717
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
718
CEPH_MDS_STATE_ACTIVE)
719
return mds;
720
}
721
}
722
}
723
724
spin_lock(&inode->i_lock);
725
cap = NULL;
726
if (mode == USE_AUTH_MDS)
727
cap = ci->i_auth_cap;
728
if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
729
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
730
if (!cap) {
731
spin_unlock(&inode->i_lock);
732
goto random;
733
}
734
mds = cap->session->s_mds;
735
dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
736
inode, ceph_vinop(inode), mds,
737
cap == ci->i_auth_cap ? "auth " : "", cap);
738
spin_unlock(&inode->i_lock);
739
return mds;
740
741
random:
742
mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
743
dout("choose_mds chose random mds%d\n", mds);
744
return mds;
745
}
746
747
748
/*
749
* session messages
750
*/
751
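/*
* allocate and fill in a bare session message (op + seq)
*/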
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
752
{
753
struct ceph_msg *msg;
754
struct ceph_mds_session_head *h;
755
756
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
757
if (!msg) {
758
pr_err("create_session_msg ENOMEM creating msg\n");
759
return NULL;
760
}
761
h = msg->front.iov_base;
762
h->op = cpu_to_le32(op);
763
h->seq = cpu_to_le64(seq);
764
return msg;
765
}
766
767
/*
768
* send session open request.
769
*
770
* called under mdsc->mutex
771
*/
772
static int __open_session(struct ceph_mds_client *mdsc,
773
struct ceph_mds_session *session)
774
{
775
struct ceph_msg *msg;
776
int mstate;
777
int mds = session->s_mds;
778
779
/* wait for mds to go active? */
780
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
781
dout("open_session to mds%d (%s)\n", mds,
782
ceph_mds_state_name(mstate));
783
session->s_state = CEPH_MDS_SESSION_OPENING;
784
session->s_renew_requested = jiffies;
785
786
/* send connect message */
787
msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
788
if (!msg)
789
return -ENOMEM;
790
ceph_con_send(&session->s_con, msg);
791
return 0;
792
}
793
794
/*
795
* open sessions for any export targets for the given mds
796
*
797
* called under mdsc->mutex
798
*/
799
static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
800
struct ceph_mds_session *session)
801
{
802
struct ceph_mds_info *mi;
803
struct ceph_mds_session *ts;
804
int i, mds = session->s_mds;
805
int target;
806
807
if (mds >= mdsc->mdsmap->m_max_mds)
808
return;
809
mi = &mdsc->mdsmap->m_info[mds];
810
dout("open_export_target_sessions for mds%d (%d targets)\n",
811
session->s_mds, mi->num_export_targets);
812
813
for (i = 0; i < mi->num_export_targets; i++) {
814
target = mi->export_targets[i];
815
ts = __ceph_lookup_mds_session(mdsc, target);
816
if (!ts) {
817
ts = register_session(mdsc, target);
818
if (IS_ERR(ts))
819
return;
820
}
821
if (ts->s_state == CEPH_MDS_SESSION_NEW ||
822
ts->s_state == CEPH_MDS_SESSION_CLOSING)
823
__open_session(mdsc, ts);
824
else
825
dout(" mds%d target mds%d %p is %s\n", session->s_mds,
826
target, ts, session_state_name(ts->s_state));
827
ceph_put_mds_session(ts);
828
}
829
}
830
831
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
832
struct ceph_mds_session *session)
833
{
834
mutex_lock(&mdsc->mutex);
835
__open_export_target_sessions(mdsc, session);
836
mutex_unlock(&mdsc->mutex);
837
}
838
839
/*
840
* session caps
841
*/
842
843
/*
844
* Free preallocated cap messages assigned to this session
845
*/
846
static void cleanup_cap_releases(struct ceph_mds_session *session)
847
{
848
struct ceph_msg *msg;
849
850
spin_lock(&session->s_cap_lock);
851
while (!list_empty(&session->s_cap_releases)) {
852
msg = list_first_entry(&session->s_cap_releases,
853
struct ceph_msg, list_head);
854
list_del_init(&msg->list_head);
855
ceph_msg_put(msg);
856
}
857
while (!list_empty(&session->s_cap_releases_done)) {
858
msg = list_first_entry(&session->s_cap_releases_done,
859
struct ceph_msg, list_head);
860
list_del_init(&msg->list_head);
861
ceph_msg_put(msg);
862
}
863
spin_unlock(&session->s_cap_lock);
864
}
865
866
/*
867
* Helper to safely iterate over all caps associated with a session, with
868
* special care taken to handle a racing __ceph_remove_cap().
869
*
870
* Caller must hold session s_mutex.
871
*/
872
static int iterate_session_caps(struct ceph_mds_session *session,
873
int (*cb)(struct inode *, struct ceph_cap *,
874
void *), void *arg)
875
{
876
struct list_head *p;
877
struct ceph_cap *cap;
878
struct inode *inode, *last_inode = NULL;
879
struct ceph_cap *old_cap = NULL;
880
int ret;
881
882
dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
883
spin_lock(&session->s_cap_lock);
884
p = session->s_caps.next;
885
while (p != &session->s_caps) {
886
cap = list_entry(p, struct ceph_cap, session_caps);
887
inode = igrab(&cap->ci->vfs_inode);
888
if (!inode) {
889
p = p->next;
890
continue;
891
}
892
session->s_cap_iterator = cap;
893
spin_unlock(&session->s_cap_lock);
894
895
if (last_inode) {
896
iput(last_inode);
897
last_inode = NULL;
898
}
899
if (old_cap) {
900
ceph_put_cap(session->s_mdsc, old_cap);
901
old_cap = NULL;
902
}
903
904
ret = cb(inode, cap, arg);
905
last_inode = inode;
906
907
spin_lock(&session->s_cap_lock);
908
p = p->next;
909
if (cap->ci == NULL) {
910
dout("iterate_session_caps finishing cap %p removal\n",
911
cap);
912
BUG_ON(cap->session != session);
913
list_del_init(&cap->session_caps);
914
session->s_nr_caps--;
915
cap->session = NULL;
916
old_cap = cap; /* put_cap it w/o locks held */
917
}
918
if (ret < 0)
919
goto out;
920
}
921
ret = 0;
922
out:
923
session->s_cap_iterator = NULL;
924
spin_unlock(&session->s_cap_lock);
925
926
if (last_inode)
927
iput(last_inode);
928
if (old_cap)
929
ceph_put_cap(session->s_mdsc, old_cap);
930
931
return ret;
932
}
933
934
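/*
* iterate_session_caps() callback: remove a cap and, if it was the
* last real cap, discard any dirty/flushing state on the inode
*/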
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
935
void *arg)
936
{
937
struct ceph_inode_info *ci = ceph_inode(inode);
938
int drop = 0;
939
940
dout("removing cap %p, ci is %p, inode is %p\n",
941
cap, ci, &ci->vfs_inode);
942
spin_lock(&inode->i_lock);
943
__ceph_remove_cap(cap);
944
if (!__ceph_is_any_real_caps(ci)) {
945
struct ceph_mds_client *mdsc =
946
ceph_sb_to_client(inode->i_sb)->mdsc;
947
948
spin_lock(&mdsc->cap_dirty_lock);
949
if (!list_empty(&ci->i_dirty_item)) {
950
pr_info(" dropping dirty %s state for %p %lld\n",
951
ceph_cap_string(ci->i_dirty_caps),
952
inode, ceph_ino(inode));
953
ci->i_dirty_caps = 0;
954
list_del_init(&ci->i_dirty_item);
955
drop = 1;
956
}
957
if (!list_empty(&ci->i_flushing_item)) {
958
pr_info(" dropping dirty+flushing %s state for %p %lld\n",
959
ceph_cap_string(ci->i_flushing_caps),
960
inode, ceph_ino(inode));
961
ci->i_flushing_caps = 0;
962
list_del_init(&ci->i_flushing_item);
963
mdsc->num_cap_flushing--;
964
drop = 1;
965
}
966
if (drop && ci->i_wrbuffer_ref) {
967
pr_info(" dropping dirty data for %p %lld\n",
968
inode, ceph_ino(inode));
969
ci->i_wrbuffer_ref = 0;
970
ci->i_wrbuffer_ref_head = 0;
971
drop++;
972
}
973
spin_unlock(&mdsc->cap_dirty_lock);
974
}
975
spin_unlock(&inode->i_lock);
976
while (drop--)
977
iput(inode);
978
return 0;
979
}
980
981
/*
982
* caller must hold session s_mutex
983
*/
984
static void remove_session_caps(struct ceph_mds_session *session)
985
{
986
dout("remove_session_caps on %p\n", session);
987
iterate_session_caps(session, remove_session_caps_cb, NULL);
988
BUG_ON(session->s_nr_caps > 0);
989
BUG_ON(!list_empty(&session->s_cap_flushing));
990
cleanup_cap_releases(session);
991
}
992
993
/*
994
* wake up any threads waiting on this session's caps. if the cap is
995
* old (didn't get renewed on the client reconnect), remove it now.
996
*
997
* caller must hold s_mutex.
998
*/
999
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1000
void *arg)
1001
{
1002
struct ceph_inode_info *ci = ceph_inode(inode);
1003
1004
wake_up_all(&ci->i_cap_wq);
1005
if (arg) {
1006
spin_lock(&inode->i_lock);
1007
ci->i_wanted_max_size = 0;
1008
ci->i_requested_max_size = 0;
1009
spin_unlock(&inode->i_lock);
1010
}
1011
return 0;
1012
}
1013
1014
static void wake_up_session_caps(struct ceph_mds_session *session,
1015
int reconnect)
1016
{
1017
dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1018
iterate_session_caps(session, wake_up_session_cb,
1019
(void *)(unsigned long)reconnect);
1020
}
1021
1022
/*
1023
* Send periodic message to MDS renewing all currently held caps. The
1024
* ack will reset the expiration for all caps from this session.
1025
*
1026
* caller holds s_mutex
1027
*/
1028
static int send_renew_caps(struct ceph_mds_client *mdsc,
1029
struct ceph_mds_session *session)
1030
{
1031
struct ceph_msg *msg;
1032
int state;
1033
1034
if (time_after_eq(jiffies, session->s_cap_ttl) &&
1035
time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1036
pr_info("mds%d caps stale\n", session->s_mds);
1037
session->s_renew_requested = jiffies;
1038
1039
/* do not try to renew caps until a recovering mds has reconnected
1040
* with its clients. */
1041
state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1042
if (state < CEPH_MDS_STATE_RECONNECT) {
1043
dout("send_renew_caps ignoring mds%d (%s)\n",
1044
session->s_mds, ceph_mds_state_name(state));
1045
return 0;
1046
}
1047
1048
dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1049
ceph_mds_state_name(state));
1050
msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1051
++session->s_renew_seq);
1052
if (!msg)
1053
return -ENOMEM;
1054
ceph_con_send(&session->s_con, msg);
1055
return 0;
1056
}
1057
1058
/*
1059
* Note new cap ttl, and any transition from stale -> not stale (fresh?).
1060
*
1061
* Called under session->s_mutex
1062
*/
1063
static void renewed_caps(struct ceph_mds_client *mdsc,
1064
struct ceph_mds_session *session, int is_renew)
1065
{
1066
int was_stale;
1067
int wake = 0;
1068
1069
spin_lock(&session->s_cap_lock);
1070
was_stale = is_renew && (session->s_cap_ttl == 0 ||
1071
time_after_eq(jiffies, session->s_cap_ttl));
1072
1073
session->s_cap_ttl = session->s_renew_requested +
1074
mdsc->mdsmap->m_session_timeout*HZ;
1075
1076
if (was_stale) {
1077
if (time_before(jiffies, session->s_cap_ttl)) {
1078
pr_info("mds%d caps renewed\n", session->s_mds);
1079
wake = 1;
1080
} else {
1081
pr_info("mds%d caps still stale\n", session->s_mds);
1082
}
1083
}
1084
dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1085
session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1086
time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1087
spin_unlock(&session->s_cap_lock);
1088
1089
if (wake)
1090
wake_up_session_caps(session, 0);
1091
}
1092
1093
/*
1094
* send a session close request
1095
*/
1096
static int request_close_session(struct ceph_mds_client *mdsc,
1097
struct ceph_mds_session *session)
1098
{
1099
struct ceph_msg *msg;
1100
1101
dout("request_close_session mds%d state %s seq %lld\n",
1102
session->s_mds, session_state_name(session->s_state),
1103
session->s_seq);
1104
msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1105
if (!msg)
1106
return -ENOMEM;
1107
ceph_con_send(&session->s_con, msg);
1108
return 0;
1109
}
1110
1111
/*
1112
* Called with s_mutex held.
1113
*/
1114
static int __close_session(struct ceph_mds_client *mdsc,
1115
struct ceph_mds_session *session)
1116
{
1117
if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1118
return 0;
1119
session->s_state = CEPH_MDS_SESSION_CLOSING;
1120
return request_close_session(mdsc, session);
1121
}
1122
1123
/*
1124
* Trim old(er) caps.
1125
*
1126
* Because we can't cache an inode without one or more caps, we do
1127
* this indirectly: if a cap is unused, we prune its aliases, at which
1128
* point the inode will hopefully get dropped too.
1129
*
1130
* Yes, this is a bit sloppy. Our only real goal here is to respond to
1131
* memory pressure from the MDS, though, so it needn't be perfect.
1132
*/
1133
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1134
{
1135
struct ceph_mds_session *session = arg;
1136
struct ceph_inode_info *ci = ceph_inode(inode);
1137
int used, oissued, mine;
1138
1139
if (session->s_trim_caps <= 0)
1140
return -1;
1141
1142
spin_lock(&inode->i_lock);
1143
mine = cap->issued | cap->implemented;
1144
used = __ceph_caps_used(ci);
1145
oissued = __ceph_caps_issued_other(ci, cap);
1146
1147
dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1148
inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1149
ceph_cap_string(used));
1150
if (ci->i_dirty_caps)
1151
goto out; /* dirty caps */
1152
if ((used & ~oissued) & mine)
1153
goto out; /* we need these caps */
1154
1155
session->s_trim_caps--;
1156
if (oissued) {
1157
/* we aren't the only cap.. just remove us */
1158
__ceph_remove_cap(cap);
1159
} else {
1160
/* try to drop referring dentries */
1161
spin_unlock(&inode->i_lock);
1162
d_prune_aliases(inode);
1163
dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1164
inode, cap, atomic_read(&inode->i_count));
1165
return 0;
1166
}
1167
1168
out:
1169
spin_unlock(&inode->i_lock);
1170
return 0;
1171
}
1172
1173
/*
1174
* Trim session cap count down to some max number.
1175
*/
1176
static int trim_caps(struct ceph_mds_client *mdsc,
1177
struct ceph_mds_session *session,
1178
int max_caps)
1179
{
1180
int trim_caps = session->s_nr_caps - max_caps;
1181
1182
dout("trim_caps mds%d start: %d / %d, trim %d\n",
1183
session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1184
if (trim_caps > 0) {
1185
session->s_trim_caps = trim_caps;
1186
iterate_session_caps(session, trim_caps_cb, session);
1187
dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1188
session->s_mds, session->s_nr_caps, max_caps,
1189
trim_caps - session->s_trim_caps);
1190
session->s_trim_caps = 0;
1191
}
1192
return 0;
1193
}
1194
1195
/*
1196
* Allocate cap_release messages. If there is a partially full message
1197
* in the queue, try to allocate enough to cover its remainder, so that
1198
* we can send it immediately.
1199
*
1200
* Called under s_mutex.
1201
*/
1202
int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1203
struct ceph_mds_session *session)
1204
{
1205
struct ceph_msg *msg, *partial = NULL;
1206
struct ceph_mds_cap_release *head;
1207
int err = -ENOMEM;
1208
int extra = mdsc->fsc->mount_options->cap_release_safety;
1209
int num;
1210
1211
dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1212
extra);
1213
1214
spin_lock(&session->s_cap_lock);
1215
1216
if (!list_empty(&session->s_cap_releases)) {
1217
msg = list_first_entry(&session->s_cap_releases,
1218
struct ceph_msg,
1219
list_head);
1220
head = msg->front.iov_base;
1221
num = le32_to_cpu(head->num);
1222
if (num) {
1223
dout(" partial %p with (%d/%d)\n", msg, num,
1224
(int)CEPH_CAPS_PER_RELEASE);
1225
extra += CEPH_CAPS_PER_RELEASE - num;
1226
partial = msg;
1227
}
1228
}
1229
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1230
spin_unlock(&session->s_cap_lock);
1231
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1232
GFP_NOFS);
1233
if (!msg)
1234
goto out_unlocked;
1235
dout("add_cap_releases %p msg %p now %d\n", session, msg,
1236
(int)msg->front.iov_len);
1237
head = msg->front.iov_base;
1238
head->num = cpu_to_le32(0);
1239
msg->front.iov_len = sizeof(*head);
1240
spin_lock(&session->s_cap_lock);
1241
list_add(&msg->list_head, &session->s_cap_releases);
1242
session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1243
}
1244
1245
if (partial) {
1246
head = partial->front.iov_base;
1247
num = le32_to_cpu(head->num);
1248
dout(" queueing partial %p with %d/%d\n", partial, num,
1249
(int)CEPH_CAPS_PER_RELEASE);
1250
list_move_tail(&partial->list_head,
1251
&session->s_cap_releases_done);
1252
session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1253
}
1254
err = 0;
1255
spin_unlock(&session->s_cap_lock);
1256
out_unlocked:
1257
return err;
1258
}
1259
1260
/*
1261
* flush all dirty inode data to disk.
1262
*
1263
* returns true if we've flushed through want_flush_seq
1264
*/
1265
static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1266
{
1267
int mds, ret = 1;
1268
1269
dout("check_cap_flush want %lld\n", want_flush_seq);
1270
mutex_lock(&mdsc->mutex);
1271
for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1272
struct ceph_mds_session *session = mdsc->sessions[mds];
1273
1274
if (!session)
1275
continue;
1276
get_session(session);
1277
mutex_unlock(&mdsc->mutex);
1278
1279
mutex_lock(&session->s_mutex);
1280
if (!list_empty(&session->s_cap_flushing)) {
1281
struct ceph_inode_info *ci =
1282
list_entry(session->s_cap_flushing.next,
1283
struct ceph_inode_info,
1284
i_flushing_item);
1285
struct inode *inode = &ci->vfs_inode;
1286
1287
spin_lock(&inode->i_lock);
1288
if (ci->i_cap_flush_seq <= want_flush_seq) {
1289
dout("check_cap_flush still flushing %p "
1290
"seq %lld <= %lld to mds%d\n", inode,
1291
ci->i_cap_flush_seq, want_flush_seq,
1292
session->s_mds);
1293
ret = 0;
1294
}
1295
spin_unlock(&inode->i_lock);
1296
}
1297
mutex_unlock(&session->s_mutex);
1298
ceph_put_mds_session(session);
1299
1300
if (!ret)
1301
return ret;
1302
mutex_lock(&mdsc->mutex);
1303
}
1304
1305
mutex_unlock(&mdsc->mutex);
1306
dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1307
return ret;
1308
}
1309
1310
/*
1311
* called under s_mutex
1312
*/
1313
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1314
struct ceph_mds_session *session)
1315
{
1316
struct ceph_msg *msg;
1317
1318
dout("send_cap_releases mds%d\n", session->s_mds);
1319
spin_lock(&session->s_cap_lock);
1320
while (!list_empty(&session->s_cap_releases_done)) {
1321
msg = list_first_entry(&session->s_cap_releases_done,
1322
struct ceph_msg, list_head);
1323
list_del_init(&msg->list_head);
1324
spin_unlock(&session->s_cap_lock);
1325
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1326
dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1327
ceph_con_send(&session->s_con, msg);
1328
spin_lock(&session->s_cap_lock);
1329
}
1330
spin_unlock(&session->s_cap_lock);
1331
}
1332
1333
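/*
* discard any queued cap releases: zero the partially filled message
* and recycle the completed ones back onto the s_cap_releases list
*/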
static void discard_cap_releases(struct ceph_mds_client *mdsc,
1334
struct ceph_mds_session *session)
1335
{
1336
struct ceph_msg *msg;
1337
struct ceph_mds_cap_release *head;
1338
unsigned num;
1339
1340
dout("discard_cap_releases mds%d\n", session->s_mds);
1341
spin_lock(&session->s_cap_lock);
1342
1343
/* zero out the in-progress message */
1344
msg = list_first_entry(&session->s_cap_releases,
1345
struct ceph_msg, list_head);
1346
head = msg->front.iov_base;
1347
num = le32_to_cpu(head->num);
1348
dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1349
head->num = cpu_to_le32(0);
1350
session->s_num_cap_releases += num;
1351
1352
/* requeue completed messages */
1353
while (!list_empty(&session->s_cap_releases_done)) {
1354
msg = list_first_entry(&session->s_cap_releases_done,
1355
struct ceph_msg, list_head);
1356
list_del_init(&msg->list_head);
1357
1358
head = msg->front.iov_base;
1359
num = le32_to_cpu(head->num);
1360
dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1361
num);
1362
session->s_num_cap_releases += num;
1363
head->num = cpu_to_le32(0);
1364
msg->front.iov_len = sizeof(*head);
1365
list_add(&msg->list_head, &session->s_cap_releases);
1366
}
1367
1368
spin_unlock(&session->s_cap_lock);
1369
}
1370
1371
/*
1372
* requests
1373
*/
1374
1375
/*
1376
* Create an mds request.
1377
*/
1378
struct ceph_mds_request *
1379
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1380
{
1381
struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1382
1383
if (!req)
1384
return ERR_PTR(-ENOMEM);
1385
1386
mutex_init(&req->r_fill_mutex);
1387
req->r_mdsc = mdsc;
1388
req->r_started = jiffies;
1389
req->r_resend_mds = -1;
1390
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1391
req->r_fmode = -1;
1392
kref_init(&req->r_kref);
1393
INIT_LIST_HEAD(&req->r_wait);
1394
init_completion(&req->r_completion);
1395
init_completion(&req->r_safe_completion);
1396
INIT_LIST_HEAD(&req->r_unsafe_item);
1397
1398
req->r_op = op;
1399
req->r_direct_mode = mode;
1400
return req;
1401
}
1402
1403
/*
1404
* return the oldest (lowest tid) request in the request tree, or NULL if none.
1405
*
1406
* called under mdsc->mutex.
1407
*/
1408
static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1409
{
1410
if (RB_EMPTY_ROOT(&mdsc->request_tree))
1411
return NULL;
1412
return rb_entry(rb_first(&mdsc->request_tree),
1413
struct ceph_mds_request, r_node);
1414
}
1415
1416
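/*
* tid of the oldest pending request, or 0 if there are none
*/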
static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1417
{
1418
struct ceph_mds_request *req = __get_oldest_req(mdsc);
1419
1420
if (req)
1421
return req->r_tid;
1422
return 0;
1423
}
1424
1425
/*
1426
* Build a dentry's path. Allocate on heap; caller must kfree. Based
1427
* on build_path_from_dentry in fs/cifs/dir.c.
1428
*
1429
* If @stop_on_nosnap, generate path relative to the first non-snapped
1430
* inode.
1431
*
1432
* Encode hidden .snap dirs as a double /, i.e.
1433
* foo/.snap/bar -> foo//bar
1434
*/
1435
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1436
int stop_on_nosnap)
1437
{
1438
struct dentry *temp;
1439
char *path;
1440
int len, pos;
1441
unsigned seq;
1442
1443
if (dentry == NULL)
1444
return ERR_PTR(-EINVAL);
1445
1446
retry:
1447
len = 0;
1448
seq = read_seqbegin(&rename_lock);
1449
rcu_read_lock();
1450
for (temp = dentry; !IS_ROOT(temp);) {
1451
struct inode *inode = temp->d_inode;
1452
if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1453
len++; /* slash only */
1454
else if (stop_on_nosnap && inode &&
1455
ceph_snap(inode) == CEPH_NOSNAP)
1456
break;
1457
else
1458
len += 1 + temp->d_name.len;
1459
temp = temp->d_parent;
1460
if (temp == NULL) {
1461
rcu_read_unlock();
1462
pr_err("build_path corrupt dentry %p\n", dentry);
1463
return ERR_PTR(-EINVAL);
1464
}
1465
}
1466
rcu_read_unlock();
1467
if (len)
1468
len--; /* no leading '/' */
1469
1470
path = kmalloc(len+1, GFP_NOFS);
1471
if (path == NULL)
1472
return ERR_PTR(-ENOMEM);
1473
pos = len;
1474
path[pos] = 0; /* trailing null */
1475
rcu_read_lock();
1476
for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1477
struct inode *inode;
1478
1479
spin_lock(&temp->d_lock);
1480
inode = temp->d_inode;
1481
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1482
dout("build_path path+%d: %p SNAPDIR\n",
1483
pos, temp);
1484
} else if (stop_on_nosnap && inode &&
1485
ceph_snap(inode) == CEPH_NOSNAP) {
1486
break;
1487
} else {
1488
pos -= temp->d_name.len;
1489
if (pos < 0) {
1490
spin_unlock(&temp->d_lock);
1491
break;
1492
}
1493
strncpy(path + pos, temp->d_name.name,
1494
temp->d_name.len);
1495
}
1496
spin_unlock(&temp->d_lock);
1497
if (pos)
1498
path[--pos] = '/';
1499
temp = temp->d_parent;
1500
if (temp == NULL) {
1501
rcu_read_unlock();
1502
pr_err("build_path corrupt dentry\n");
1503
kfree(path);
1504
return ERR_PTR(-EINVAL);
1505
}
1506
}
1507
rcu_read_unlock();
1508
if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1509
pr_err("build_path did not end path lookup where "
1510
"expected, namelen is %d, pos is %d\n", len, pos);
1511
/* presumably this is only possible if racing with a
1512
rename of one of the parent directories (we can not
1513
lock the dentries above us to prevent this, but
1514
retrying should be harmless) */
1515
kfree(path);
1516
goto retry;
1517
}
1518
1519
*base = ceph_ino(temp->d_inode);
1520
*plen = len;
1521
dout("build_path on %p %d built %llx '%.*s'\n",
1522
dentry, dentry->d_count, *base, len, path);
1523
return path;
1524
}
1525
1526
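/*
* build an ino+path pair for a dentry: use the parent ino + name if
* the parent is not snapped, otherwise build a full path
*/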
static int build_dentry_path(struct dentry *dentry,
1527
const char **ppath, int *ppathlen, u64 *pino,
1528
int *pfreepath)
1529
{
1530
char *path;
1531
1532
if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1533
*pino = ceph_ino(dentry->d_parent->d_inode);
1534
*ppath = dentry->d_name.name;
1535
*ppathlen = dentry->d_name.len;
1536
return 0;
1537
}
1538
path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1539
if (IS_ERR(path))
1540
return PTR_ERR(path);
1541
*ppath = path;
1542
*pfreepath = 1;
1543
return 0;
1544
}
1545
1546
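/*
* build an ino+path pair for an inode: the bare ino if it is not
* snapped, otherwise a path from an aliasing dentry
*/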
static int build_inode_path(struct inode *inode,
1547
const char **ppath, int *ppathlen, u64 *pino,
1548
int *pfreepath)
1549
{
1550
struct dentry *dentry;
1551
char *path;
1552
1553
if (ceph_snap(inode) == CEPH_NOSNAP) {
1554
*pino = ceph_ino(inode);
1555
*ppathlen = 0;
1556
return 0;
1557
}
1558
dentry = d_find_alias(inode);
1559
path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1560
dput(dentry);
1561
if (IS_ERR(path))
1562
return PTR_ERR(path);
1563
*ppath = path;
1564
*pfreepath = 1;
1565
return 0;
1566
}
1567
1568
/*
1569
* request arguments may be specified via an inode *, a dentry *, or
1570
* an explicit ino+path.
1571
*/
1572
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1573
const char *rpath, u64 rino,
1574
const char **ppath, int *pathlen,
1575
u64 *ino, int *freepath)
1576
{
1577
int r = 0;
1578
1579
if (rinode) {
1580
r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1581
dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1582
ceph_snap(rinode));
1583
} else if (rdentry) {
1584
r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1585
dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1586
*ppath);
1587
} else if (rpath) {
1588
*ino = rino;
1589
*ppath = rpath;
1590
*pathlen = strlen(rpath);
1591
dout(" path %.*s\n", *pathlen, rpath);
1592
}
1593
1594
return r;
1595
}
1596
1597
/*
1598
* called under mdsc->mutex
1599
*/
1600
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1601
struct ceph_mds_request *req,
1602
int mds)
1603
{
1604
struct ceph_msg *msg;
1605
struct ceph_mds_request_head *head;
1606
const char *path1 = NULL;
1607
const char *path2 = NULL;
1608
u64 ino1 = 0, ino2 = 0;
1609
int pathlen1 = 0, pathlen2 = 0;
1610
int freepath1 = 0, freepath2 = 0;
1611
int len;
1612
u16 releases;
1613
void *p, *end;
1614
int ret;
1615
1616
ret = set_request_path_attr(req->r_inode, req->r_dentry,
1617
req->r_path1, req->r_ino1.ino,
1618
&path1, &pathlen1, &ino1, &freepath1);
1619
if (ret < 0) {
1620
msg = ERR_PTR(ret);
1621
goto out;
1622
}
1623
1624
ret = set_request_path_attr(NULL, req->r_old_dentry,
1625
req->r_path2, req->r_ino2.ino,
1626
&path2, &pathlen2, &ino2, &freepath2);
1627
if (ret < 0) {
1628
msg = ERR_PTR(ret);
1629
goto out_free1;
1630
}
1631
1632
len = sizeof(*head) +
1633
pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1634
1635
/* calculate (max) length for cap releases */
1636
len += sizeof(struct ceph_mds_request_release) *
1637
(!!req->r_inode_drop + !!req->r_dentry_drop +
1638
!!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1639
if (req->r_dentry_drop)
1640
len += req->r_dentry->d_name.len;
1641
if (req->r_old_dentry_drop)
1642
len += req->r_old_dentry->d_name.len;
1643
1644
msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
1645
if (!msg) {
1646
msg = ERR_PTR(-ENOMEM);
1647
goto out_free2;
1648
}
1649
1650
msg->hdr.tid = cpu_to_le64(req->r_tid);
1651
1652
head = msg->front.iov_base;
1653
p = msg->front.iov_base + sizeof(*head);
1654
end = msg->front.iov_base + msg->front.iov_len;
1655
1656
head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1657
head->op = cpu_to_le32(req->r_op);
1658
head->caller_uid = cpu_to_le32(req->r_uid);
1659
head->caller_gid = cpu_to_le32(req->r_gid);
1660
head->args = req->r_args;
1661
1662
ceph_encode_filepath(&p, end, ino1, path1);
1663
ceph_encode_filepath(&p, end, ino2, path2);
1664
1665
/* make note of release offset, in case we need to replay */
1666
req->r_request_release_offset = p - msg->front.iov_base;
1667
1668
/* cap releases */
1669
releases = 0;
1670
if (req->r_inode_drop)
1671
releases += ceph_encode_inode_release(&p,
1672
req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1673
mds, req->r_inode_drop, req->r_inode_unless, 0);
1674
if (req->r_dentry_drop)
1675
releases += ceph_encode_dentry_release(&p, req->r_dentry,
1676
mds, req->r_dentry_drop, req->r_dentry_unless);
1677
if (req->r_old_dentry_drop)
1678
releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1679
mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1680
if (req->r_old_inode_drop)
1681
releases += ceph_encode_inode_release(&p,
1682
req->r_old_dentry->d_inode,
1683
mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1684
head->num_releases = cpu_to_le16(releases);
1685
1686
BUG_ON(p > end);
1687
msg->front.iov_len = p - msg->front.iov_base;
1688
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1689
1690
msg->pages = req->r_pages;
1691
msg->nr_pages = req->r_num_pages;
1692
msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1693
msg->hdr.data_off = cpu_to_le16(0);
1694
1695
out_free2:
1696
if (freepath2)
1697
kfree((char *)path2);
1698
out_free1:
1699
if (freepath1)
1700
kfree((char *)path1);
1701
out:
1702
return msg;
1703
}
1704
1705
/*
1706
* called under mdsc->mutex if error, under no mutex if
1707
* success.
1708
*/
1709
static void complete_request(struct ceph_mds_client *mdsc,
1710
struct ceph_mds_request *req)
1711
{
1712
if (req->r_callback)
1713
req->r_callback(mdsc, req);
1714
else
1715
complete_all(&req->r_completion);
1716
}
1717
1718
/*
1719
* called under mdsc->mutex
1720
*/
1721
static int __prepare_send_request(struct ceph_mds_client *mdsc,
1722
struct ceph_mds_request *req,
1723
int mds)
1724
{
1725
struct ceph_mds_request_head *rhead;
1726
struct ceph_msg *msg;
1727
int flags = 0;
1728
1729
req->r_attempts++;
1730
if (req->r_inode) {
1731
struct ceph_cap *cap =
1732
ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1733
1734
if (cap)
1735
req->r_sent_on_mseq = cap->mseq;
1736
else
1737
req->r_sent_on_mseq = -1;
1738
}
1739
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1740
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1741
1742
if (req->r_got_unsafe) {
1743
/*
1744
* Replay. Do not regenerate message (and rebuild
1745
* paths, etc.); just use the original message.
1746
* Rebuilding paths will break for renames because
1747
* d_move mangles the src name.
1748
*/
1749
msg = req->r_request;
1750
rhead = msg->front.iov_base;
1751
1752
flags = le32_to_cpu(rhead->flags);
1753
flags |= CEPH_MDS_FLAG_REPLAY;
1754
rhead->flags = cpu_to_le32(flags);
1755
1756
if (req->r_target_inode)
1757
rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1758
1759
rhead->num_retry = req->r_attempts - 1;
1760
1761
/* remove cap/dentry releases from message */
1762
rhead->num_releases = 0;
1763
msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1764
msg->front.iov_len = req->r_request_release_offset;
1765
return 0;
1766
}
1767
1768
if (req->r_request) {
1769
ceph_msg_put(req->r_request);
1770
req->r_request = NULL;
1771
}
1772
msg = create_request_message(mdsc, req, mds);
1773
if (IS_ERR(msg)) {
1774
req->r_err = PTR_ERR(msg);
1775
complete_request(mdsc, req);
1776
return PTR_ERR(msg);
1777
}
1778
req->r_request = msg;
1779
1780
rhead = msg->front.iov_base;
1781
rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1782
if (req->r_got_unsafe)
1783
flags |= CEPH_MDS_FLAG_REPLAY;
1784
if (req->r_locked_dir)
1785
flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1786
rhead->flags = cpu_to_le32(flags);
1787
rhead->num_fwd = req->r_num_fwd;
1788
rhead->num_retry = req->r_attempts - 1;
1789
rhead->ino = 0;
1790
1791
dout(" r_locked_dir = %p\n", req->r_locked_dir);
1792
return 0;
1793
}
1794
1795
/*
1796
* send request, or put it on the appropriate wait list.
1797
*/
1798
static int __do_request(struct ceph_mds_client *mdsc,
1799
struct ceph_mds_request *req)
1800
{
1801
struct ceph_mds_session *session = NULL;
1802
int mds = -1;
1803
int err = -EAGAIN;
1804
1805
if (req->r_err || req->r_got_result)
1806
goto out;
1807
1808
if (req->r_timeout &&
1809
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1810
dout("do_request timed out\n");
1811
err = -EIO;
1812
goto finish;
1813
}
1814
1815
put_request_session(req);
1816
1817
mds = __choose_mds(mdsc, req);
1818
if (mds < 0 ||
1819
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1820
dout("do_request no mds or not active, waiting for map\n");
1821
list_add(&req->r_wait, &mdsc->waiting_for_map);
1822
goto out;
1823
}
1824
1825
/* get, open session */
1826
session = __ceph_lookup_mds_session(mdsc, mds);
1827
if (!session) {
1828
session = register_session(mdsc, mds);
1829
if (IS_ERR(session)) {
1830
err = PTR_ERR(session);
1831
goto finish;
1832
}
1833
}
1834
req->r_session = get_session(session);
1835
1836
dout("do_request mds%d session %p state %s\n", mds, session,
1837
session_state_name(session->s_state));
1838
if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1839
session->s_state != CEPH_MDS_SESSION_HUNG) {
1840
if (session->s_state == CEPH_MDS_SESSION_NEW ||
1841
session->s_state == CEPH_MDS_SESSION_CLOSING)
1842
__open_session(mdsc, session);
1843
list_add(&req->r_wait, &session->s_waiting);
1844
goto out_session;
1845
}
1846
1847
/* send request */
1848
req->r_resend_mds = -1; /* forget any previous mds hint */
1849
1850
if (req->r_request_started == 0) /* note request start time */
1851
req->r_request_started = jiffies;
1852
1853
err = __prepare_send_request(mdsc, req, mds);
1854
if (!err) {
1855
ceph_msg_get(req->r_request);
1856
ceph_con_send(&session->s_con, req->r_request);
1857
}
1858
1859
out_session:
1860
ceph_put_mds_session(session);
1861
out:
1862
return err;
1863
1864
finish:
1865
req->r_err = err;
1866
complete_request(mdsc, req);
1867
goto out;
1868
}
1869
1870
/*
1871
* called under mdsc->mutex
1872
*/
1873
static void __wake_requests(struct ceph_mds_client *mdsc,
1874
struct list_head *head)
1875
{
1876
struct ceph_mds_request *req, *nreq;
1877
1878
list_for_each_entry_safe(req, nreq, head, r_wait) {
1879
list_del_init(&req->r_wait);
1880
__do_request(mdsc, req);
1881
}
1882
}
1883
1884
/*
1885
* Wake up threads with requests pending for @mds, so that they can
1886
* resubmit their requests to a possibly different mds.
1887
*/
1888
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1889
{
1890
struct ceph_mds_request *req;
1891
struct rb_node *p;
1892
1893
dout("kick_requests mds%d\n", mds);
1894
for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1895
req = rb_entry(p, struct ceph_mds_request, r_node);
1896
if (req->r_got_unsafe)
1897
continue;
1898
if (req->r_session &&
1899
req->r_session->s_mds == mds) {
1900
dout(" kicking tid %llu\n", req->r_tid);
1901
__do_request(mdsc, req);
1902
}
1903
}
1904
}
1905
1906
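/*
* queue up (and send, if possible) an mds request without waiting
* for the reply
*/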
void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1907
struct ceph_mds_request *req)
1908
{
1909
dout("submit_request on %p\n", req);
1910
mutex_lock(&mdsc->mutex);
1911
__register_request(mdsc, req, NULL);
1912
__do_request(mdsc, req);
1913
mutex_unlock(&mdsc->mutex);
1914
}
1915
1916
/*
1917
* Synchronously perform an mds request. Take care of all of the
1918
* session setup, forwarding, and retry details.
1919
*/
1920
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1921
struct inode *dir,
1922
struct ceph_mds_request *req)
1923
{
1924
int err;
1925
1926
dout("do_request on %p\n", req);
1927
1928
/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1929
if (req->r_inode)
1930
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1931
if (req->r_locked_dir)
1932
ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1933
if (req->r_old_dentry)
1934
ceph_get_cap_refs(
1935
ceph_inode(req->r_old_dentry->d_parent->d_inode),
1936
CEPH_CAP_PIN);
1937
1938
/* issue */
1939
mutex_lock(&mdsc->mutex);
1940
__register_request(mdsc, req, dir);
1941
__do_request(mdsc, req);
1942
1943
if (req->r_err) {
1944
err = req->r_err;
1945
__unregister_request(mdsc, req);
1946
dout("do_request early error %d\n", err);
1947
goto out;
1948
}
1949
1950
/* wait */
1951
mutex_unlock(&mdsc->mutex);
1952
dout("do_request waiting\n");
1953
if (req->r_timeout) {
1954
err = (long)wait_for_completion_killable_timeout(
1955
&req->r_completion, req->r_timeout);
1956
if (err == 0)
1957
err = -EIO;
1958
} else {
1959
err = wait_for_completion_killable(&req->r_completion);
1960
}
1961
dout("do_request waited, got %d\n", err);
1962
mutex_lock(&mdsc->mutex);
1963
1964
/* only abort if we didn't race with a real reply */
1965
if (req->r_got_result) {
1966
err = le32_to_cpu(req->r_reply_info.head->result);
1967
} else if (err < 0) {
1968
dout("aborted request %lld with %d\n", req->r_tid, err);
1969
1970
/*
1971
* ensure we aren't running concurrently with
1972
* ceph_fill_trace or ceph_readdir_prepopulate, which
1973
* rely on locks (dir mutex) held by our caller.
1974
*/
1975
mutex_lock(&req->r_fill_mutex);
1976
req->r_err = err;
1977
req->r_aborted = true;
1978
mutex_unlock(&req->r_fill_mutex);
1979
1980
if (req->r_locked_dir &&
1981
(req->r_op & CEPH_MDS_OP_WRITE))
1982
ceph_invalidate_dir_request(req);
1983
} else {
1984
err = req->r_err;
1985
}
1986
1987
out:
1988
mutex_unlock(&mdsc->mutex);
1989
dout("do_request %p done, result %d\n", req, err);
1990
return err;
1991
}
1992
1993
/*
1994
* Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1995
* namespace request.
1996
*/
1997
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
1998
{
1999
struct inode *inode = req->r_locked_dir;
2000
struct ceph_inode_info *ci = ceph_inode(inode);
2001
2002
dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
2003
spin_lock(&inode->i_lock);
2004
ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
2005
ci->i_release_count++;
2006
spin_unlock(&inode->i_lock);
2007
2008
if (req->r_dentry)
2009
ceph_invalidate_dentry_lease(req->r_dentry);
2010
if (req->r_old_dentry)
2011
ceph_invalidate_dentry_lease(req->r_old_dentry);
2012
}
2013
2014
/*
2015
* Handle mds reply.
2016
*
2017
* We take the session mutex and parse and process the reply immediately.
2018
* This preserves the logical ordering of replies, capabilities, etc., sent
2019
* by the MDS as they are applied to our local cache.
2020
*/
2021
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2022
{
2023
struct ceph_mds_client *mdsc = session->s_mdsc;
2024
struct ceph_mds_request *req;
2025
struct ceph_mds_reply_head *head = msg->front.iov_base;
2026
struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
2027
u64 tid;
2028
int err, result;
2029
int mds = session->s_mds;
2030
2031
if (msg->front.iov_len < sizeof(*head)) {
2032
pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2033
ceph_msg_dump(msg);
2034
return;
2035
}
2036
2037
/* get request, session */
2038
tid = le64_to_cpu(msg->hdr.tid);
2039
mutex_lock(&mdsc->mutex);
2040
req = __lookup_request(mdsc, tid);
2041
if (!req) {
2042
dout("handle_reply on unknown tid %llu\n", tid);
2043
mutex_unlock(&mdsc->mutex);
2044
return;
2045
}
2046
dout("handle_reply %p\n", req);
2047
2048
/* correct session? */
2049
if (req->r_session != session) {
2050
pr_err("mdsc_handle_reply got %llu on session mds%d"
2051
" not mds%d\n", tid, session->s_mds,
2052
req->r_session ? req->r_session->s_mds : -1);
2053
mutex_unlock(&mdsc->mutex);
2054
goto out;
2055
}
2056
2057
/* dup? */
2058
if ((req->r_got_unsafe && !head->safe) ||
2059
(req->r_got_safe && head->safe)) {
2060
pr_warning("got a dup %s reply on %llu from mds%d\n",
2061
head->safe ? "safe" : "unsafe", tid, mds);
2062
mutex_unlock(&mdsc->mutex);
2063
goto out;
2064
}
2065
if (req->r_got_safe && !head->safe) {
2066
pr_warning("got unsafe after safe on %llu from mds%d\n",
2067
tid, mds);
2068
mutex_unlock(&mdsc->mutex);
2069
goto out;
2070
}
2071
2072
result = le32_to_cpu(head->result);
2073
2074
/*
2075
* Handle an ESTALE:
2076
* if we're not talking to the authority, send to them;
2077
* if the authority has changed while we weren't looking,
2078
* send to the new authority;
2079
* otherwise we just have to return an ESTALE.
2080
*/
2081
if (result == -ESTALE) {
2082
dout("got ESTALE on request %llu", req->r_tid);
2083
if (!req->r_inode) {
2084
/* do nothing; not an authority problem */
2085
} else if (req->r_direct_mode != USE_AUTH_MDS) {
2086
dout("not using auth, setting for that now");
2087
req->r_direct_mode = USE_AUTH_MDS;
2088
__do_request(mdsc, req);
2089
mutex_unlock(&mdsc->mutex);
2090
goto out;
2091
} else {
2092
struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2093
struct ceph_cap *cap = NULL;
2094
2095
if (req->r_session)
2096
cap = ceph_get_cap_for_mds(ci,
2097
req->r_session->s_mds);
2098
2099
dout("already using auth");
2100
if ((!cap || cap != ci->i_auth_cap) ||
2101
(cap->mseq != req->r_sent_on_mseq)) {
2102
dout("but cap changed, so resending");
2103
__do_request(mdsc, req);
2104
mutex_unlock(&mdsc->mutex);
2105
goto out;
2106
}
2107
}
2108
dout("have to return ESTALE on request %llu", req->r_tid);
2109
}
2110
2111
2112
if (head->safe) {
2113
req->r_got_safe = true;
2114
__unregister_request(mdsc, req);
2115
complete_all(&req->r_safe_completion);
2116
2117
if (req->r_got_unsafe) {
2118
/*
2119
* We already handled the unsafe response, now do the
2120
* cleanup. No need to examine the response; the MDS
2121
* doesn't include any result info in the safe
2122
* response. And even if it did, there is nothing
2123
* useful we could do with a revised return value.
2124
*/
2125
dout("got safe reply %llu, mds%d\n", tid, mds);
2126
list_del_init(&req->r_unsafe_item);
2127
2128
/* last unsafe request during umount? */
2129
if (mdsc->stopping && !__get_oldest_req(mdsc))
2130
complete_all(&mdsc->safe_umount_waiters);
2131
mutex_unlock(&mdsc->mutex);
2132
goto out;
2133
}
2134
} else {
2135
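/*
* unsafe (early) reply: track it on the session so it can be
* replayed if the mds restarts before sending the safe reply.
*/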
req->r_got_unsafe = true;
2136
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2137
}
2138
2139
dout("handle_reply tid %lld result %d\n", tid, result);
2140
rinfo = &req->r_reply_info;
2141
err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2142
mutex_unlock(&mdsc->mutex);
2143
2144
mutex_lock(&session->s_mutex);
2145
if (err < 0) {
2146
pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2147
ceph_msg_dump(msg);
2148
goto out_err;
2149
}
2150
2151
/* snap trace */
2152
if (rinfo->snapblob_len) {
2153
down_write(&mdsc->snap_rwsem);
2154
ceph_update_snap_trace(mdsc, rinfo->snapblob,
2155
rinfo->snapblob + rinfo->snapblob_len,
2156
le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2157
downgrade_write(&mdsc->snap_rwsem);
2158
} else {
2159
down_read(&mdsc->snap_rwsem);
2160
}
2161
2162
/* insert trace into our cache */
2163
mutex_lock(&req->r_fill_mutex);
2164
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2165
if (err == 0) {
2166
if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2167
rinfo->dir_nr)
2168
ceph_readdir_prepopulate(req, req->r_session);
2169
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2170
}
2171
mutex_unlock(&req->r_fill_mutex);
2172
2173
up_read(&mdsc->snap_rwsem);
2174
out_err:
2175
mutex_lock(&mdsc->mutex);
2176
if (!req->r_aborted) {
2177
if (err) {
2178
req->r_err = err;
2179
} else {
2180
req->r_reply = msg;
2181
ceph_msg_get(msg);
2182
req->r_got_result = true;
2183
}
2184
} else {
2185
dout("reply arrived after request %lld was aborted\n", tid);
2186
}
2187
mutex_unlock(&mdsc->mutex);
2188
2189
ceph_add_cap_releases(mdsc, req->r_session);
2190
mutex_unlock(&session->s_mutex);
2191
2192
/* kick calling process */
2193
complete_request(mdsc, req);
2194
out:
2195
ceph_mdsc_put_request(req);
2196
return;
2197
}
2198
2199
2200
2201
/*
2202
* handle mds notification that our request has been forwarded.
2203
*/
2204
static void handle_forward(struct ceph_mds_client *mdsc,
2205
struct ceph_mds_session *session,
2206
struct ceph_msg *msg)
2207
{
2208
struct ceph_mds_request *req;
2209
u64 tid = le64_to_cpu(msg->hdr.tid);
2210
u32 next_mds;
2211
u32 fwd_seq;
2212
int err = -EINVAL;
2213
void *p = msg->front.iov_base;
2214
void *end = p + msg->front.iov_len;
2215
2216
ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2217
next_mds = ceph_decode_32(&p);
2218
fwd_seq = ceph_decode_32(&p);
2219
2220
mutex_lock(&mdsc->mutex);
2221
req = __lookup_request(mdsc, tid);
2222
if (!req) {
2223
dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2224
goto out; /* dup reply? */
2225
}
2226
2227
if (req->r_aborted) {
2228
dout("forward tid %llu aborted, unregistering\n", tid);
2229
__unregister_request(mdsc, req);
2230
} else if (fwd_seq <= req->r_num_fwd) {
2231
dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2232
tid, next_mds, req->r_num_fwd, fwd_seq);
2233
} else {
2234
/* resend. forward race not possible; mds would drop */
2235
dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2236
BUG_ON(req->r_err);
2237
BUG_ON(req->r_got_result);
2238
req->r_num_fwd = fwd_seq;
2239
req->r_resend_mds = next_mds;
2240
put_request_session(req);
2241
__do_request(mdsc, req);
2242
}
2243
ceph_mdsc_put_request(req);
2244
out:
2245
mutex_unlock(&mdsc->mutex);
2246
return;
2247
2248
bad:
2249
pr_err("mdsc_handle_forward decode error err=%d\n", err);
2250
}
2251
2252
/*
2253
* handle a mds session control message
2254
*/
2255
static void handle_session(struct ceph_mds_session *session,
2256
struct ceph_msg *msg)
2257
{
2258
struct ceph_mds_client *mdsc = session->s_mdsc;
2259
u32 op;
2260
u64 seq;
2261
int mds = session->s_mds;
2262
struct ceph_mds_session_head *h = msg->front.iov_base;
2263
int wake = 0;
2264
2265
/* decode */
2266
if (msg->front.iov_len != sizeof(*h))
2267
goto bad;
2268
op = le32_to_cpu(h->op);
2269
seq = le64_to_cpu(h->seq);
2270
2271
mutex_lock(&mdsc->mutex);
2272
if (op == CEPH_SESSION_CLOSE)
2273
__unregister_session(mdsc, session);
2274
/* FIXME: this ttl calculation is generous */
2275
session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2276
mutex_unlock(&mdsc->mutex);
2277
2278
mutex_lock(&session->s_mutex);
2279
2280
dout("handle_session mds%d %s %p state %s seq %llu\n",
2281
mds, ceph_session_op_name(op), session,
2282
session_state_name(session->s_state), seq);
2283
2284
if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2285
session->s_state = CEPH_MDS_SESSION_OPEN;
2286
pr_info("mds%d came back\n", session->s_mds);
2287
}
2288
2289
switch (op) {
2290
case CEPH_SESSION_OPEN:
2291
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2292
pr_info("mds%d reconnect success\n", session->s_mds);
2293
session->s_state = CEPH_MDS_SESSION_OPEN;
2294
renewed_caps(mdsc, session, 0);
2295
wake = 1;
2296
if (mdsc->stopping)
2297
__close_session(mdsc, session);
2298
break;
2299
2300
case CEPH_SESSION_RENEWCAPS:
2301
if (session->s_renew_seq == seq)
2302
renewed_caps(mdsc, session, 1);
2303
break;
2304
2305
case CEPH_SESSION_CLOSE:
2306
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2307
pr_info("mds%d reconnect denied\n", session->s_mds);
2308
remove_session_caps(session);
2309
wake = 1; /* for good measure */
2310
wake_up_all(&mdsc->session_close_wq);
2311
kick_requests(mdsc, mds);
2312
break;
2313
2314
case CEPH_SESSION_STALE:
2315
pr_info("mds%d caps went stale, renewing\n",
2316
session->s_mds);
2317
spin_lock(&session->s_cap_lock);
2318
session->s_cap_gen++;
2319
session->s_cap_ttl = 0;
2320
spin_unlock(&session->s_cap_lock);
2321
send_renew_caps(mdsc, session);
2322
break;
2323
2324
case CEPH_SESSION_RECALL_STATE:
2325
trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2326
break;
2327
2328
default:
2329
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2330
WARN_ON(1);
2331
}
2332
2333
mutex_unlock(&session->s_mutex);
2334
if (wake) {
2335
mutex_lock(&mdsc->mutex);
2336
__wake_requests(mdsc, &session->s_waiting);
2337
mutex_unlock(&mdsc->mutex);
2338
}
2339
return;
2340
2341
bad:
2342
pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2343
(int)msg->front.iov_len);
2344
ceph_msg_dump(msg);
2345
return;
2346
}
2347
2348
2349
/*
2350
* called under session->s_mutex.
2351
*/
2352
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2353
struct ceph_mds_session *session)
2354
{
2355
struct ceph_mds_request *req, *nreq;
2356
int err;
2357
2358
dout("replay_unsafe_requests mds%d\n", session->s_mds);
2359
2360
mutex_lock(&mdsc->mutex);
2361
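/* re-send each request that got an unsafe reply but no safe reply yet */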
list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2362
err = __prepare_send_request(mdsc, req, session->s_mds);
2363
if (!err) {
2364
ceph_msg_get(req->r_request);
2365
ceph_con_send(&session->s_con, req->r_request);
2366
}
2367
}
2368
mutex_unlock(&mdsc->mutex);
2369
}
2370
2371
/*
2372
* Encode information about a cap for a reconnect with the MDS.
2373
*/
2374
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2375
void *arg)
2376
{
2377
union {
2378
struct ceph_mds_cap_reconnect v2;
2379
struct ceph_mds_cap_reconnect_v1 v1;
2380
} rec;
2381
size_t reclen;
2382
struct ceph_inode_info *ci;
2383
struct ceph_reconnect_state *recon_state = arg;
2384
struct ceph_pagelist *pagelist = recon_state->pagelist;
2385
char *path;
2386
int pathlen, err;
2387
u64 pathbase;
2388
struct dentry *dentry;
2389
2390
ci = cap->ci;
2391
2392
dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2393
inode, ceph_vinop(inode), cap, cap->cap_id,
2394
ceph_cap_string(cap->issued));
2395
err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2396
if (err)
2397
return err;
2398
2399
dentry = d_find_alias(inode);
2400
if (dentry) {
2401
path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2402
if (IS_ERR(path)) {
2403
err = PTR_ERR(path);
2404
goto out_dput;
2405
}
2406
} else {
2407
path = NULL;
2408
pathlen = 0;
2409
}
2410
err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2411
if (err)
2412
goto out_free;
2413
2414
spin_lock(&inode->i_lock);
2415
cap->seq = 0; /* reset cap seq */
2416
cap->issue_seq = 0; /* and issue_seq */
2417
2418
if (recon_state->flock) {
2419
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2420
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2421
rec.v2.issued = cpu_to_le32(cap->issued);
2422
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2423
rec.v2.pathbase = cpu_to_le64(pathbase);
2424
rec.v2.flock_len = 0;
2425
reclen = sizeof(rec.v2);
2426
} else {
2427
rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2428
rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2429
rec.v1.issued = cpu_to_le32(cap->issued);
2430
rec.v1.size = cpu_to_le64(inode->i_size);
2431
ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2432
ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2433
rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2434
rec.v1.pathbase = cpu_to_le64(pathbase);
2435
reclen = sizeof(rec.v1);
2436
}
2437
spin_unlock(&inode->i_lock);
2438
2439
if (recon_state->flock) {
2440
int num_fcntl_locks, num_flock_locks;
2441
struct ceph_pagelist_cursor trunc_point;
2442
2443
ceph_pagelist_set_cursor(pagelist, &trunc_point);
2444
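/*
* The lock count can change between counting and encoding (we drop
* the flock lock in between), so on -ENOSPC truncate the pagelist
* back to this point and try again with the new count.
*/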
do {
2445
lock_flocks();
2446
ceph_count_locks(inode, &num_fcntl_locks,
2447
&num_flock_locks);
2448
rec.v2.flock_len = (2*sizeof(u32) +
2449
(num_fcntl_locks+num_flock_locks) *
2450
sizeof(struct ceph_filelock));
2451
unlock_flocks();
2452
2453
/* pre-alloc pagelist */
2454
ceph_pagelist_truncate(pagelist, &trunc_point);
2455
err = ceph_pagelist_append(pagelist, &rec, reclen);
2456
if (!err)
2457
err = ceph_pagelist_reserve(pagelist,
2458
rec.v2.flock_len);
2459
2460
/* encode locks */
2461
if (!err) {
2462
lock_flocks();
2463
err = ceph_encode_locks(inode,
2464
pagelist,
2465
num_fcntl_locks,
2466
num_flock_locks);
2467
unlock_flocks();
2468
}
2469
} while (err == -ENOSPC);
2470
} else {
2471
err = ceph_pagelist_append(pagelist, &rec, reclen);
2472
}
2473
2474
out_free:
2475
kfree(path);
2476
out_dput:
2477
dput(dentry);
2478
return err;
2479
}
2480
2481
2482
/*
2483
* If an MDS fails and recovers, clients need to reconnect in order to
2484
* reestablish shared state. This includes all caps issued through
2485
* this session _and_ the snap_realm hierarchy. Because it's not
2486
* clear which snap realms the mds cares about, we send everything we
2487
* know about; that ensures we'll then get any new info the
2488
* recovering MDS might have.
2489
*
2490
* This is a relatively heavyweight operation, but it's rare.
2491
*
2492
* called with mdsc->mutex held.
2493
*/
2494
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2495
struct ceph_mds_session *session)
2496
{
2497
struct ceph_msg *reply;
2498
struct rb_node *p;
2499
int mds = session->s_mds;
2500
int err = -ENOMEM;
2501
struct ceph_pagelist *pagelist;
2502
struct ceph_reconnect_state recon_state;
2503
2504
pr_info("mds%d reconnect start\n", mds);
2505
2506
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2507
if (!pagelist)
2508
goto fail_nopagelist;
2509
ceph_pagelist_init(pagelist);
2510
2511
reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2512
if (!reply)
2513
goto fail_nomsg;
2514
2515
mutex_lock(&session->s_mutex);
2516
session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2517
session->s_seq = 0;
2518
2519
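/* reopen the connection to the mds's address from the current map */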
ceph_con_open(&session->s_con,
2520
ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2521
2522
/* replay unsafe requests */
2523
replay_unsafe_requests(mdsc, session);
2524
2525
down_read(&mdsc->snap_rwsem);
2526
2527
dout("session %p state %s\n", session,
2528
session_state_name(session->s_state));
2529
2530
/* drop old cap expires; we're about to reestablish that state */
2531
discard_cap_releases(mdsc, session);
2532
2533
/* traverse this session's caps */
2534
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2535
if (err)
2536
goto fail;
2537
2538
recon_state.pagelist = pagelist;
2539
recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2540
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2541
if (err < 0)
2542
goto fail;
2543
2544
/*
2545
* snaprealms: we provide the mds with the ino, seq (version), and
2546
* parent for all of our realms. If the mds has any newer info,
2547
* it will tell us.
2548
*/
2549
for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2550
struct ceph_snap_realm *realm =
2551
rb_entry(p, struct ceph_snap_realm, node);
2552
struct ceph_mds_snaprealm_reconnect sr_rec;
2553
2554
dout(" adding snap realm %llx seq %lld parent %llx\n",
2555
realm->ino, realm->seq, realm->parent_ino);
2556
sr_rec.ino = cpu_to_le64(realm->ino);
2557
sr_rec.seq = cpu_to_le64(realm->seq);
2558
sr_rec.parent = cpu_to_le64(realm->parent_ino);
2559
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2560
if (err)
2561
goto fail;
2562
}
2563
2564
reply->pagelist = pagelist;
2565
if (recon_state.flock)
2566
reply->hdr.version = cpu_to_le16(2);
2567
reply->hdr.data_len = cpu_to_le32(pagelist->length);
2568
reply->nr_pages = calc_pages_for(0, pagelist->length);
2569
ceph_con_send(&session->s_con, reply);
2570
2571
mutex_unlock(&session->s_mutex);
2572
2573
mutex_lock(&mdsc->mutex);
2574
__wake_requests(mdsc, &session->s_waiting);
2575
mutex_unlock(&mdsc->mutex);
2576
2577
up_read(&mdsc->snap_rwsem);
2578
return;
2579
2580
fail:
2581
ceph_msg_put(reply);
2582
up_read(&mdsc->snap_rwsem);
2583
mutex_unlock(&session->s_mutex);
2584
fail_nomsg:
2585
ceph_pagelist_release(pagelist);
2586
kfree(pagelist);
2587
fail_nopagelist:
2588
pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2589
return;
2590
}
2591
2592
2593
/*
2594
* compare old and new mdsmaps, kicking requests
2595
* and closing out old connections as necessary
2596
*
2597
* called under mdsc->mutex.
2598
*/
2599
static void check_new_map(struct ceph_mds_client *mdsc,
2600
struct ceph_mdsmap *newmap,
2601
struct ceph_mdsmap *oldmap)
2602
{
2603
int i;
2604
int oldstate, newstate;
2605
struct ceph_mds_session *s;
2606
2607
dout("check_new_map new %u old %u\n",
2608
newmap->m_epoch, oldmap->m_epoch);
2609
2610
for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2611
if (mdsc->sessions[i] == NULL)
2612
continue;
2613
s = mdsc->sessions[i];
2614
oldstate = ceph_mdsmap_get_state(oldmap, i);
2615
newstate = ceph_mdsmap_get_state(newmap, i);
2616
2617
dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2618
i, ceph_mds_state_name(oldstate),
2619
ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2620
ceph_mds_state_name(newstate),
2621
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2622
session_state_name(s->s_state));
2623
2624
if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2625
ceph_mdsmap_get_addr(newmap, i),
2626
sizeof(struct ceph_entity_addr))) {
2627
if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2628
/* the session never opened, just close it
2629
* out now */
2630
__wake_requests(mdsc, &s->s_waiting);
2631
__unregister_session(mdsc, s);
2632
} else {
2633
/* just close it */
2634
mutex_unlock(&mdsc->mutex);
2635
mutex_lock(&s->s_mutex);
2636
mutex_lock(&mdsc->mutex);
2637
ceph_con_close(&s->s_con);
2638
mutex_unlock(&s->s_mutex);
2639
s->s_state = CEPH_MDS_SESSION_RESTARTING;
2640
}
2641
2642
/* kick any requests waiting on the recovering mds */
2643
kick_requests(mdsc, i);
2644
} else if (oldstate == newstate) {
2645
continue; /* nothing new with this mds */
2646
}
2647
2648
/*
2649
* send reconnect?
2650
*/
2651
if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2652
newstate >= CEPH_MDS_STATE_RECONNECT) {
2653
mutex_unlock(&mdsc->mutex);
2654
send_mds_reconnect(mdsc, s);
2655
mutex_lock(&mdsc->mutex);
2656
}
2657
2658
/*
2659
* kick requests on any mds that has gone active.
2660
*/
2661
if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2662
newstate >= CEPH_MDS_STATE_ACTIVE) {
2663
if (oldstate != CEPH_MDS_STATE_CREATING &&
2664
oldstate != CEPH_MDS_STATE_STARTING)
2665
pr_info("mds%d recovery completed\n", s->s_mds);
2666
kick_requests(mdsc, i);
2667
ceph_kick_flushing_caps(mdsc, s);
2668
wake_up_session_caps(s, 1);
2669
}
2670
}
2671
2672
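/*
* For any mds that is laggy in the new map, proactively open
* sessions to its export targets.
*/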
for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2673
s = mdsc->sessions[i];
2674
if (!s)
2675
continue;
2676
if (!ceph_mdsmap_is_laggy(newmap, i))
2677
continue;
2678
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2679
s->s_state == CEPH_MDS_SESSION_HUNG ||
2680
s->s_state == CEPH_MDS_SESSION_CLOSING) {
2681
dout(" connecting to export targets of laggy mds%d\n",
2682
i);
2683
__open_export_target_sessions(mdsc, s);
2684
}
2685
}
2686
}
2687
2688
2689
2690
/*
2691
* leases
2692
*/
2693
2694
/*
2695
* caller must hold session s_mutex, dentry->d_lock
2696
*/
2697
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2698
{
2699
struct ceph_dentry_info *di = ceph_dentry(dentry);
2700
2701
ceph_put_mds_session(di->lease_session);
2702
di->lease_session = NULL;
2703
}
2704
2705
static void handle_lease(struct ceph_mds_client *mdsc,
2706
struct ceph_mds_session *session,
2707
struct ceph_msg *msg)
2708
{
2709
struct super_block *sb = mdsc->fsc->sb;
2710
struct inode *inode;
2711
struct dentry *parent, *dentry;
2712
struct ceph_dentry_info *di;
2713
int mds = session->s_mds;
2714
struct ceph_mds_lease *h = msg->front.iov_base;
2715
u32 seq;
2716
struct ceph_vino vino;
2717
int mask;
2718
struct qstr dname;
2719
int release = 0;
2720
2721
dout("handle_lease from mds%d\n", mds);
2722
2723
/* decode */
2724
if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2725
goto bad;
2726
vino.ino = le64_to_cpu(h->ino);
2727
vino.snap = CEPH_NOSNAP;
2728
mask = le16_to_cpu(h->mask);
2729
seq = le32_to_cpu(h->seq);
2730
dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2731
dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2732
if (dname.len != get_unaligned_le32(h+1))
2733
goto bad;
2734
2735
mutex_lock(&session->s_mutex);
2736
session->s_seq++;
2737
2738
/* lookup inode */
2739
inode = ceph_find_inode(sb, vino);
2740
dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
2741
ceph_lease_op_name(h->action), mask, vino.ino, inode,
2742
dname.len, dname.name);
2743
if (inode == NULL) {
2744
dout("handle_lease no inode %llx\n", vino.ino);
2745
goto release;
2746
}
2747
2748
/* dentry */
2749
parent = d_find_alias(inode);
2750
if (!parent) {
2751
dout("no parent dentry on inode %p\n", inode);
2752
WARN_ON(1);
2753
goto release; /* hrm... */
2754
}
2755
dname.hash = full_name_hash(dname.name, dname.len);
2756
dentry = d_lookup(parent, &dname);
2757
dput(parent);
2758
if (!dentry)
2759
goto release;
2760
2761
spin_lock(&dentry->d_lock);
2762
di = ceph_dentry(dentry);
2763
switch (h->action) {
2764
case CEPH_MDS_LEASE_REVOKE:
2765
if (di && di->lease_session == session) {
2766
if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2767
h->seq = cpu_to_le32(di->lease_seq);
2768
__ceph_mdsc_drop_dentry_lease(dentry);
2769
}
2770
release = 1;
2771
break;
2772
2773
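/* renewal granted: extend the lease from when the renew was requested */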
case CEPH_MDS_LEASE_RENEW:
2774
if (di && di->lease_session == session &&
2775
di->lease_gen == session->s_cap_gen &&
2776
di->lease_renew_from &&
2777
di->lease_renew_after == 0) {
2778
unsigned long duration =
2779
le32_to_cpu(h->duration_ms) * HZ / 1000;
2780
2781
di->lease_seq = seq;
2782
dentry->d_time = di->lease_renew_from + duration;
2783
di->lease_renew_after = di->lease_renew_from +
2784
(duration >> 1);
2785
di->lease_renew_from = 0;
2786
}
2787
break;
2788
}
2789
spin_unlock(&dentry->d_lock);
2790
dput(dentry);
2791
2792
if (!release)
2793
goto out;
2794
2795
release:
2796
/* let's just reuse the same message */
2797
h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2798
ceph_msg_get(msg);
2799
ceph_con_send(&session->s_con, msg);
2800
2801
out:
2802
iput(inode);
2803
mutex_unlock(&session->s_mutex);
2804
return;
2805
2806
bad:
2807
pr_err("corrupt lease message\n");
2808
ceph_msg_dump(msg);
2809
}
2810
2811
void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2812
struct inode *inode,
2813
struct dentry *dentry, char action,
2814
u32 seq)
2815
{
2816
struct ceph_msg *msg;
2817
struct ceph_mds_lease *lease;
2818
int len = sizeof(*lease) + sizeof(u32);
2819
int dnamelen = 0;
2820
2821
dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2822
inode, dentry, ceph_lease_op_name(action), session->s_mds);
2823
dnamelen = dentry->d_name.len;
2824
len += dnamelen;
2825
2826
msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2827
if (!msg)
2828
return;
2829
lease = msg->front.iov_base;
2830
lease->action = action;
2831
lease->mask = cpu_to_le16(1);
2832
lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2833
lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2834
lease->seq = cpu_to_le32(seq);
2835
put_unaligned_le32(dnamelen, lease + 1);
2836
memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2837
2838
/*
2839
* if this is a preemptive lease RELEASE, no need to
2840
* flush request stream, since the actual request will
2841
* soon follow.
2842
*/
2843
msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2844
2845
ceph_con_send(&session->s_con, msg);
2846
}
2847
2848
/*
2849
* Preemptively release a lease we expect to invalidate anyway.
2850
* Both @inode and @dentry are required.
2851
*/
2852
void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2853
struct dentry *dentry, int mask)
2854
{
2855
struct ceph_dentry_info *di;
2856
struct ceph_mds_session *session;
2857
u32 seq;
2858
2859
BUG_ON(inode == NULL);
2860
BUG_ON(dentry == NULL);
2861
BUG_ON(mask == 0);
2862
2863
/* is dentry lease valid? */
2864
spin_lock(&dentry->d_lock);
2865
di = ceph_dentry(dentry);
2866
if (!di || !di->lease_session ||
2867
di->lease_session->s_mds < 0 ||
2868
di->lease_gen != di->lease_session->s_cap_gen ||
2869
!time_before(jiffies, dentry->d_time)) {
2870
dout("lease_release inode %p dentry %p -- "
2871
"no lease on %d\n",
2872
inode, dentry, mask);
2873
spin_unlock(&dentry->d_lock);
2874
return;
2875
}
2876
2877
/* we do have a lease on this dentry; note mds and seq */
2878
session = ceph_get_mds_session(di->lease_session);
2879
seq = di->lease_seq;
2880
__ceph_mdsc_drop_dentry_lease(dentry);
2881
spin_unlock(&dentry->d_lock);
2882
2883
dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2884
inode, dentry, mask, session->s_mds);
2885
ceph_mdsc_lease_send_msg(session, inode, dentry,
2886
CEPH_MDS_LEASE_RELEASE, seq);
2887
ceph_put_mds_session(session);
2888
}
2889
2890
/*
2891
* drop all leases (and dentry refs) in preparation for umount
2892
*/
2893
static void drop_leases(struct ceph_mds_client *mdsc)
2894
{
2895
int i;
2896
2897
dout("drop_leases\n");
2898
mutex_lock(&mdsc->mutex);
2899
for (i = 0; i < mdsc->max_sessions; i++) {
2900
struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2901
if (!s)
2902
continue;
2903
mutex_unlock(&mdsc->mutex);
2904
mutex_lock(&s->s_mutex);
2905
mutex_unlock(&s->s_mutex);
2906
ceph_put_mds_session(s);
2907
mutex_lock(&mdsc->mutex);
2908
}
2909
mutex_unlock(&mdsc->mutex);
2910
}
2911
2912
2913
2914
/*
2915
* delayed work -- periodically trim expired leases, renew caps with mds
2916
*/
2917
static void schedule_delayed(struct ceph_mds_client *mdsc)
2918
{
2919
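/* run again in ~5 seconds; round_jiffies_relative() batches timer wakeups */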
int delay = 5;
2920
unsigned hz = round_jiffies_relative(HZ * delay);
2921
schedule_delayed_work(&mdsc->delayed_work, hz);
2922
}
2923
2924
static void delayed_work(struct work_struct *work)
2925
{
2926
int i;
2927
struct ceph_mds_client *mdsc =
2928
container_of(work, struct ceph_mds_client, delayed_work.work);
2929
int renew_interval;
2930
int renew_caps;
2931
2932
dout("mdsc delayed_work\n");
2933
ceph_check_delayed_caps(mdsc);
2934
2935
mutex_lock(&mdsc->mutex);
2936
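/* renew caps once every quarter of the mds session timeout */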
renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2937
renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2938
mdsc->last_renew_caps);
2939
if (renew_caps)
2940
mdsc->last_renew_caps = jiffies;
2941
2942
for (i = 0; i < mdsc->max_sessions; i++) {
2943
struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2944
if (s == NULL)
2945
continue;
2946
if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2947
dout("resending session close request for mds%d\n",
2948
s->s_mds);
2949
request_close_session(mdsc, s);
2950
ceph_put_mds_session(s);
2951
continue;
2952
}
2953
if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2954
if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2955
s->s_state = CEPH_MDS_SESSION_HUNG;
2956
pr_info("mds%d hung\n", s->s_mds);
2957
}
2958
}
2959
if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2960
/* this mds has failed or is recovering; just wait */
2961
ceph_put_mds_session(s);
2962
continue;
2963
}
2964
mutex_unlock(&mdsc->mutex);
2965
2966
mutex_lock(&s->s_mutex);
2967
if (renew_caps)
2968
send_renew_caps(mdsc, s);
2969
else
2970
ceph_con_keepalive(&s->s_con);
2971
ceph_add_cap_releases(mdsc, s);
2972
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2973
s->s_state == CEPH_MDS_SESSION_HUNG)
2974
ceph_send_cap_releases(mdsc, s);
2975
mutex_unlock(&s->s_mutex);
2976
ceph_put_mds_session(s);
2977
2978
mutex_lock(&mdsc->mutex);
2979
}
2980
mutex_unlock(&mdsc->mutex);
2981
2982
schedule_delayed(mdsc);
2983
}
2984
2985
int ceph_mdsc_init(struct ceph_fs_client *fsc)
2986
2987
{
2988
struct ceph_mds_client *mdsc;
2989
2990
mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2991
if (!mdsc)
2992
return -ENOMEM;
2993
mdsc->fsc = fsc;
2994
fsc->mdsc = mdsc;
2995
mutex_init(&mdsc->mutex);
2996
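/* start with an empty, zeroed mdsmap; it is replaced when the first real map arrives */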
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2997
if (mdsc->mdsmap == NULL)
2998
return -ENOMEM;
2999
3000
init_completion(&mdsc->safe_umount_waiters);
3001
init_waitqueue_head(&mdsc->session_close_wq);
3002
INIT_LIST_HEAD(&mdsc->waiting_for_map);
3003
mdsc->sessions = NULL;
3004
mdsc->max_sessions = 0;
3005
mdsc->stopping = 0;
3006
init_rwsem(&mdsc->snap_rwsem);
3007
mdsc->snap_realms = RB_ROOT;
3008
INIT_LIST_HEAD(&mdsc->snap_empty);
3009
spin_lock_init(&mdsc->snap_empty_lock);
3010
mdsc->last_tid = 0;
3011
mdsc->request_tree = RB_ROOT;
3012
INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3013
mdsc->last_renew_caps = jiffies;
3014
INIT_LIST_HEAD(&mdsc->cap_delay_list);
3015
spin_lock_init(&mdsc->cap_delay_lock);
3016
INIT_LIST_HEAD(&mdsc->snap_flush_list);
3017
spin_lock_init(&mdsc->snap_flush_lock);
3018
mdsc->cap_flush_seq = 0;
3019
INIT_LIST_HEAD(&mdsc->cap_dirty);
3020
INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3021
mdsc->num_cap_flushing = 0;
3022
spin_lock_init(&mdsc->cap_dirty_lock);
3023
init_waitqueue_head(&mdsc->cap_flushing_wq);
3024
spin_lock_init(&mdsc->dentry_lru_lock);
3025
INIT_LIST_HEAD(&mdsc->dentry_lru);
3026
3027
ceph_caps_init(mdsc);
3028
ceph_adjust_min_caps(mdsc, fsc->min_caps);
3029
3030
return 0;
3031
}
3032
3033
/*
3034
* Wait for safe replies on open mds requests. If we time out, drop
3035
* all requests from the tree to avoid dangling dentry refs.
3036
*/
3037
static void wait_requests(struct ceph_mds_client *mdsc)
3038
{
3039
struct ceph_mds_request *req;
3040
struct ceph_fs_client *fsc = mdsc->fsc;
3041
3042
mutex_lock(&mdsc->mutex);
3043
if (__get_oldest_req(mdsc)) {
3044
mutex_unlock(&mdsc->mutex);
3045
3046
dout("wait_requests waiting for requests\n");
3047
wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3048
fsc->client->options->mount_timeout * HZ);
3049
3050
/* tear down remaining requests */
3051
mutex_lock(&mdsc->mutex);
3052
while ((req = __get_oldest_req(mdsc))) {
3053
dout("wait_requests timed out on tid %llu\n",
3054
req->r_tid);
3055
__unregister_request(mdsc, req);
3056
}
3057
}
3058
mutex_unlock(&mdsc->mutex);
3059
dout("wait_requests done\n");
3060
}
3061
3062
/*
3063
* called before mount is ro, and before dentries are torn down.
3064
* (hmm, does this still race with new lookups?)
3065
*/
3066
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3067
{
3068
dout("pre_umount\n");
3069
mdsc->stopping = 1;
3070
3071
drop_leases(mdsc);
3072
ceph_flush_dirty_caps(mdsc);
3073
wait_requests(mdsc);
3074
3075
/*
3076
* wait for reply handlers to drop their request refs and
3077
* their inode/dcache refs
3078
*/
3079
ceph_msgr_flush();
3080
}
3081
3082
/*
3083
* wait for all write mds requests to flush.
3084
*/
3085
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3086
{
3087
struct ceph_mds_request *req = NULL, *nextreq;
3088
struct rb_node *n;
3089
3090
mutex_lock(&mdsc->mutex);
3091
dout("wait_unsafe_requests want %lld\n", want_tid);
3092
restart:
3093
req = __get_oldest_req(mdsc);
3094
while (req && req->r_tid <= want_tid) {
3095
/* find next request */
3096
n = rb_next(&req->r_node);
3097
if (n)
3098
nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3099
else
3100
nextreq = NULL;
3101
if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3102
/* write op */
3103
ceph_mdsc_get_request(req);
3104
if (nextreq)
3105
ceph_mdsc_get_request(nextreq);
3106
mutex_unlock(&mdsc->mutex);
3107
dout("wait_unsafe_requests wait on %llu (want %llu)\n",
3108
req->r_tid, want_tid);
3109
wait_for_completion(&req->r_safe_completion);
3110
mutex_lock(&mdsc->mutex);
3111
ceph_mdsc_put_request(req);
3112
if (!nextreq)
3113
break; /* there was no next request before we slept, so we're done! */
3114
if (RB_EMPTY_NODE(&nextreq->r_node)) {
3115
/* next request was removed from tree */
3116
ceph_mdsc_put_request(nextreq);
3117
goto restart;
3118
}
3119
ceph_mdsc_put_request(nextreq); /* won't go away */
3120
}
3121
req = nextreq;
3122
}
3123
mutex_unlock(&mdsc->mutex);
3124
dout("wait_unsafe_requests done\n");
3125
}
3126
3127
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3128
{
3129
u64 want_tid, want_flush;
3130
3131
if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3132
return;
3133
3134
dout("sync\n");
3135
mutex_lock(&mdsc->mutex);
3136
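/* snapshot the newest tid and cap flush seq; everything up to these must be flushed */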
want_tid = mdsc->last_tid;
3137
want_flush = mdsc->cap_flush_seq;
3138
mutex_unlock(&mdsc->mutex);
3139
dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3140
3141
ceph_flush_dirty_caps(mdsc);
3142
3143
wait_unsafe_requests(mdsc, want_tid);
3144
wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3145
}
3146
3147
/*
3148
* true if all sessions are closed, or we force unmount
3149
*/
3150
bool done_closing_sessions(struct ceph_mds_client *mdsc)
3151
{
3152
int i, n = 0;
3153
3154
if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3155
return true;
3156
3157
mutex_lock(&mdsc->mutex);
3158
for (i = 0; i < mdsc->max_sessions; i++)
3159
if (mdsc->sessions[i])
3160
n++;
3161
mutex_unlock(&mdsc->mutex);
3162
return n == 0;
3163
}
3164
3165
/*
3166
* called after sb is ro.
3167
*/
3168
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3169
{
3170
struct ceph_mds_session *session;
3171
int i;
3172
struct ceph_fs_client *fsc = mdsc->fsc;
3173
unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3174
3175
dout("close_sessions\n");
3176
3177
/* close sessions */
3178
mutex_lock(&mdsc->mutex);
3179
for (i = 0; i < mdsc->max_sessions; i++) {
3180
session = __ceph_lookup_mds_session(mdsc, i);
3181
if (!session)
3182
continue;
3183
mutex_unlock(&mdsc->mutex);
3184
mutex_lock(&session->s_mutex);
3185
__close_session(mdsc, session);
3186
mutex_unlock(&session->s_mutex);
3187
ceph_put_mds_session(session);
3188
mutex_lock(&mdsc->mutex);
3189
}
3190
mutex_unlock(&mdsc->mutex);
3191
3192
dout("waiting for sessions to close\n");
3193
wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3194
timeout);
3195
3196
/* tear down remaining sessions */
3197
mutex_lock(&mdsc->mutex);
3198
for (i = 0; i < mdsc->max_sessions; i++) {
3199
if (mdsc->sessions[i]) {
3200
session = get_session(mdsc->sessions[i]);
3201
__unregister_session(mdsc, session);
3202
mutex_unlock(&mdsc->mutex);
3203
mutex_lock(&session->s_mutex);
3204
remove_session_caps(session);
3205
mutex_unlock(&session->s_mutex);
3206
ceph_put_mds_session(session);
3207
mutex_lock(&mdsc->mutex);
3208
}
3209
}
3210
WARN_ON(!list_empty(&mdsc->cap_delay_list));
3211
mutex_unlock(&mdsc->mutex);
3212
3213
ceph_cleanup_empty_realms(mdsc);
3214
3215
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3216
3217
dout("stopped\n");
3218
}
3219
3220
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3221
{
3222
dout("stop\n");
3223
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3224
if (mdsc->mdsmap)
3225
ceph_mdsmap_destroy(mdsc->mdsmap);
3226
kfree(mdsc->sessions);
3227
ceph_caps_finalize(mdsc);
3228
}
3229
3230
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3231
{
3232
struct ceph_mds_client *mdsc = fsc->mdsc;
3233
3234
dout("mdsc_destroy %p\n", mdsc);
3235
ceph_mdsc_stop(mdsc);
3236
3237
/* flush out any connection work with references to us */
3238
ceph_msgr_flush();
3239
3240
fsc->mdsc = NULL;
3241
kfree(mdsc);
3242
dout("mdsc_destroy %p done\n", mdsc);
3243
}
3244
3245
3246
/*
3247
* handle mds map update.
3248
*/
3249
void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3250
{
3251
u32 epoch;
3252
u32 maplen;
3253
void *p = msg->front.iov_base;
3254
void *end = p + msg->front.iov_len;
3255
struct ceph_mdsmap *newmap, *oldmap;
3256
struct ceph_fsid fsid;
3257
int err = -EINVAL;
3258
3259
ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3260
ceph_decode_copy(&p, &fsid, sizeof(fsid));
3261
if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3262
return;
3263
epoch = ceph_decode_32(&p);
3264
maplen = ceph_decode_32(&p);
3265
dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3266
3267
/* do we need it? */
3268
ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3269
mutex_lock(&mdsc->mutex);
3270
if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3271
dout("handle_map epoch %u <= our %u\n",
3272
epoch, mdsc->mdsmap->m_epoch);
3273
mutex_unlock(&mdsc->mutex);
3274
return;
3275
}
3276
3277
newmap = ceph_mdsmap_decode(&p, end);
3278
if (IS_ERR(newmap)) {
3279
err = PTR_ERR(newmap);
3280
goto bad_unlock;
3281
}
3282
3283
/* swap into place */
3284
if (mdsc->mdsmap) {
3285
oldmap = mdsc->mdsmap;
3286
mdsc->mdsmap = newmap;
3287
check_new_map(mdsc, newmap, oldmap);
3288
ceph_mdsmap_destroy(oldmap);
3289
} else {
3290
mdsc->mdsmap = newmap; /* first mds map */
3291
}
3292
mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3293
3294
__wake_requests(mdsc, &mdsc->waiting_for_map);
3295
3296
mutex_unlock(&mdsc->mutex);
3297
schedule_delayed(mdsc);
3298
return;
3299
3300
bad_unlock:
3301
mutex_unlock(&mdsc->mutex);
3302
bad:
3303
pr_err("error decoding mdsmap %d\n", err);
3304
return;
3305
}
3306
3307
static struct ceph_connection *con_get(struct ceph_connection *con)
3308
{
3309
struct ceph_mds_session *s = con->private;
3310
3311
if (get_session(s)) {
3312
dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3313
return con;
3314
}
3315
dout("mdsc con_get %p FAIL\n", s);
3316
return NULL;
3317
}
3318
3319
static void con_put(struct ceph_connection *con)
3320
{
3321
struct ceph_mds_session *s = con->private;
3322
3323
dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3324
ceph_put_mds_session(s);
3325
}
3326
3327
/*
3328
* if the client is unresponsive for long enough, the mds will kill
3329
* the session entirely.
3330
*/
3331
static void peer_reset(struct ceph_connection *con)
3332
{
3333
struct ceph_mds_session *s = con->private;
3334
struct ceph_mds_client *mdsc = s->s_mdsc;
3335
3336
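/* the mds dropped our session; reconnect to reestablish our caps and state */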
pr_warning("mds%d closed our session\n", s->s_mds);
3337
send_mds_reconnect(mdsc, s);
3338
}
3339
3340
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3341
{
3342
struct ceph_mds_session *s = con->private;
3343
struct ceph_mds_client *mdsc = s->s_mdsc;
3344
int type = le16_to_cpu(msg->hdr.type);
3345
3346
mutex_lock(&mdsc->mutex);
3347
if (__verify_registered_session(mdsc, s) < 0) {
3348
mutex_unlock(&mdsc->mutex);
3349
goto out;
3350
}
3351
mutex_unlock(&mdsc->mutex);
3352
3353
switch (type) {
3354
case CEPH_MSG_MDS_MAP:
3355
ceph_mdsc_handle_map(mdsc, msg);
3356
break;
3357
case CEPH_MSG_CLIENT_SESSION:
3358
handle_session(s, msg);
3359
break;
3360
case CEPH_MSG_CLIENT_REPLY:
3361
handle_reply(s, msg);
3362
break;
3363
case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3364
handle_forward(mdsc, s, msg);
3365
break;
3366
case CEPH_MSG_CLIENT_CAPS:
3367
ceph_handle_caps(s, msg);
3368
break;
3369
case CEPH_MSG_CLIENT_SNAP:
3370
ceph_handle_snap(mdsc, s, msg);
3371
break;
3372
case CEPH_MSG_CLIENT_LEASE:
3373
handle_lease(mdsc, s, msg);
3374
break;
3375
3376
default:
3377
pr_err("received unknown message type %d %s\n", type,
3378
ceph_msg_type_name(type));
3379
}
3380
out:
3381
ceph_msg_put(msg);
3382
}
3383
3384
/*
3385
* authentication
3386
*/
3387
static int get_authorizer(struct ceph_connection *con,
3388
void **buf, int *len, int *proto,
3389
void **reply_buf, int *reply_len, int force_new)
3390
{
3391
struct ceph_mds_session *s = con->private;
3392
struct ceph_mds_client *mdsc = s->s_mdsc;
3393
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3394
int ret = 0;
3395
3396
if (force_new && s->s_authorizer) {
3397
ac->ops->destroy_authorizer(ac, s->s_authorizer);
3398
s->s_authorizer = NULL;
3399
}
3400
if (s->s_authorizer == NULL) {
3401
if (ac->ops->create_authorizer) {
3402
ret = ac->ops->create_authorizer(
3403
ac, CEPH_ENTITY_TYPE_MDS,
3404
&s->s_authorizer,
3405
&s->s_authorizer_buf,
3406
&s->s_authorizer_buf_len,
3407
&s->s_authorizer_reply_buf,
3408
&s->s_authorizer_reply_buf_len);
3409
if (ret)
3410
return ret;
3411
}
3412
}
3413
3414
*proto = ac->protocol;
3415
*buf = s->s_authorizer_buf;
3416
*len = s->s_authorizer_buf_len;
3417
*reply_buf = s->s_authorizer_reply_buf;
3418
*reply_len = s->s_authorizer_reply_buf_len;
3419
return 0;
3420
}
3421
3422
3423
static int verify_authorizer_reply(struct ceph_connection *con, int len)
3424
{
3425
struct ceph_mds_session *s = con->private;
3426
struct ceph_mds_client *mdsc = s->s_mdsc;
3427
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3428
3429
return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3430
}
3431
3432
static int invalidate_authorizer(struct ceph_connection *con)
3433
{
3434
struct ceph_mds_session *s = con->private;
3435
struct ceph_mds_client *mdsc = s->s_mdsc;
3436
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3437
3438
if (ac->ops->invalidate_authorizer)
3439
ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3440
3441
return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3442
}
3443
3444
static const struct ceph_connection_operations mds_con_ops = {
3445
.get = con_get,
3446
.put = con_put,
3447
.dispatch = dispatch,
3448
.get_authorizer = get_authorizer,
3449
.verify_authorizer_reply = verify_authorizer_reply,
3450
.invalidate_authorizer = invalidate_authorizer,
3451
.peer_reset = peer_reset,
3452
};
3453
3454
/* eof */
3455
3456